This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c73bac333 [flink] Initialize Flink 1.18 to support Call Procedure
(#1992)
c73bac333 is described below
commit c73bac333bd4fc2ecad2b01c1cf6791f911d404f
Author: yuzelin <[email protected]>
AuthorDate: Tue Sep 12 22:44:21 2023 +0800
[flink] Initialize Flink 1.18 to support Call Procedure (#1992)
---
.github/workflows/utitcase-flink.yml | 2 +-
.../generated/flink_connector_configuration.html | 2 +-
paimon-flink/paimon-flink-common/pom.xml | 2 +-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 26 +++++++++++++
.../apache/paimon/flink/FlinkConnectorOptions.java | 5 ++-
.../paimon/flink/procedure/ProcedureUtil.java | 45 ++++++++++++++++++++++
.../paimon/flink/ContinuousFileStoreITCase.java | 39 ++++++++++++++++++-
7 files changed, 115 insertions(+), 6 deletions(-)
diff --git a/.github/workflows/utitcase-flink.yml
b/.github/workflows/utitcase-flink.yml
index c346f6fe4..87e767a28 100644
--- a/.github/workflows/utitcase-flink.yml
+++ b/.github/workflows/utitcase-flink.yml
@@ -44,7 +44,7 @@ jobs:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- name: Build and Test
- timeout-minutes: 80
+ timeout-minutes: 60
run: |
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 876735367..455c84b62 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -78,7 +78,7 @@ under the License.
<td><h5>scan.push-down</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
- <td>If true, flink will push down projection, filters, limit to
the source. The cost is that it is difficult to reuse the source in a job.</td>
+ <td>If true, flink will push down projection, filters, limit to
the source. The cost is that it is difficult to reuse the source in a job. With
flink 1.18 or higher version, it is possible to reuse the source even with
projection push down.</td>
</tr>
<tr>
<td><h5>scan.remove-normalize</h5></td>
diff --git a/paimon-flink/paimon-flink-common/pom.xml
b/paimon-flink/paimon-flink-common/pom.xml
index 87b985005..8e6f0e840 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -34,7 +34,7 @@ under the License.
<name>Paimon : Flink : Common</name>
<properties>
- <flink.version>1.17.1</flink.version>
+ <flink.version>1.18-SNAPSHOT</flink.version>
<frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
</properties>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index c52b75984..3aed103eb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -62,6 +63,7 @@ import
org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
@@ -69,6 +71,7 @@ import
org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.procedures.Procedure;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -111,6 +114,7 @@ public class FlinkCatalog extends AbstractCatalog {
private final ClassLoader classLoader;
private final Catalog catalog;
+ private final String name;
private final boolean logStoreAutoRegister;
private final Duration logStoreAutoRegisterTimeout;
@@ -125,6 +129,7 @@ public class FlinkCatalog extends AbstractCatalog {
Options options) {
super(name, defaultDatabase);
this.catalog = catalog;
+ this.name = name;
this.classLoader = classLoader;
this.logStoreAutoRegister = options.get(LOG_SYSTEM_AUTO_REGISTER);
this.logStoreAutoRegisterTimeout = options.get(REGISTER_TIMEOUT);
@@ -866,4 +871,25 @@ public class FlinkCatalog extends AbstractCatalog {
throws CatalogException {
throw new UnsupportedOperationException();
}
+
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.17 and earlier.
+ */
+ public List<String> listProcedures(String dbName)
+ throws DatabaseNotExistException, CatalogException {
+ if (!databaseExists(dbName)) {
+ throw new DatabaseNotExistException(name, dbName);
+ }
+
+ return ProcedureUtil.listProcedures();
+ }
+
+ /**
+     * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.17 and earlier.
+ */
+ public Procedure getProcedure(ObjectPath procedurePath)
+ throws ProcedureNotExistException, CatalogException {
+ return ProcedureUtil.getProcedure(procedurePath.getObjectName())
+ .orElseThrow(() -> new ProcedureNotExistException(name,
procedurePath));
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index e8f46f440..482696c59 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -230,8 +230,9 @@ public class FlinkConnectorOptions {
.booleanType()
.defaultValue(true)
.withDescription(
- "If true, flink will push down projection,
filters, limit to the source. "
- + "The cost is that it is difficult to
reuse the source in a job.");
+ "If true, flink will push down projection,
filters, limit to the source. The cost is that it "
+ + "is difficult to reuse the source in a
job. With flink 1.18 or higher version, it "
+ + "is possible to reuse the source even
with projection push down.");
public static final ConfigOption<Boolean> SOURCE_CHECKPOINT_ALIGN_ENABLED =
ConfigOptions.key("source.checkpoint-align.enabled")
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
new file mode 100644
index 000000000..8e9a02435
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.flink.table.procedures.Procedure;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Utility methods for {@link Procedure}. */
+public class ProcedureUtil {
+
+ private ProcedureUtil() {}
+
+ private static final List<String> SYSTEM_PROCEDURES = new ArrayList<>();
+ private static final Map<String, Procedure> SYSTEM_PROCEDURES_MAP = new
HashMap<>();
+
+ public static List<String> listProcedures() {
+ return Collections.unmodifiableList(SYSTEM_PROCEDURES);
+ }
+
+ public static Optional<Procedure> getProcedure(String procedureName) {
+ return Optional.ofNullable(SYSTEM_PROCEDURES_MAP.get(procedureName));
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 2003d7c59..fb590f539 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -74,7 +74,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@TestTemplate
- public void testSourceReuse() {
+ public void testSourceReuseWithoutScanPushDown() {
sEnv.executeSql("CREATE TEMPORARY TABLE print1 (a STRING) WITH
('connector'='print')");
sEnv.executeSql("CREATE TEMPORARY TABLE print2 (b STRING) WITH
('connector'='print')");
@@ -86,8 +86,45 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
assertThat(statementSet.compilePlan().explain()).contains("Reused");
statementSet = sEnv.createStatementSet();
+ statementSet.addInsertSql(
+ "INSERT INTO print1 SELECT a FROM T1 /*+
OPTIONS('scan.push-down' = 'false') */ WHERE b = 'Apache'");
+ statementSet.addInsertSql(
+ "INSERT INTO print2 SELECT b FROM T1 /*+
OPTIONS('scan.push-down' = 'false') */ WHERE a = 'Paimon'");
+ assertThat(statementSet.compilePlan().explain()).contains("Reused");
+
+ statementSet = sEnv.createStatementSet();
+ statementSet.addInsertSql(
+ "INSERT INTO print1 SELECT a FROM T1 /*+
OPTIONS('scan.push-down' = 'false') */ WHERE b = 'Apache' LIMIT 5");
+ statementSet.addInsertSql(
+ "INSERT INTO print2 SELECT b FROM T1 /*+
OPTIONS('scan.push-down' = 'false') */ WHERE a = 'Paimon' LIMIT 10");
+ assertThat(statementSet.compilePlan().explain()).contains("Reused");
+ }
+
+ @TestTemplate
+ public void testSourceReuseWithScanPushDown() {
+ // source can be reused with projection applied
+ sEnv.executeSql("CREATE TEMPORARY TABLE print1 (a STRING) WITH
('connector'='print')");
+ sEnv.executeSql("CREATE TEMPORARY TABLE print2 (b STRING) WITH
('connector'='print')");
+
+ StatementSet statementSet = sEnv.createStatementSet();
statementSet.addInsertSql("INSERT INTO print1 SELECT a FROM T1");
statementSet.addInsertSql("INSERT INTO print2 SELECT b FROM T1");
+ assertThat(statementSet.compilePlan().explain()).contains("Reused");
+
+ // source cannot be reused with filter or limit applied
+ sEnv.executeSql(
+ "CREATE TEMPORARY TABLE new_print1 (a STRING, b STRING, c
STRING) WITH ('connector'='print')");
+ sEnv.executeSql(
+ "CREATE TEMPORARY TABLE new_print2 (a STRING, b STRING, c
STRING) WITH ('connector'='print')");
+
+ statementSet = sEnv.createStatementSet();
+ statementSet.addInsertSql("INSERT INTO new_print1 SELECT * FROM T1
WHERE a = 'Apache'");
+ statementSet.addInsertSql("INSERT INTO new_print2 SELECT * FROM T1");
+
assertThat(statementSet.compilePlan().explain()).doesNotContain("Reused");
+
+ statementSet = sEnv.createStatementSet();
+ statementSet.addInsertSql("INSERT INTO new_print1 SELECT * FROM T1
LIMIT 5");
+ statementSet.addInsertSql("INSERT INTO new_print2 SELECT * FROM T1");
assertThat(statementSet.compilePlan().explain()).doesNotContain("Reused");
}