This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 936cb022b [flink] Introduce FlinkGenericCatalog (#1857)
936cb022b is described below
commit 936cb022bc73b2ce1f057e29702fe6f4b1db44e9
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 22 11:38:52 2023 +0800
[flink] Introduce FlinkGenericCatalog (#1857)
---
docs/content/engines/flink.md | 35 ++
docs/content/engines/spark3.md | 3 +
docs/content/how-to/creating-catalogs.md | 4 +
.../java/org/apache/paimon/flink/FlinkCatalog.java | 15 +-
.../apache/paimon/flink/FlinkGenericCatalog.java | 397 +++++++++++++++++++++
.../paimon/flink/FlinkGenericCatalogFactory.java | 105 ++++++
.../paimon/flink/FlinkGenericTableFactory.java | 79 ++++
.../org.apache.flink.table.factories.Factory | 1 +
.../org/apache/paimon/flink/FlinkCatalogTest.java | 7 +-
paimon-hive/paimon-hive-connector-common/pom.xml | 14 +
.../paimon/hive/FlinkGenericCatalogITCase.java | 120 +++++++
11 files changed, 767 insertions(+), 13 deletions(-)
diff --git a/docs/content/engines/flink.md b/docs/content/engines/flink.md
index a3eccfcf7..3cd22e9ba 100644
--- a/docs/content/engines/flink.md
+++ b/docs/content/engines/flink.md
@@ -125,6 +125,10 @@ You can now start Flink SQL client to execute SQL scripts.
**Step 5: Create a Catalog and a Table**
+{{< tabs "Create Flink Catalog" >}}
+
+{{< tab "Catalog" >}}
+
```sql
-- if you're trying out Paimon in a distributed environment,
-- the warehouse path should be set to a shared file system, such as HDFS or
OSS
@@ -142,6 +146,37 @@ CREATE TABLE word_count (
);
```
+{{< /tab >}}
+
+{{< tab "Generic-Catalog" >}}
+
+When using FlinkGenericCatalog, you need to use the Hive metastore. You can then use all the tables from Paimon, Hive, and
+Flink Generic Tables (Kafka and other tables)!
+
+In this mode, you should specify the 'connector' option when creating tables.
+
+```sql
+CREATE CATALOG my_catalog WITH (
+ 'type'='paimon-generic',
+ 'hive-conf-dir'='...',
+ 'hadoop-conf-dir'='...'
+);
+
+USE CATALOG my_catalog;
+
+-- create a word count table
+CREATE TABLE word_count (
+ word STRING PRIMARY KEY NOT ENFORCED,
+ cnt BIGINT
+) WITH (
+ 'connector'='paimon'
+);
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
**Step 6: Write Data**
```sql
diff --git a/docs/content/engines/spark3.md b/docs/content/engines/spark3.md
index 548661330..9273033de 100644
--- a/docs/content/engines/spark3.md
+++ b/docs/content/engines/spark3.md
@@ -120,6 +120,9 @@ can use the `spark_catalog.${database_name}.${table_name}`
to access Spark table
When starting `spark-sql`, use the following command to register Paimon’s
Spark Generic catalog to replace Spark
default catalog `spark_catalog`. (default warehouse is Spark
`spark.sql.warehouse.dir`)
+Currently, `SparkGenericCatalog` is only recommended when using the Hive metastore. Paimon will infer
+the Hive conf from the Spark session, so you only need to configure Spark's Hive conf.
+
```bash
spark-sql ... \
--conf
spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog
diff --git a/docs/content/how-to/creating-catalogs.md
b/docs/content/how-to/creating-catalogs.md
index 196d3d1de..05d36aef8 100644
--- a/docs/content/how-to/creating-catalogs.md
+++ b/docs/content/how-to/creating-catalogs.md
@@ -106,6 +106,8 @@ USE CATALOG my_hive;
You can define any default table options with the prefix `table-default.` for
tables created in the catalog.
+Also, you can create [FlinkGenericCatalog]({{< ref "engines/flink" >}}).
+
{{< /tab >}}
{{< tab "Spark3" >}}
@@ -130,6 +132,8 @@ After `spark-sql` is started, you can switch to the
`default` database of the `p
USE paimon.default;
```
+Also, you can create [SparkGenericCatalog]({{< ref "engines/spark3" >}}).
+
{{< /tab >}}
{{< /tabs >}}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 6554fddeb..23cf7a9ba 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -26,6 +26,7 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.StringUtils;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
@@ -113,8 +114,6 @@ public class FlinkCatalog extends AbstractCatalog {
private final Duration logStoreAutoRegisterTimeout;
- private final Options options;
-
public FlinkCatalog(
Catalog catalog,
String name,
@@ -124,7 +123,6 @@ public class FlinkCatalog extends AbstractCatalog {
super(name, defaultDatabase);
this.catalog = catalog;
this.classLoader = classLoader;
- this.options = options;
this.logStoreAutoRegister = options.get(LOG_SYSTEM_AUTO_REGISTER);
this.logStoreAutoRegisterTimeout = options.get(REGISTER_TIMEOUT);
try {
@@ -255,11 +253,14 @@ public class FlinkCatalog extends AbstractCatalog {
}
CatalogTable catalogTable = (CatalogTable) table;
Map<String, String> options = table.getOptions();
- if (options.containsKey(CONNECTOR.key())) {
+ String connector = options.get(CONNECTOR.key());
+ options.remove(CONNECTOR.key());
+ if (!StringUtils.isNullOrWhitespaceOnly(connector)
+ && !FlinkCatalogFactory.IDENTIFIER.equals(connector)) {
throw new CatalogException(
- "Paimon Catalog only supports paimon tables ,"
- + " and you don't need to specify 'connector'= '"
- + FlinkCatalogFactory.IDENTIFIER
+ "Paimon Catalog only supports paimon tables,"
+ + " but you specify 'connector'= '"
+ + connector
+ "' when using Paimon Catalog\n"
+ " You can create TEMPORARY table instead if you
want to create the table of other connector.");
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
new file mode 100644
index 000000000..2a3962223
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
+import org.apache.flink.table.factories.TableFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** A Flink catalog that can also load non-Paimon tables. */
+public class FlinkGenericCatalog extends AbstractCatalog {
+
+ private final FlinkCatalog paimon;
+ private final Catalog flink;
+
+ public FlinkGenericCatalog(FlinkCatalog paimon, Catalog flink) {
+ super(paimon.getName(), paimon.getDefaultDatabase());
+ this.paimon = paimon;
+ this.flink = flink;
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ paimon.open();
+ flink.open();
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ paimon.close();
+ flink.close();
+ }
+
+ @Override
+ public Optional<Factory> getFactory() {
+ return Optional.of(
+ new FlinkGenericTableFactory(paimon.getFactory().get(),
flink.getFactory().get()));
+ }
+
+ @Override
+ public Optional<TableFactory> getTableFactory() {
+ return flink.getTableFactory();
+ }
+
+ @Override
+ public Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
+ return flink.getFunctionDefinitionFactory();
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return flink.listDatabases();
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ return flink.getDatabase(databaseName);
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ return flink.databaseExists(databaseName);
+ }
+
+ @Override
+ public void createDatabase(String name, CatalogDatabase database, boolean
ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ flink.createDatabase(name, database, ignoreIfExists);
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean
cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException,
CatalogException {
+ flink.dropDatabase(name, ignoreIfNotExists, cascade);
+ }
+
+ @Override
+ public void alterDatabase(String name, CatalogDatabase newDatabase,
boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ flink.alterDatabase(name, newDatabase, ignoreIfNotExists);
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ return flink.listTables(databaseName);
+ }
+
+ @Override
+ public List<String> listViews(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ return flink.listViews(databaseName);
+ }
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ try {
+ return paimon.getTable(tablePath);
+ } catch (TableNotExistException e) {
+ return flink.getTable(tablePath);
+ }
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ return flink.tableExists(tablePath);
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try {
+ paimon.dropTable(tablePath, false);
+ } catch (TableNotExistException e) {
+ flink.dropTable(tablePath, ignoreIfNotExists);
+ }
+ }
+
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean
ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException,
CatalogException {
+ try {
+ paimon.renameTable(tablePath, newTableName, false);
+ } catch (TableNotExistException e) {
+ flink.renameTable(tablePath, newTableName, ignoreIfNotExists);
+ }
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table,
boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ String connector = table.getOptions().get(CONNECTOR.key());
+ if (FlinkCatalogFactory.IDENTIFIER.equals(connector)) {
+ paimon.createTable(tablePath, table, ignoreIfExists);
+ } else {
+ flink.createTable(tablePath, table, ignoreIfExists);
+ }
+ }
+
+ private boolean isPaimonTable(ObjectPath tablePath) {
+ try {
+ paimon.getTable(tablePath);
+ return true;
+ } catch (TableNotExistException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public void alterTable(
+ ObjectPath tablePath, CatalogBaseTable newTable, boolean
ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ if (isPaimonTable(tablePath)) {
+ paimon.alterTable(tablePath, newTable, ignoreIfNotExists);
+ } else {
+ flink.alterTable(tablePath, newTable, ignoreIfNotExists);
+ }
+ }
+
+ @Override
+ public void alterTable(
+ ObjectPath tablePath,
+ CatalogBaseTable newTable,
+ List<TableChange> tableChanges,
+ boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ if (isPaimonTable(tablePath)) {
+ paimon.alterTable(tablePath, newTable, tableChanges,
ignoreIfNotExists);
+ } else {
+ flink.alterTable(tablePath, newTable, tableChanges,
ignoreIfNotExists);
+ }
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+ throws TableNotExistException, TableNotPartitionedException,
CatalogException {
+ return flink.listPartitions(tablePath);
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws TableNotExistException, TableNotPartitionedException,
+ PartitionSpecInvalidException, CatalogException {
+ return flink.listPartitions(tablePath, partitionSpec);
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitionsByFilter(
+ ObjectPath tablePath, List<Expression> filters)
+ throws TableNotExistException, TableNotPartitionedException,
CatalogException {
+ return flink.listPartitionsByFilter(tablePath, filters);
+ }
+
+ @Override
+ public CatalogPartition getPartition(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ return flink.getPartition(tablePath, partitionSpec);
+ }
+
+ @Override
+ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec)
+ throws CatalogException {
+ return flink.partitionExists(tablePath, partitionSpec);
+ }
+
+ @Override
+ public void createPartition(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogPartition partition,
+ boolean ignoreIfExists)
+ throws TableNotExistException, TableNotPartitionedException,
+ PartitionSpecInvalidException,
PartitionAlreadyExistsException,
+ CatalogException {
+ flink.createPartition(tablePath, partitionSpec, partition,
ignoreIfExists);
+ }
+
+ @Override
+ public void dropPartition(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean
ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ flink.dropPartition(tablePath, partitionSpec, ignoreIfNotExists);
+ }
+
+ @Override
+ public void alterPartition(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogPartition newPartition,
+ boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ flink.alterPartition(tablePath, partitionSpec, newPartition,
ignoreIfNotExists);
+ }
+
+ @Override
+ public List<String> listFunctions(String dbName)
+ throws DatabaseNotExistException, CatalogException {
+ return flink.listFunctions(dbName);
+ }
+
+ @Override
+ public CatalogFunction getFunction(ObjectPath functionPath)
+ throws FunctionNotExistException, CatalogException {
+ return flink.getFunction(functionPath);
+ }
+
+ @Override
+ public boolean functionExists(ObjectPath functionPath) throws
CatalogException {
+ return flink.functionExists(functionPath);
+ }
+
+ @Override
+ public void createFunction(
+ ObjectPath functionPath, CatalogFunction function, boolean
ignoreIfExists)
+ throws FunctionAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ flink.createFunction(functionPath, function, ignoreIfExists);
+ }
+
+ @Override
+ public void alterFunction(
+ ObjectPath functionPath, CatalogFunction newFunction, boolean
ignoreIfNotExists)
+ throws FunctionNotExistException, CatalogException {
+ flink.alterFunction(functionPath, newFunction, ignoreIfNotExists);
+ }
+
+ @Override
+ public void dropFunction(ObjectPath functionPath, boolean
ignoreIfNotExists)
+ throws FunctionNotExistException, CatalogException {
+ flink.dropFunction(functionPath, ignoreIfNotExists);
+ }
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ return flink.getTableStatistics(tablePath);
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath
tablePath)
+ throws TableNotExistException, CatalogException {
+ return flink.getTableColumnStatistics(tablePath);
+ }
+
+ @Override
+ public CatalogTableStatistics getPartitionStatistics(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ return flink.getPartitionStatistics(tablePath, partitionSpec);
+ }
+
+ @Override
+ public CatalogColumnStatistics getPartitionColumnStatistics(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ return flink.getPartitionColumnStatistics(tablePath, partitionSpec);
+ }
+
+ @Override
+ public void alterTableStatistics(
+ ObjectPath tablePath, CatalogTableStatistics tableStatistics,
boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ flink.alterTableStatistics(tablePath, tableStatistics,
ignoreIfNotExists);
+ }
+
+ @Override
+ public void alterTableColumnStatistics(
+ ObjectPath tablePath,
+ CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException,
TablePartitionedException {
+ flink.alterTableColumnStatistics(tablePath, columnStatistics,
ignoreIfNotExists);
+ }
+
+ @Override
+ public void alterPartitionStatistics(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogTableStatistics partitionStatistics,
+ boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ flink.alterPartitionStatistics(
+ tablePath, partitionSpec, partitionStatistics,
ignoreIfNotExists);
+ }
+
+ @Override
+ public void alterPartitionColumnStatistics(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ flink.alterPartitionColumnStatistics(
+ tablePath, partitionSpec, columnStatistics, ignoreIfNotExists);
+ }
+
+ @Override
+ public List<CatalogTableStatistics> bulkGetPartitionStatistics(
+ ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+ throws PartitionNotExistException, CatalogException {
+ return flink.bulkGetPartitionStatistics(tablePath, partitionSpecs);
+ }
+
+ @Override
+ public List<CatalogColumnStatistics> bulkGetPartitionColumnStatistics(
+ ObjectPath tablePath, List<CatalogPartitionSpec> partitionSpecs)
+ throws PartitionNotExistException, CatalogException {
+ return flink.bulkGetPartitionColumnStatistics(tablePath,
partitionSpecs);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
new file mode 100644
index 000000000..e32997000
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.flink.FlinkCatalogOptions.DEFAULT_DATABASE;
+
+/**
+ * Factory for {@link FlinkGenericCatalog}.
+ *
+ * <p>The Flink side of the generic catalog is created by the catalog factory discovered under
+ * the identifier {@code "hive"}; the Paimon side is created with the {@code "hive"} metastore
+ * and the warehouse directory read from that catalog's Hive configuration.
+ */
+public class FlinkGenericCatalogFactory implements CatalogFactory {
+
+    public static final String IDENTIFIER = "paimon-generic";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<org.apache.flink.configuration.ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<org.apache.flink.configuration.ConfigOption<?>> optionalOptions() {
+        return Collections.emptySet();
+    }
+
+    /**
+     * Creates the wrapped catalog through the discovered {@code "hive"} catalog factory and
+     * builds a {@link FlinkGenericCatalog} on top of it, using the context's class loader,
+     * options and name.
+     */
+    @Override
+    public FlinkGenericCatalog createCatalog(Context context) {
+        CatalogFactory hiveFactory = createHiveCatalogFactory(context.getClassLoader());
+        Catalog catalog = hiveFactory.createCatalog(context);
+        return createCatalog(
+                context.getClassLoader(), context.getOptions(), context.getName(), catalog);
+    }
+
+    /**
+     * Builds a {@link FlinkGenericCatalog} around an already created Flink catalog.
+     *
+     * @param cl class loader passed to the Paimon catalog
+     * @param optionMap catalog options; the warehouse and metastore options are set on top of
+     *     them
+     * @param name catalog name
+     * @param flinkCatalog catalog to wrap; it must expose a {@code hiveConf} field from which
+     *     {@code hive.metastore.warehouse.dir} is read
+     */
+    @VisibleForTesting
+    public static FlinkGenericCatalog createCatalog(
+            ClassLoader cl, Map<String, String> optionMap, String name, Catalog flinkCatalog) {
+        String warehouse = extractWarehouse(flinkCatalog);
+        Options options = Options.fromMap(optionMap);
+        options.set(CatalogOptions.WAREHOUSE, warehouse);
+        options.set(CatalogOptions.METASTORE, "hive");
+        FlinkCatalog paimon =
+                new FlinkCatalog(
+                        org.apache.paimon.catalog.CatalogFactory.createCatalog(
+                                CatalogContext.create(options, new FlinkFileIOLoader()), cl),
+                        name,
+                        options.get(DEFAULT_DATABASE),
+                        cl,
+                        options);
+
+        return new FlinkGenericCatalog(paimon, flinkCatalog);
+    }
+
+    /** Discovers the Flink catalog factory registered under the identifier {@code "hive"}. */
+    private static CatalogFactory createHiveCatalogFactory(ClassLoader cl) {
+        return FactoryUtil.discoverFactory(cl, CatalogFactory.class, "hive");
+    }
+
+    /**
+     * Reads {@code hive.metastore.warehouse.dir} reflectively from the catalog's
+     * {@code hiveConf} field, so that this module needs no compile-time Hive dependency.
+     *
+     * @throws RuntimeException if the field or its {@code get(String)} method cannot be accessed
+     */
+    private static String extractWarehouse(Catalog catalog) {
+        try {
+            Field field = findField(catalog.getClass(), "hiveConf");
+            field.setAccessible(true);
+            Object hiveConf = field.get(catalog);
+
+            Method method = hiveConf.getClass().getMethod("get", String.class);
+            return (String) method.invoke(hiveConf, "hive.metastore.warehouse.dir");
+        } catch (ReflectiveOperationException e) {
+            throw new RuntimeException(
+                    "Failed to extract 'hive.metastore.warehouse.dir' from the 'hiveConf' field"
+                            + " of catalog "
+                            + catalog.getClass().getName(),
+                    e);
+        }
+    }
+
+    /**
+     * Finds a declared field by name on the class or any of its superclasses.
+     *
+     * <p>{@code getDeclaredField} only inspects the class it is called on, so the hierarchy is
+     * walked to also support subclasses of the class that declares the field.
+     */
+    private static Field findField(Class<?> clazz, String name) throws NoSuchFieldException {
+        for (Class<?> current = clazz; current != null; current = current.getSuperclass()) {
+            try {
+                return current.getDeclaredField(name);
+            } catch (NoSuchFieldException ignored) {
+                // not declared on this level, continue with the superclass
+            }
+        }
+        throw new NoSuchFieldException(name);
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericTableFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericTableFactory.java
new file mode 100644
index 000000000..2d25a3e3c
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericTableFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Set;
+
+/** A table factory to wrap paimon and flink factories. */
+public class FlinkGenericTableFactory
+ implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+ private final Factory paimon;
+ private final Factory flink;
+
+ public FlinkGenericTableFactory(Factory paimon, Factory flink) {
+ this.paimon = paimon;
+ this.flink = flink;
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ throw new UnsupportedOperationException("Generic factory is only work
for catalog.");
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ throw new UnsupportedOperationException("Generic factory is only work
for catalog.");
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ throw new UnsupportedOperationException("Generic factory is only work
for catalog.");
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ boolean isFlink =
+
context.getCatalogTable().getOptions().containsKey(FactoryUtil.CONNECTOR.key());
+ if (isFlink) {
+ return ((DynamicTableSinkFactory)
flink).createDynamicTableSink(context);
+ } else {
+ return ((DynamicTableSinkFactory)
paimon).createDynamicTableSink(context);
+ }
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ boolean isFlink =
+
context.getCatalogTable().getOptions().containsKey(FactoryUtil.CONNECTOR.key());
+ if (isFlink) {
+ return ((DynamicTableSourceFactory)
flink).createDynamicTableSource(context);
+ } else {
+ return ((DynamicTableSourceFactory)
paimon).createDynamicTableSource(context);
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index a981b82fb..8936b5fbf 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -15,3 +15,4 @@
org.apache.paimon.flink.FlinkTableFactory
org.apache.paimon.flink.FlinkCatalogFactory
+org.apache.paimon.flink.FlinkGenericCatalogFactory
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 16b2b5f17..2bd0efa21 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -212,12 +212,7 @@ public class FlinkCatalogTest {
assertThatThrownBy(() -> catalog.createTable(this.path1, newTable,
false))
.isInstanceOf(CatalogException.class)
- .hasMessageContaining(
- "Paimon Catalog only supports paimon tables ,"
- + " and you don't need to specify
'connector'= '"
- + FlinkCatalogFactory.IDENTIFIER
- + "' when using Paimon Catalog\n"
- + " You can create TEMPORARY table instead if
you want to create the table of other connector.");
+ .hasMessageContaining("Paimon Catalog only supports paimon
tables");
}
@ParameterizedTest
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml
b/paimon-hive/paimon-hive-connector-common/pom.xml
index 8280f37d2..baba58dae 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -107,6 +107,13 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-hive-catalog</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
@@ -114,6 +121,13 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
+ <version>${test.flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
new file mode 100644
index 000000000..9f319eae2
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.flink.FlinkGenericCatalog;
+import org.apache.paimon.flink.FlinkGenericCatalogFactory;
+import org.apache.paimon.hive.annotation.Minio;
+import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
+import org.apache.paimon.s3.MinioTestContainer;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.util.HashMap;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for using Flink {@code FlinkGenericCatalog}. */
+@RunWith(PaimonEmbeddedHiveRunner.class)
+public class FlinkGenericCatalogITCase extends AbstractTestBase {
+
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+    protected TableEnvironment tEnv;
+
+    @HiveSQL(files = {})
+    protected static HiveShell hiveShell;
+
+    @Minio private static MinioTestContainer minioTestContainer;
+
+    /** Builds the Hive catalog that the generic catalog under test wraps. */
+    private static HiveCatalog createHiveCatalog(HiveConf hiveConf) {
+        String hiveVersion = HiveShimLoader.getHiveVersion();
+        return new HiveCatalog("testcatalog", null, hiveConf, hiveVersion, true);
+    }
+
+    /** Prepares a Hive database with one table and registers the generic catalog in Flink. */
+    @Before
+    public void before() throws Exception {
+        hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
+        hiveShell.execute("USE test_db");
+        hiveShell.execute("CREATE TABLE hive_table ( a INT, b STRING )");
+        hiveShell.execute("INSERT INTO hive_table VALUES (100, 'Hive'), (200, 'Table')");
+        hiveShell.executeQuery("SHOW TABLES");
+
+        EnvironmentSettings batchSettings = EnvironmentSettings.newInstance().inBatchMode().build();
+        tEnv = TableEnvironmentImpl.create(batchSettings);
+
+        HiveCatalog hiveCatalog = createHiveCatalog(hiveShell.getHiveConf());
+        ClassLoader classLoader = this.getClass().getClassLoader();
+        FlinkGenericCatalog genericCatalog =
+                FlinkGenericCatalogFactory.createCatalog(
+                        classLoader, new HashMap<>(), hiveCatalog.getName(), hiveCatalog);
+        genericCatalog.open();
+
+        tEnv.registerCatalog(hiveCatalog.getName(), genericCatalog);
+        sql("USE CATALOG " + hiveCatalog.getName());
+        sql("USE test_db");
+    }
+
+    /** Drops the databases used by the tests. */
+    @After
+    public void after() {
+        hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE");
+        hiveShell.execute("DROP DATABASE IF EXISTS test_db2 CASCADE");
+    }
+
+    /** Formats and executes a statement, returning all collected rows. */
+    protected List<Row> sql(String query, Object... args) {
+        String statement = String.format(query, args);
+        try (CloseableIterator<Row> rows = tEnv.executeSql(statement).collect()) {
+            return ImmutableList.copyOf(rows);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Writes to and reads from a Paimon table, then copies it into a blackhole table. */
+    @Test
+    public void testPaimonTableToBlackHole() {
+        String createPaimonTable =
+                "CREATE TABLE paimon_t ( "
+                        + "f0 INT, "
+                        + "f1 INT "
+                        + ") WITH ('connector'='paimon', 'file.format' = 'avro' )";
+        sql(createPaimonTable);
+        sql("INSERT INTO paimon_t VALUES (1, 1), (2, 2)");
+
+        List<Row> paimonRows = sql("SELECT * FROM paimon_t");
+        assertThat(paimonRows).containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2));
+
+        sql("CREATE TABLE bh (f0 INT, f1 INT) WITH ('connector'='blackhole')");
+        sql("INSERT INTO bh SELECT * FROM paimon_t");
+    }
+}