This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new abffff9dfb Flink: Add REST catalog shorthand (#7053)
abffff9dfb is described below

commit abffff9dfb0ae6793f1a5ccdff6d69a101cbe27f
Author: Fokko Driesprong <[email protected]>
AuthorDate: Fri Mar 10 22:50:10 2023 +0100

    Flink: Add REST catalog shorthand (#7053)
---
 .../org/apache/iceberg/flink/CatalogLoader.java    | 32 ++++++++++++++++++++++
 .../apache/iceberg/flink/FlinkCatalogFactory.java  |  8 ++++--
 ...alogTableLoader.java => TestCatalogLoader.java} | 32 ++++++----------------
 .../iceberg/flink/TestCatalogTableLoader.java      | 31 ++++-----------------
 4 files changed, 52 insertions(+), 51 deletions(-)

diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
index 7c098cf20d..48fe7b8493 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.RESTCatalog;
 
 /** Serializable loader to load an Iceberg {@link Catalog}. */
 public interface CatalogLoader extends Serializable {
@@ -53,6 +54,10 @@ public interface CatalogLoader extends Serializable {
     return new HiveCatalogLoader(name, hadoopConf, properties);
   }
 
+  static CatalogLoader rest(String name, Configuration hadoopConf, Map<String, String> properties) {
+    return new RESTCatalogLoader(name, hadoopConf, properties);
+  }
+
   static CatalogLoader custom(
       String name, Map<String, String> properties, Configuration hadoopConf, String impl) {
     return new CustomCatalogLoader(name, properties, hadoopConf, impl);
@@ -125,6 +130,33 @@ public interface CatalogLoader extends Serializable {
     }
   }
 
+  class RESTCatalogLoader implements CatalogLoader {
+    private final String catalogName;
+    private final SerializableConfiguration hadoopConf;
+    private final Map<String, String> properties;
+
+    private RESTCatalogLoader(
+        String catalogName, Configuration conf, Map<String, String> properties) {
+      this.catalogName = catalogName;
+      this.hadoopConf = new SerializableConfiguration(conf);
+      this.properties = Maps.newHashMap(properties);
+    }
+
+    @Override
+    public Catalog loadCatalog() {
+      return CatalogUtil.loadCatalog(
+          RESTCatalog.class.getName(), catalogName, properties, hadoopConf.get());
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("catalogName", catalogName)
+          .add("properties", properties)
+          .toString();
+    }
+  }
+
   class CustomCatalogLoader implements CatalogLoader {
 
     private final SerializableConfiguration hadoopConf;
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index ff1c0a0591..1453753849 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -45,7 +45,7 @@ import org.apache.iceberg.util.PropertyUtil;
  *
  * <ul>
  *   <li><code>type</code> - Flink catalog factory key, should be "iceberg"
- *   <li><code>catalog-type</code> - iceberg catalog type, "hive" or "hadoop"
+ *   <li><code>catalog-type</code> - iceberg catalog type, "hive", "hadoop" or "rest"
  *   <li><code>uri</code> - the Hive Metastore URI (Hive catalog only)
  *   <li><code>clients</code> - the Hive Client Pool Size (Hive catalog only)
  *   <li><code>warehouse</code> - the warehouse path (Hadoop catalog only)
@@ -64,6 +64,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
   public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
   public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
   public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+  public static final String ICEBERG_CATALOG_TYPE_REST = "rest";
 
   public static final String HIVE_CONF_DIR = "hive-conf-dir";
   public static final String HADOOP_CONF_DIR = "hadoop-conf-dir";
@@ -111,9 +112,12 @@ public class FlinkCatalogFactory implements CatalogFactory {
       case ICEBERG_CATALOG_TYPE_HADOOP:
         return CatalogLoader.hadoop(name, hadoopConf, properties);
 
+      case ICEBERG_CATALOG_TYPE_REST:
+        return CatalogLoader.rest(name, hadoopConf, properties);
+
       default:
         throw new UnsupportedOperationException(
-            "Unknown catalog-type: " + catalogType + " (Must be 'hive' or 'hadoop')");
+            "Unknown catalog-type: " + catalogType + " (Must be 'hive', 'hadoop' or 'rest')");
     }
   }
 
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogLoader.java
similarity index 80%
copy from flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
copy to flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogLoader.java
index e77c62c384..384ac5c52d 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogLoader.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink;
 
+import static org.apache.iceberg.CatalogProperties.URI;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -32,7 +34,6 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
@@ -42,8 +43,8 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-/** Test for {@link CatalogLoader} and {@link TableLoader}. */
-public class TestCatalogTableLoader extends FlinkTestBase {
+/** Test for {@link CatalogLoader}. */
+public class TestCatalogLoader extends FlinkTestBase {
 
   private static File warehouse = null;
   private static final TableIdentifier IDENTIFIER = TableIdentifier.of("default", "my_table");
@@ -81,16 +82,10 @@ public class TestCatalogTableLoader extends FlinkTestBase {
   }
 
   @Test
-  public void testHadoopTableLoader() throws IOException, ClassNotFoundException {
-    String location = "file:" + warehouse + "/my_table";
-    new HadoopTables(hiveConf).create(SCHEMA, location);
-    validateTableLoader(TableLoader.fromHadoopTable(location, hiveConf));
-  }
-
-  @Test
-  public void testHiveCatalogTableLoader() throws IOException, ClassNotFoundException {
-    CatalogLoader catalogLoader = CatalogLoader.hive("my_catalog", hiveConf, Maps.newHashMap());
-    validateTableLoader(TableLoader.fromCatalog(catalogLoader, IDENTIFIER));
+  public void testRESTCatalogLoader() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(URI, "http://localhost/");
+    CatalogLoader.rest("my_catalog", hiveConf, Maps.newHashMap());
   }
 
   private static void validateCatalogLoader(CatalogLoader loader)
@@ -99,17 +94,6 @@ public class TestCatalogTableLoader extends FlinkTestBase {
     validateHadoopConf(table);
   }
 
-  private static void validateTableLoader(TableLoader loader)
-      throws IOException, ClassNotFoundException {
-    TableLoader copied = javaSerAndDeSer(loader);
-    copied.open();
-    try {
-      validateHadoopConf(copied.loadTable());
-    } finally {
-      copied.close();
-    }
-  }
-
   private static void validateHadoopConf(Table table) {
     FileIO io = table.io();
     Assertions.assertThat(io)
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
index e77c62c384..b3a2d45261 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
@@ -24,10 +24,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.util.Map;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -42,7 +40,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-/** Test for {@link CatalogLoader} and {@link TableLoader}. */
+/** Test for {@link TableLoader}. */
 public class TestCatalogTableLoader extends FlinkTestBase {
 
   private static File warehouse = null;
@@ -66,20 +64,6 @@ public class TestCatalogTableLoader extends FlinkTestBase {
     }
   }
 
-  @Test
-  public void testHadoopCatalogLoader() throws IOException, ClassNotFoundException {
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put(CatalogProperties.WAREHOUSE_LOCATION, "file:" + warehouse);
-    CatalogLoader loader = CatalogLoader.hadoop("my_catalog", hiveConf, properties);
-    validateCatalogLoader(loader);
-  }
-
-  @Test
-  public void testHiveCatalogLoader() throws IOException, ClassNotFoundException {
-    CatalogLoader loader = CatalogLoader.hive("my_catalog", hiveConf, Maps.newHashMap());
-    validateCatalogLoader(loader);
-  }
-
   @Test
   public void testHadoopTableLoader() throws IOException, ClassNotFoundException {
     String location = "file:" + warehouse + "/my_table";
@@ -89,19 +73,16 @@ public class TestCatalogTableLoader extends FlinkTestBase {
 
   @Test
   public void testHiveCatalogTableLoader() throws IOException, ClassNotFoundException {
+    CatalogLoader loader = CatalogLoader.hive("my_catalog", hiveConf, Maps.newHashMap());
+    javaSerdes(loader).loadCatalog().createTable(IDENTIFIER, SCHEMA);
+
     CatalogLoader catalogLoader = CatalogLoader.hive("my_catalog", hiveConf, Maps.newHashMap());
     validateTableLoader(TableLoader.fromCatalog(catalogLoader, IDENTIFIER));
   }
 
-  private static void validateCatalogLoader(CatalogLoader loader)
-      throws IOException, ClassNotFoundException {
-    Table table = javaSerAndDeSer(loader).loadCatalog().createTable(IDENTIFIER, SCHEMA);
-    validateHadoopConf(table);
-  }
-
   private static void validateTableLoader(TableLoader loader)
       throws IOException, ClassNotFoundException {
-    TableLoader copied = javaSerAndDeSer(loader);
+    TableLoader copied = javaSerdes(loader);
     copied.open();
     try {
       validateHadoopConf(copied.loadTable());
@@ -120,7 +101,7 @@ public class TestCatalogTableLoader extends FlinkTestBase {
   }
 
   @SuppressWarnings("unchecked")
-  private static <T> T javaSerAndDeSer(T object) throws IOException, ClassNotFoundException {
+  private static <T> T javaSerdes(T object) throws IOException, ClassNotFoundException {
     ByteArrayOutputStream bytes = new ByteArrayOutputStream();
     try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
       out.writeObject(object);

Reply via email to