This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 6fc6a5a7366fa3dcd0b9e3071b058e75aff4181d
Author: Kerwin <[email protected]>
AuthorDate: Mon Jan 8 14:36:32 2024 +0800

    [flink] Add paimon procedures in flink generic catalog (#2651)
---
 .../apache/paimon/flink/FlinkGenericCatalog.java   | 24 ++++++++++++++++++++++
 .../paimon/hive/FlinkGenericCatalogITCase.java     |  8 ++++++++
 2 files changed, 32 insertions(+)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
index 1d1b4b898..f6c206a7f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.flink.procedure.ProcedureUtil;
+
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -36,6 +38,7 @@ import 
org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
@@ -46,6 +49,7 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.procedures.Procedure;
 
 import java.util.List;
 import java.util.Optional;
@@ -477,4 +481,24 @@ public class FlinkGenericCatalog extends AbstractCatalog {
             return flink.bulkGetPartitionColumnStatistics(tablePath, 
partitionSpecs);
         }
     }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain 
compatibility with Flink 1.17-.
+     */
+    public List<String> listProcedures(String dbName)
+            throws DatabaseNotExistException, CatalogException {
+        if (paimon.databaseExists(dbName)) {
+            return ProcedureUtil.listProcedures();
+        }
+        return flink.listProcedures(dbName);
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain 
compatibility with Flink 1.17-.
+     */
+    public Procedure getProcedure(ObjectPath procedurePath)
+            throws ProcedureNotExistException, CatalogException {
+        return ProcedureUtil.getProcedure(paimon.catalog(), procedurePath)
+                .orElse(flink.getProcedure(procedurePath));
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
index 10fd291fc..d93a99422 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java
@@ -137,4 +137,12 @@ public class FlinkGenericCatalogITCase extends 
AbstractTestBase {
 
         assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"), 
Row.of(2L, 0L, "APPEND"));
     }
+
+    @Test
+    public void testReadPaimonAllProcedures() {
+        List<Row> result = sql("SHOW PROCEDURES");
+
+        assertThat(result)
+                .contains(Row.of("compact"), Row.of("merge_into"), 
Row.of("migrate_table"));
+    }
 }

Reply via email to