This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.6 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 6fc6a5a7366fa3dcd0b9e3071b058e75aff4181d Author: Kerwin <[email protected]> AuthorDate: Mon Jan 8 14:36:32 2024 +0800 [flink] Add paimon procedures in flink generic catalog (#2651) --- .../apache/paimon/flink/FlinkGenericCatalog.java | 24 ++++++++++++++++++++++ .../paimon/hive/FlinkGenericCatalogITCase.java | 8 ++++++++ 2 files changed, 32 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java index 1d1b4b898..f6c206a7f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java @@ -18,6 +18,8 @@ package org.apache.paimon.flink; +import org.apache.paimon.flink.procedure.ProcedureUtil; + import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -36,6 +38,7 @@ import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException; import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; @@ -46,6 +49,7 @@ import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.Factory; import org.apache.flink.table.factories.FunctionDefinitionFactory; import org.apache.flink.table.factories.TableFactory; +import org.apache.flink.table.procedures.Procedure; import java.util.List; import java.util.Optional; @@ -477,4 +481,24 @@ public class FlinkGenericCatalog extends AbstractCatalog { return flink.bulkGetPartitionColumnStatistics(tablePath, partitionSpecs); } } + + /** + * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.17-. + */ + public List<String> listProcedures(String dbName) + throws DatabaseNotExistException, CatalogException { + if (paimon.databaseExists(dbName)) { + return ProcedureUtil.listProcedures(); + } + return flink.listProcedures(dbName); + } + + /** + * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.17-. + */ + public Procedure getProcedure(ObjectPath procedurePath) + throws ProcedureNotExistException, CatalogException { + return ProcedureUtil.getProcedure(paimon.catalog(), procedurePath) + .orElse(flink.getProcedure(procedurePath)); + } } diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java index 10fd291fc..d93a99422 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FlinkGenericCatalogITCase.java @@ -137,4 +137,12 @@ public class FlinkGenericCatalogITCase extends AbstractTestBase { assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND")); } + + @Test + public void testReadPaimonAllProcedures() { + List<Row> result = sql("SHOW PROCEDURES"); + + assertThat(result) + .contains(Row.of("compact"), Row.of("merge_into"), Row.of("migrate_table")); + } }
