This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new abffff9dfb Flink: Add REST catalog shorthand (#7053)
abffff9dfb is described below
commit abffff9dfb0ae6793f1a5ccdff6d69a101cbe27f
Author: Fokko Driesprong <[email protected]>
AuthorDate: Fri Mar 10 22:50:10 2023 +0100
Flink: Add REST catalog shorthand (#7053)
---
.../org/apache/iceberg/flink/CatalogLoader.java | 32 ++++++++++++++++++++++
.../apache/iceberg/flink/FlinkCatalogFactory.java | 8 ++++--
...alogTableLoader.java => TestCatalogLoader.java} | 32 ++++++----------------
.../iceberg/flink/TestCatalogTableLoader.java | 31 ++++-----------------
4 files changed, 52 insertions(+), 51 deletions(-)
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
index 7c098cf20d..48fe7b8493 100644
---
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.RESTCatalog;
/** Serializable loader to load an Iceberg {@link Catalog}. */
public interface CatalogLoader extends Serializable {
@@ -53,6 +54,10 @@ public interface CatalogLoader extends Serializable {
return new HiveCatalogLoader(name, hadoopConf, properties);
}
+ static CatalogLoader rest(String name, Configuration hadoopConf, Map<String, String> properties) {
+ return new RESTCatalogLoader(name, hadoopConf, properties);
+ }
+
+
static CatalogLoader custom(
String name, Map<String, String> properties, Configuration hadoopConf,
String impl) {
return new CustomCatalogLoader(name, properties, hadoopConf, impl);
@@ -125,6 +130,33 @@ public interface CatalogLoader extends Serializable {
}
}
+ class RESTCatalogLoader implements CatalogLoader {
+ private final String catalogName;
+ private final SerializableConfiguration hadoopConf;
+ private final Map<String, String> properties;
+
+ private RESTCatalogLoader(
+ String catalogName, Configuration conf, Map<String, String> properties) {
+ this.catalogName = catalogName;
+ this.hadoopConf = new SerializableConfiguration(conf);
+ this.properties = Maps.newHashMap(properties);
+ }
+
+ @Override
+ public Catalog loadCatalog() {
+ return CatalogUtil.loadCatalog(
+ RESTCatalog.class.getName(), catalogName, properties, hadoopConf.get());
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("catalogName", catalogName)
+ .add("properties", properties)
+ .toString();
+ }
+ }
+
class CustomCatalogLoader implements CatalogLoader {
private final SerializableConfiguration hadoopConf;
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index ff1c0a0591..1453753849 100644
---
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -45,7 +45,7 @@ import org.apache.iceberg.util.PropertyUtil;
*
* <ul>
* <li><code>type</code> - Flink catalog factory key, should be "iceberg"
- * <li><code>catalog-type</code> - iceberg catalog type, "hive" or "hadoop"
+ * <li><code>catalog-type</code> - iceberg catalog type, "hive", "hadoop" or "rest"
* <li><code>uri</code> - the Hive Metastore URI (Hive catalog only)
* <li><code>clients</code> - the Hive Client Pool Size (Hive catalog only)
* <li><code>warehouse</code> - the warehouse path (Hadoop catalog only)
@@ -64,6 +64,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+ public static final String ICEBERG_CATALOG_TYPE_REST = "rest";
public static final String HIVE_CONF_DIR = "hive-conf-dir";
public static final String HADOOP_CONF_DIR = "hadoop-conf-dir";
@@ -111,9 +112,12 @@ public class FlinkCatalogFactory implements CatalogFactory {
case ICEBERG_CATALOG_TYPE_HADOOP:
return CatalogLoader.hadoop(name, hadoopConf, properties);
+ case ICEBERG_CATALOG_TYPE_REST:
+ return CatalogLoader.rest(name, hadoopConf, properties);
+
default:
throw new UnsupportedOperationException(
- "Unknown catalog-type: " + catalogType + " (Must be 'hive' or 'hadoop')");
+ "Unknown catalog-type: " + catalogType + " (Must be 'hive', 'hadoop' or 'rest')");
}
}
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogLoader.java
similarity index 80%
copy from
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
copy to
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogLoader.java
index e77c62c384..384ac5c52d 100644
---
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogLoader.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink;
+import static org.apache.iceberg.CatalogProperties.URI;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -32,7 +34,6 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
@@ -42,8 +43,8 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-/** Test for {@link CatalogLoader} and {@link TableLoader}. */
-public class TestCatalogTableLoader extends FlinkTestBase {
+/** Test for {@link CatalogLoader}. */
+public class TestCatalogLoader extends FlinkTestBase {
private static File warehouse = null;
private static final TableIdentifier IDENTIFIER =
TableIdentifier.of("default", "my_table");
@@ -81,16 +82,10 @@ public class TestCatalogTableLoader extends FlinkTestBase {
}
@Test
- public void testHadoopTableLoader() throws IOException, ClassNotFoundException {
- String location = "file:" + warehouse + "/my_table";
- new HadoopTables(hiveConf).create(SCHEMA, location);
- validateTableLoader(TableLoader.fromHadoopTable(location, hiveConf));
- }
-
- @Test
- public void testHiveCatalogTableLoader() throws IOException, ClassNotFoundException {
- CatalogLoader catalogLoader = CatalogLoader.hive("my_catalog", hiveConf,
Maps.newHashMap());
- validateTableLoader(TableLoader.fromCatalog(catalogLoader, IDENTIFIER));
+ public void testRESTCatalogLoader() {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(URI, "http://localhost/");
+ CatalogLoader.rest("my_catalog", hiveConf, Maps.newHashMap());
}
private static void validateCatalogLoader(CatalogLoader loader)
@@ -99,17 +94,6 @@ public class TestCatalogTableLoader extends FlinkTestBase {
validateHadoopConf(table);
}
- private static void validateTableLoader(TableLoader loader)
- throws IOException, ClassNotFoundException {
- TableLoader copied = javaSerAndDeSer(loader);
- copied.open();
- try {
- validateHadoopConf(copied.loadTable());
- } finally {
- copied.close();
- }
- }
-
private static void validateHadoopConf(Table table) {
FileIO io = table.io();
Assertions.assertThat(io)
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
index e77c62c384..b3a2d45261 100644
---
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
@@ -24,10 +24,8 @@ import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -42,7 +40,7 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-/** Test for {@link CatalogLoader} and {@link TableLoader}. */
+/** Test for {@link TableLoader}. */
public class TestCatalogTableLoader extends FlinkTestBase {
private static File warehouse = null;
@@ -66,20 +64,6 @@ public class TestCatalogTableLoader extends FlinkTestBase {
}
}
- @Test
- public void testHadoopCatalogLoader() throws IOException,
ClassNotFoundException {
- Map<String, String> properties = Maps.newHashMap();
- properties.put(CatalogProperties.WAREHOUSE_LOCATION, "file:" + warehouse);
- CatalogLoader loader = CatalogLoader.hadoop("my_catalog", hiveConf,
properties);
- validateCatalogLoader(loader);
- }
-
- @Test
- public void testHiveCatalogLoader() throws IOException,
ClassNotFoundException {
- CatalogLoader loader = CatalogLoader.hive("my_catalog", hiveConf,
Maps.newHashMap());
- validateCatalogLoader(loader);
- }
-
@Test
public void testHadoopTableLoader() throws IOException,
ClassNotFoundException {
String location = "file:" + warehouse + "/my_table";
@@ -89,19 +73,16 @@ public class TestCatalogTableLoader extends FlinkTestBase {
@Test
public void testHiveCatalogTableLoader() throws IOException, ClassNotFoundException {
+ CatalogLoader loader = CatalogLoader.hive("my_catalog", hiveConf,
Maps.newHashMap());
+ javaSerdes(loader).loadCatalog().createTable(IDENTIFIER, SCHEMA);
+
CatalogLoader catalogLoader = CatalogLoader.hive("my_catalog", hiveConf,
Maps.newHashMap());
validateTableLoader(TableLoader.fromCatalog(catalogLoader, IDENTIFIER));
}
- private static void validateCatalogLoader(CatalogLoader loader)
- throws IOException, ClassNotFoundException {
- Table table =
javaSerAndDeSer(loader).loadCatalog().createTable(IDENTIFIER, SCHEMA);
- validateHadoopConf(table);
- }
-
private static void validateTableLoader(TableLoader loader)
throws IOException, ClassNotFoundException {
- TableLoader copied = javaSerAndDeSer(loader);
+ TableLoader copied = javaSerdes(loader);
copied.open();
try {
validateHadoopConf(copied.loadTable());
@@ -120,7 +101,7 @@ public class TestCatalogTableLoader extends FlinkTestBase {
}
@SuppressWarnings("unchecked")
- private static <T> T javaSerAndDeSer(T object) throws IOException, ClassNotFoundException {
+ private static <T> T javaSerdes(T object) throws IOException, ClassNotFoundException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
out.writeObject(object);