This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 936cb022b [flink] Introduce FlinkGenericCatalog (#1857)
936cb022b is described below

commit 936cb022bc73b2ce1f057e29702fe6f4b1db44e9
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 22 11:38:52 2023 +0800

    [flink] Introduce FlinkGenericCatalog (#1857)
---
 docs/content/engines/flink.md                      |  35 ++
 docs/content/engines/spark3.md                     |   3 +
 docs/content/how-to/creating-catalogs.md           |   4 +
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  15 +-
 .../apache/paimon/flink/FlinkGenericCatalog.java   | 397 +++++++++++++++++++++
 .../paimon/flink/FlinkGenericCatalogFactory.java   | 105 ++++++
 .../paimon/flink/FlinkGenericTableFactory.java     |  79 ++++
 .../org.apache.flink.table.factories.Factory       |   1 +
 .../org/apache/paimon/flink/FlinkCatalogTest.java  |   7 +-
 paimon-hive/paimon-hive-connector-common/pom.xml   |  14 +
 .../paimon/hive/FlinkGenericCatalogITCase.java     | 120 +++++++
 11 files changed, 767 insertions(+), 13 deletions(-)

diff --git a/docs/content/engines/flink.md b/docs/content/engines/flink.md
index a3eccfcf7..3cd22e9ba 100644
--- a/docs/content/engines/flink.md
+++ b/docs/content/engines/flink.md
@@ -125,6 +125,10 @@ You can now start Flink SQL client to execute SQL scripts.
 
 **Step 5: Create a Catalog and a Table**
 
+{{< tabs "Create Flink Catalog" >}}
+
+{{< tab "Catalog" >}}
+
 ```sql
 -- if you're trying out Paimon in a distributed environment,
 -- the warehouse path should be set to a shared file system, such as HDFS or 
OSS
@@ -142,6 +146,37 @@ CREATE TABLE word_count (
 );
 ```
 
+{{< /tab >}}
+
+{{< tab "Generic-Catalog" >}}
+
+Using FlinkGenericCatalog, you need to use Hive metastore. Then, you can use 
all the tables from Paimon, Hive, and
+Flink Generic Tables (Kafka and other tables)!
+
+In this mode, you should use 'connector' option for creating tables.
+
+```sql
+CREATE CATALOG my_catalog WITH (
+    'type'='paimon-generic',
+    'hive-conf-dir'='...',
+    'hadoop-conf-dir'='...'
+);
+
+USE CATALOG my_catalog;
+
+-- create a word count table
+CREATE TABLE word_count (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt BIGINT
+) WITH (
+    'connector'='paimon'
+);
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
 **Step 6: Write Data**
 
 ```sql
diff --git a/docs/content/engines/spark3.md b/docs/content/engines/spark3.md
index 548661330..9273033de 100644
--- a/docs/content/engines/spark3.md
+++ b/docs/content/engines/spark3.md
@@ -120,6 +120,9 @@ can use the `spark_catalog.${database_name}.${table_name}` 
to access Spark table
 When starting `spark-sql`, use the following command to register Paimon’s 
Spark Generic catalog to replace Spark
 default catalog `spark_catalog`. (default warehouse is Spark 
`spark.sql.warehouse.dir`)
 
+Currently, it is only recommended to use `SparkGenericCatalog` in the case of 
Hive metastore, Paimon will infer
+Hive conf from Spark session, you just need to configure Spark's Hive conf.
+
 ```bash
 spark-sql ... \
     --conf 
spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog
diff --git a/docs/content/how-to/creating-catalogs.md 
b/docs/content/how-to/creating-catalogs.md
index 196d3d1de..05d36aef8 100644
--- a/docs/content/how-to/creating-catalogs.md
+++ b/docs/content/how-to/creating-catalogs.md
@@ -106,6 +106,8 @@ USE CATALOG my_hive;
 
 You can define any default table options with the prefix `table-default.` for 
tables created in the catalog.
 
+Also, you can create [FlinkGenericCatalog]({{< ref "engines/flink" >}}).
+
 {{< /tab >}}
 
 {{< tab "Spark3" >}}
@@ -130,6 +132,8 @@ After `spark-sql` is started, you can switch to the 
`default` database of the `p
 USE paimon.default;
 ```
 
+Also, you can create [SparkGenericCatalog]({{< ref "engines/spark3" >}}).
+
 {{< /tab >}}
 
 {{< /tabs >}}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 6554fddeb..23cf7a9ba 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -26,6 +26,7 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableSchema;
@@ -113,8 +114,6 @@ public class FlinkCatalog extends AbstractCatalog {
 
     private final Duration logStoreAutoRegisterTimeout;
 
-    private final Options options;
-
     public FlinkCatalog(
             Catalog catalog,
             String name,
@@ -124,7 +123,6 @@ public class FlinkCatalog extends AbstractCatalog {
         super(name, defaultDatabase);
         this.catalog = catalog;
         this.classLoader = classLoader;
-        this.options = options;
         this.logStoreAutoRegister = options.get(LOG_SYSTEM_AUTO_REGISTER);
         this.logStoreAutoRegisterTimeout = options.get(REGISTER_TIMEOUT);
         try {
@@ -255,11 +253,14 @@ public class FlinkCatalog extends AbstractCatalog {
         }
         CatalogTable catalogTable = (CatalogTable) table;
         Map<String, String> options = table.getOptions();
-        if (options.containsKey(CONNECTOR.key())) {
+        String connector = options.get(CONNECTOR.key());
+        options.remove(CONNECTOR.key());
+        if (!StringUtils.isNullOrWhitespaceOnly(connector)
+                && !FlinkCatalogFactory.IDENTIFIER.equals(connector)) {
             throw new CatalogException(
-                    "Paimon Catalog only supports paimon tables ,"
-                            + " and you don't need to specify  'connector'= '"
-                            + FlinkCatalogFactory.IDENTIFIER
+                    "Paimon Catalog only supports paimon tables,"
+                            + " but you specify  'connector'= '"
+                            + connector
                             + "' when using Paimon Catalog\n"
                             + " You can create TEMPORARY table instead if you 
want to create the table of other connector.");
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
new file mode 100644
index 000000000..2a3962223
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
+import org.apache.flink.table.factories.TableFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** A Flink catalog that can also load non-Paimon tables. */
+public class FlinkGenericCatalog extends AbstractCatalog {
+
+    private final FlinkCatalog paimon;
+    private final Catalog flink;
+
+    public FlinkGenericCatalog(FlinkCatalog paimon, Catalog flink) {
+        super(paimon.getName(), paimon.getDefaultDatabase());
+        this.paimon = paimon;
+        this.flink = flink;
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        paimon.open();
+        flink.open();
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        paimon.close();
+        flink.close();
+    }
+
+    @Override
+    public Optional<Factory> getFactory() {
+        return Optional.of(
+                new FlinkGenericTableFactory(paimon.getFactory().get(), 
flink.getFactory().get()));
+    }
+
+    @Override
+    public Optional<TableFactory> getTableFactory() {
+        return flink.getTableFactory();
+    }
+
+    @Override
+    public Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
+        return flink.getFunctionDefinitionFactory();
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return flink.listDatabases();
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        return flink.getDatabase(databaseName);
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        return flink.databaseExists(databaseName);
+    }
+
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean 
ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        flink.createDatabase(name, database, ignoreIfExists);
+    }
+
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean 
cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, 
CatalogException {
+        flink.dropDatabase(name, ignoreIfNotExists, cascade);
+    }
+
+    @Override
+    public void alterDatabase(String name, CatalogDatabase newDatabase, 
boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        flink.alterDatabase(name, newDatabase, ignoreIfNotExists);
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        return flink.listTables(databaseName);
+    }
+
+    @Override
+    public List<String> listViews(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        return flink.listViews(databaseName);
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        try {
+            return paimon.getTable(tablePath);
+        } catch (TableNotExistException e) {
+            return flink.getTable(tablePath);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return flink.tableExists(tablePath);
+    }
+
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        try {
+            paimon.dropTable(tablePath, false);
+        } catch (TableNotExistException e) {
+            flink.dropTable(tablePath, ignoreIfNotExists);
+        }
+    }
+
+    @Override
+    public void renameTable(ObjectPath tablePath, String newTableName, boolean 
ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException, 
CatalogException {
+        try {
+            paimon.renameTable(tablePath, newTableName, false);
+        } catch (TableNotExistException e) {
+            flink.renameTable(tablePath, newTableName, ignoreIfNotExists);
+        }
+    }
+
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+        String connector = table.getOptions().get(CONNECTOR.key());
+        if (FlinkCatalogFactory.IDENTIFIER.equals(connector)) {
+            paimon.createTable(tablePath, table, ignoreIfExists);
+        } else {
+            flink.createTable(tablePath, table, ignoreIfExists);
+        }
+    }
+
+    private boolean isPaimonTable(ObjectPath tablePath) {
+        try {
+            paimon.getTable(tablePath);
+            return true;
+        } catch (TableNotExistException e) {
+            return false;
+        }
+    }
+
+    @Override
+    public void alterTable(
+            ObjectPath tablePath, CatalogBaseTable newTable, boolean 
ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        if (isPaimonTable(tablePath)) {
+            paimon.alterTable(tablePath, newTable, ignoreIfNotExists);
+        } else {
+            flink.alterTable(tablePath, newTable, ignoreIfNotExists);
+        }
+    }
+
+    @Override
+    public void alterTable(
+            ObjectPath tablePath,
+            CatalogBaseTable newTable,
+            List<TableChange> tableChanges,
+            boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        if (isPaimonTable(tablePath)) {
+            paimon.alterTable(tablePath, newTable, tableChanges, 
ignoreIfNotExists);
+        } else {
+            flink.alterTable(tablePath, newTable, tableChanges, 
ignoreIfNotExists);
+        }
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+            throws TableNotExistException, TableNotPartitionedException, 
CatalogException {
+        return flink.listPartitions(tablePath);
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws TableNotExistException, TableNotPartitionedException,
+                    PartitionSpecInvalidException, CatalogException {
+        return flink.listPartitions(tablePath, partitionSpec);
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(
+            ObjectPath tablePath, List<Expression> filters)
+            throws TableNotExistException, TableNotPartitionedException, 
CatalogException {
+        return flink.listPartitionsByFilter(tablePath, filters);
+    }
+
+    @Override
+    public CatalogPartition getPartition(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        return flink.getPartition(tablePath, partitionSpec);
+    }
+
+    @Override
+    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec)
+            throws CatalogException {
+        return flink.partitionExists(tablePath, partitionSpec);
+    }
+
+    @Override
+    public void createPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition partition,
+            boolean ignoreIfExists)
+            throws TableNotExistException, TableNotPartitionedException,
+                    PartitionSpecInvalidException, 
PartitionAlreadyExistsException,
+                    CatalogException {
+        flink.createPartition(tablePath, partitionSpec, partition, 
ignoreIfExists);
+    }
+
+    @Override
+    public void dropPartition(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean 
ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        flink.dropPartition(tablePath, partitionSpec, ignoreIfNotExists);
+    }
+
+    @Override
+    public void alterPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition newPartition,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        flink.alterPartition(tablePath, partitionSpec, newPartition, 
ignoreIfNotExists);
+    }
+
+    @Override
+    public List<String> listFunctions(String dbName)
+            throws DatabaseNotExistException, CatalogException {
+        return flink.listFunctions(dbName);
+    }
+
+    @Override
+    public CatalogFunction getFunction(ObjectPath functionPath)
+            throws FunctionNotExistException, CatalogException {
+        return flink.getFunction(functionPath);
+    }
+
+    @Override
+    public boolean functionExists(ObjectPath functionPath) throws 
CatalogException {
+        return flink.functionExists(functionPath);
+    }
+
+    @Override
+    public void createFunction(
+            ObjectPath functionPath, CatalogFunction function, boolean 
ignoreIfExists)
+            throws FunctionAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+        flink.createFunction(functionPath, function, ignoreIfExists);
+    }
+
+    @Override
+    public void alterFunction(
+            ObjectPath functionPath, CatalogFunction newFunction, boolean 
ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+        flink.alterFunction(functionPath, newFunction, ignoreIfNotExists);
+    }
+
+    @Override
+    public void dropFunction(ObjectPath functionPath, boolean 
ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+        flink.dropFunction(functionPath, ignoreIfNotExists);
+    }
+
+    @Override
+    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        return flink.getTableStatistics(tablePath);
+    }
+
+    @Override
+    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath 
tablePath)
+            throws TableNotExistException, CatalogException {
+        return flink.getTableColumnStatistics(tablePath);
+    }
+
+    @Override
+    public CatalogTableStatistics getPartitionStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        return flink.getPartitionStatistics(tablePath, partitionSpec);
+    }
+
+    @Override
+    public CatalogColumnStatistics getPartitionColumnStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        return flink.getPartitionColumnStatistics(tablePath, partitionSpec);
+    }
+
+    @Override
+    public void alterTableStatistics(
+            ObjectPath tablePath, CatalogTableStatistics tableStatistics, 
boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        flink.alterTableStatistics(tablePath, tableStatistics, 
ignoreIfNotExists);
+    }
+
+    @Override
+    public void alterTableColumnStatistics(
+            ObjectPath tablePath,
+            CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException, 
TablePartitionedException {
+        flink.alterTableColumnStatistics(tablePath, columnStatistics, 
ignoreIfNotExists);
+    }
+
+    @Override
+    public void alterPartitionStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogTableStatistics partitionStatistics,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        flink.alterPartitionStatistics(
+                tablePath, partitionSpec, partitionStatistics, 
ignoreIfNotExists);
+    }
+
+    @Override
+    public void alterPartitionColumnStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        flink.alterPartitionColumnStatistics(
+                tablePath, partitionSpec, columnStatistics, ignoreIfNotExists);
+    }
+
+    @Override
+    public List<CatalogTableStatistics> bulkGetPartitionStatistics(
+            ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+            throws PartitionNotExistException, CatalogException {
+        return flink.bulkGetPartitionStatistics(tablePath, partitionSpecs);
+    }
+
+    @Override
+    public List<CatalogColumnStatistics> bulkGetPartitionColumnStatistics(
+            ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+            throws PartitionNotExistException, CatalogException {
+        return flink.bulkGetPartitionColumnStatistics(tablePath, 
partitionSpecs);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
new file mode 100644
index 000000000..e32997000
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.flink.FlinkCatalogOptions.DEFAULT_DATABASE;
+
+/** Factory for {@link FlinkGenericCatalog}. */
+public class FlinkGenericCatalogFactory implements CatalogFactory {
+
+    public static final String IDENTIFIER = "paimon-generic";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<org.apache.flink.configuration.ConfigOption<?>> 
requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<org.apache.flink.configuration.ConfigOption<?>> 
optionalOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public FlinkGenericCatalog createCatalog(Context context) {
+        CatalogFactory hiveFactory = 
createHiveCatalogFactory(context.getClassLoader());
+        Catalog catalog = hiveFactory.createCatalog(context);
+        return createCatalog(
+                context.getClassLoader(), context.getOptions(), 
context.getName(), catalog);
+    }
+
+    @VisibleForTesting
+    public static FlinkGenericCatalog createCatalog(
+            ClassLoader cl, Map<String, String> optionMap, String name, 
Catalog flinkCatalog) {
+        String warehouse = extractWarehouse(flinkCatalog);
+        Options options = Options.fromMap(optionMap);
+        options.set(CatalogOptions.WAREHOUSE, warehouse);
+        options.set(CatalogOptions.METASTORE, "hive");
+        FlinkCatalog paimon =
+                new FlinkCatalog(
+                        org.apache.paimon.catalog.CatalogFactory.createCatalog(
+                                CatalogContext.create(options, new 
FlinkFileIOLoader()), cl),
+                        name,
+                        options.get(DEFAULT_DATABASE),
+                        cl,
+                        options);
+
+        return new FlinkGenericCatalog(paimon, flinkCatalog);
+    }
+
+    private static CatalogFactory createHiveCatalogFactory(ClassLoader cl) {
+        return FactoryUtil.discoverFactory(cl, CatalogFactory.class, "hive");
+    }
+
+    private static String extractWarehouse(Catalog catalog) {
+        try {
+            Field field = catalog.getClass().getDeclaredField("hiveConf");
+            field.setAccessible(true);
+            Object hiveConf = field.get(catalog);
+
+            Method method = hiveConf.getClass().getMethod("get", String.class);
+            return (String) method.invoke(hiveConf, 
"hive.metastore.warehouse.dir");
+        } catch (NoSuchFieldException
+                | IllegalAccessException
+                | NoSuchMethodException
+                | InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericTableFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericTableFactory.java
new file mode 100644
index 000000000..2d25a3e3c
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericTableFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Set;
+
+/** A table factory to wrap paimon and flink factories. */
+public class FlinkGenericTableFactory
+        implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+    private final Factory paimon;
+    private final Factory flink;
+
+    public FlinkGenericTableFactory(Factory paimon, Factory flink) {
+        this.paimon = paimon;
+        this.flink = flink;
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        throw new UnsupportedOperationException("Generic factory is only work 
for catalog.");
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        throw new UnsupportedOperationException("Generic factory is only work 
for catalog.");
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        throw new UnsupportedOperationException("Generic factory is only work 
for catalog.");
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        boolean isFlink =
+                
context.getCatalogTable().getOptions().containsKey(FactoryUtil.CONNECTOR.key());
+        if (isFlink) {
+            return ((DynamicTableSinkFactory) 
flink).createDynamicTableSink(context);
+        } else {
+            return ((DynamicTableSinkFactory) 
paimon).createDynamicTableSink(context);
+        }
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        boolean isFlink =
+                
context.getCatalogTable().getOptions().containsKey(FactoryUtil.CONNECTOR.key());
+        if (isFlink) {
+            return ((DynamicTableSourceFactory) 
flink).createDynamicTableSource(context);
+        } else {
+            return ((DynamicTableSourceFactory) 
paimon).createDynamicTableSource(context);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index a981b82fb..8936b5fbf 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -15,3 +15,4 @@
 
 org.apache.paimon.flink.FlinkTableFactory
 org.apache.paimon.flink.FlinkCatalogFactory
+org.apache.paimon.flink.FlinkGenericCatalogFactory
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 16b2b5f17..2bd0efa21 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -212,12 +212,7 @@ public class FlinkCatalogTest {
 
         assertThatThrownBy(() -> catalog.createTable(this.path1, newTable, 
false))
                 .isInstanceOf(CatalogException.class)
-                .hasMessageContaining(
-                        "Paimon Catalog only supports paimon tables ,"
-                                + " and you don't need to specify  
'connector'= '"
-                                + FlinkCatalogFactory.IDENTIFIER
-                                + "' when using Paimon Catalog\n"
-                                + " You can create TEMPORARY table instead if 
you want to create the table of other connector.");
+                .hasMessageContaining("Paimon Catalog only supports paimon 
tables");
     }
 
     @ParameterizedTest
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml 
b/paimon-hive/paimon-hive-connector-common/pom.xml
index 8280f37d2..baba58dae 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -107,6 +107,13 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-catalog</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
@@ -114,6 +121,13 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
+            <version>${test.flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-runtime</artifactId>
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
new file mode 100644
index 000000000..9f319eae2
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.flink.FlinkGenericCatalog;
+import org.apache.paimon.flink.FlinkGenericCatalogFactory;
+import org.apache.paimon.hive.annotation.Minio;
+import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
+import org.apache.paimon.s3.MinioTestContainer;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.util.HashMap;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for using Flink {@code FlinkGenericCatalog}. */
+@RunWith(PaimonEmbeddedHiveRunner.class)
+public class FlinkGenericCatalogITCase extends AbstractTestBase {
+
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+    protected TableEnvironment tEnv;
+
+    @HiveSQL(files = {})
+    protected static HiveShell hiveShell;
+
+    @Minio private static MinioTestContainer minioTestContainer;
+
+    private static HiveCatalog createHiveCatalog(HiveConf hiveConf) {
+        return new HiveCatalog(
+                "testcatalog", null, hiveConf, 
HiveShimLoader.getHiveVersion(), true);
+    }
+
+    @Before
+    public void before() throws Exception {
+        hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
+        hiveShell.execute("USE test_db");
+        hiveShell.execute("CREATE TABLE hive_table ( a INT, b STRING )");
+        hiveShell.execute("INSERT INTO hive_table VALUES (100, 'Hive'), (200, 
'Table')");
+        hiveShell.executeQuery("SHOW TABLES");
+
+        tEnv = 
TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        HiveCatalog hiveCatalog = createHiveCatalog(hiveShell.getHiveConf());
+        FlinkGenericCatalog catalog =
+                FlinkGenericCatalogFactory.createCatalog(
+                        this.getClass().getClassLoader(),
+                        new HashMap<>(),
+                        hiveCatalog.getName(),
+                        hiveCatalog);
+        catalog.open();
+        tEnv.registerCatalog(hiveCatalog.getName(), catalog);
+        sql("USE CATALOG " + hiveCatalog.getName());
+        sql("USE test_db");
+    }
+
+    @After
+    public void after() {
+        hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE");
+        hiveShell.execute("DROP DATABASE IF EXISTS test_db2 CASCADE");
+    }
+
+    protected List<Row> sql(String query, Object... args) {
+        try (CloseableIterator<Row> iter = 
tEnv.executeSql(String.format(query, args)).collect()) {
+            return ImmutableList.copyOf(iter);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testPaimonTableToBlackHole() {
+        sql(
+                "CREATE TABLE paimon_t ( "
+                        + "f0 INT, "
+                        + "f1 INT "
+                        + ") WITH ('connector'='paimon', 'file.format' = 
'avro' )");
+        sql("INSERT INTO paimon_t VALUES (1, 1), (2, 2)");
+        assertThat(sql("SELECT * FROM paimon_t"))
+                .containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2));
+
+        sql("CREATE TABLE bh (f0 INT, f1 INT) WITH ('connector'='blackhole')");
+        sql("INSERT INTO bh SELECT * FROM paimon_t");
+    }
+}


Reply via email to