This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c73bac333 [flink] Initialize Flink 1.18 to support Call Procedure 
(#1992)
c73bac333 is described below

commit c73bac333bd4fc2ecad2b01c1cf6791f911d404f
Author: yuzelin <[email protected]>
AuthorDate: Tue Sep 12 22:44:21 2023 +0800

    [flink] Initialize Flink 1.18 to support Call Procedure (#1992)
---
 .github/workflows/utitcase-flink.yml               |  2 +-
 .../generated/flink_connector_configuration.html   |  2 +-
 paimon-flink/paimon-flink-common/pom.xml           |  2 +-
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 26 +++++++++++++
 .../apache/paimon/flink/FlinkConnectorOptions.java |  5 ++-
 .../paimon/flink/procedure/ProcedureUtil.java      | 45 ++++++++++++++++++++++
 .../paimon/flink/ContinuousFileStoreITCase.java    | 39 ++++++++++++++++++-
 7 files changed, 115 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/utitcase-flink.yml 
b/.github/workflows/utitcase-flink.yml
index c346f6fe4..87e767a28 100644
--- a/.github/workflows/utitcase-flink.yml
+++ b/.github/workflows/utitcase-flink.yml
@@ -44,7 +44,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build and Test
-        timeout-minutes: 80
+        timeout-minutes: 60
         run: |
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 876735367..455c84b62 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -78,7 +78,7 @@ under the License.
             <td><h5>scan.push-down</h5></td>
             <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
-            <td>If true, flink will push down projection, filters, limit to 
the source. The cost is that it is difficult to reuse the source in a job.</td>
+            <td>If true, flink will push down projection, filters, limit to 
the source. The cost is that it is difficult to reuse the source in a job. With 
flink 1.18 or higher version, it is possible to reuse the source even with 
projection push down.</td>
         </tr>
         <tr>
             <td><h5>scan.remove-normalize</h5></td>
diff --git a/paimon-flink/paimon-flink-common/pom.xml 
b/paimon-flink/paimon-flink-common/pom.xml
index 87b985005..8e6f0e840 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -34,7 +34,7 @@ under the License.
     <name>Paimon : Flink : Common</name>
 
     <properties>
-        <flink.version>1.17.1</flink.version>
+        <flink.version>1.18-SNAPSHOT</flink.version>
         <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
     </properties>
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index c52b75984..3aed103eb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.procedure.ProcedureUtil;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -62,6 +63,7 @@ import 
org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
@@ -69,6 +71,7 @@ import 
org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.procedures.Procedure;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -111,6 +114,7 @@ public class FlinkCatalog extends AbstractCatalog {
     private final ClassLoader classLoader;
 
     private final Catalog catalog;
+    private final String name;
     private final boolean logStoreAutoRegister;
 
     private final Duration logStoreAutoRegisterTimeout;
@@ -125,6 +129,7 @@ public class FlinkCatalog extends AbstractCatalog {
             Options options) {
         super(name, defaultDatabase);
         this.catalog = catalog;
+        this.name = name;
         this.classLoader = classLoader;
         this.logStoreAutoRegister = options.get(LOG_SYSTEM_AUTO_REGISTER);
         this.logStoreAutoRegisterTimeout = options.get(REGISTER_TIMEOUT);
@@ -866,4 +871,25 @@ public class FlinkCatalog extends AbstractCatalog {
             throws CatalogException {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain 
compatibility with Flink 1.17 and earlier.
+     */
+    public List<String> listProcedures(String dbName)
+            throws DatabaseNotExistException, CatalogException {
+        if (!databaseExists(dbName)) {
+            throw new DatabaseNotExistException(name, dbName);
+        }
+
+        return ProcedureUtil.listProcedures();
+    }
+
+    /**
+     * Do not annotate with <code>@Override</code> here to maintain 
compatibility with Flink 1.17 and earlier.
+     */
+    public Procedure getProcedure(ObjectPath procedurePath)
+            throws ProcedureNotExistException, CatalogException {
+        return ProcedureUtil.getProcedure(procedurePath.getObjectName())
+                .orElseThrow(() -> new ProcedureNotExistException(name, 
procedurePath));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index e8f46f440..482696c59 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -230,8 +230,9 @@ public class FlinkConnectorOptions {
                     .booleanType()
                     .defaultValue(true)
                     .withDescription(
-                            "If true, flink will push down projection, 
filters, limit to the source. "
-                                    + "The cost is that it is difficult to 
reuse the source in a job.");
+                            "If true, flink will push down projection, 
filters, limit to the source. The cost is that it "
+                                    + "is difficult to reuse the source in a 
job. With flink 1.18 or higher version, it "
+                                    + "is possible to reuse the source even 
with projection push down.");
 
     public static final ConfigOption<Boolean> SOURCE_CHECKPOINT_ALIGN_ENABLED =
             ConfigOptions.key("source.checkpoint-align.enabled")
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
new file mode 100644
index 000000000..8e9a02435
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.flink.table.procedures.Procedure;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Utility methods for {@link Procedure}. */
+public class ProcedureUtil {
+
+    private ProcedureUtil() {}
+
+    private static final List<String> SYSTEM_PROCEDURES = new ArrayList<>();
+    private static final Map<String, Procedure> SYSTEM_PROCEDURES_MAP = new 
HashMap<>();
+
+    public static List<String> listProcedures() {
+        return Collections.unmodifiableList(SYSTEM_PROCEDURES);
+    }
+
+    public static Optional<Procedure> getProcedure(String procedureName) {
+        return Optional.ofNullable(SYSTEM_PROCEDURES_MAP.get(procedureName));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 2003d7c59..fb590f539 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -74,7 +74,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @TestTemplate
-    public void testSourceReuse() {
+    public void testSourceReuseWithoutScanPushDown() {
         sEnv.executeSql("CREATE TEMPORARY TABLE print1 (a STRING) WITH 
('connector'='print')");
         sEnv.executeSql("CREATE TEMPORARY TABLE print2 (b STRING) WITH 
('connector'='print')");
 
@@ -86,8 +86,45 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
         assertThat(statementSet.compilePlan().explain()).contains("Reused");
 
         statementSet = sEnv.createStatementSet();
+        statementSet.addInsertSql(
+                "INSERT INTO print1 SELECT a FROM T1 /*+ 
OPTIONS('scan.push-down' = 'false') */ WHERE b = 'Apache'");
+        statementSet.addInsertSql(
+                "INSERT INTO print2 SELECT b FROM T1 /*+ 
OPTIONS('scan.push-down' = 'false') */ WHERE a = 'Paimon'");
+        assertThat(statementSet.compilePlan().explain()).contains("Reused");
+
+        statementSet = sEnv.createStatementSet();
+        statementSet.addInsertSql(
+                "INSERT INTO print1 SELECT a FROM T1 /*+ 
OPTIONS('scan.push-down' = 'false') */ WHERE b = 'Apache' LIMIT 5");
+        statementSet.addInsertSql(
+                "INSERT INTO print2 SELECT b FROM T1 /*+ 
OPTIONS('scan.push-down' = 'false') */ WHERE a = 'Paimon' LIMIT 10");
+        assertThat(statementSet.compilePlan().explain()).contains("Reused");
+    }
+
+    @TestTemplate
+    public void testSourceReuseWithScanPushDown() {
+        // source can be reused with projection applied
+        sEnv.executeSql("CREATE TEMPORARY TABLE print1 (a STRING) WITH 
('connector'='print')");
+        sEnv.executeSql("CREATE TEMPORARY TABLE print2 (b STRING) WITH 
('connector'='print')");
+
+        StatementSet statementSet = sEnv.createStatementSet();
         statementSet.addInsertSql("INSERT INTO print1 SELECT a FROM T1");
         statementSet.addInsertSql("INSERT INTO print2 SELECT b FROM T1");
+        assertThat(statementSet.compilePlan().explain()).contains("Reused");
+
+        // source cannot be reused with filter or limit applied
+        sEnv.executeSql(
+                "CREATE TEMPORARY TABLE new_print1 (a STRING, b STRING, c 
STRING) WITH ('connector'='print')");
+        sEnv.executeSql(
+                "CREATE TEMPORARY TABLE new_print2 (a STRING, b STRING, c 
STRING) WITH ('connector'='print')");
+
+        statementSet = sEnv.createStatementSet();
+        statementSet.addInsertSql("INSERT INTO new_print1 SELECT * FROM T1 
WHERE a = 'Apache'");
+        statementSet.addInsertSql("INSERT INTO new_print2 SELECT * FROM T1");
+        
assertThat(statementSet.compilePlan().explain()).doesNotContain("Reused");
+
+        statementSet = sEnv.createStatementSet();
+        statementSet.addInsertSql("INSERT INTO new_print1 SELECT * FROM T1 
LIMIT 5");
+        statementSet.addInsertSql("INSERT INTO new_print2 SELECT * FROM T1");
         
assertThat(statementSet.compilePlan().explain()).doesNotContain("Reused");
     }
 

Reply via email to