This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 104209558a30dc12d300bc23009cf59be105671c
Author: yuxiqian <[email protected]>
AuthorDate: Mon Aug 12 14:10:35 2024 +0800

    [FLINK-36034][cdc-runtime] Get rid of Flink table planner dependency in cdc runtime module
    
    This closes #3513.
---
 flink-cdc-runtime/pom.xml                          | 28 --------------
 .../runtime/functions/BuiltInScalarFunction.java   | 43 +---------------------
 .../runtime/operators/schema/SchemaEvolveTest.java |  2 +-
 3 files changed, 3 insertions(+), 70 deletions(-)

diff --git a/flink-cdc-runtime/pom.xml b/flink-cdc-runtime/pom.xml
index e278fd669..deb297509 100644
--- a/flink-cdc-runtime/pom.xml
+++ b/flink-cdc-runtime/pom.xml
@@ -44,34 +44,6 @@ limitations under the License.
             <groupId>org.apache.calcite</groupId>
             <artifactId>calcite-core</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_2.12</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <artifactId>value</artifactId>
-                    <groupId>org.immutables</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>value-annotations</artifactId>
-                    <groupId>org.immutables</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>commons-compiler</artifactId>
-                    <groupId>org.codehaus.janino</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>janino</artifactId>
-                    <groupId>org.codehaus.janino</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>flink-scala_2.12</artifactId>
-                    <groupId>org.apache.flink</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-cdc-common</artifactId>
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java
index d1052cf39..80141a13e 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java
@@ -18,7 +18,6 @@
 package org.apache.flink.cdc.runtime.functions;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
@@ -31,28 +30,17 @@ import org.apache.calcite.sql.validate.SqlMonotonicity;
 
 import javax.annotation.Nullable;
 
-import java.util.Optional;
 import java.util.function.Function;
 
-import static org.apache.flink.table.functions.BuiltInFunctionDefinition.DEFAULT_VERSION;
-import static org.apache.flink.table.functions.BuiltInFunctionDefinition.qualifyFunctionName;
-import static org.apache.flink.table.functions.BuiltInFunctionDefinition.validateFunction;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
 
 /**
  * This is the case when the operator has a special parsing syntax or uses other Calcite-specific
- * features that are not exposed via {@link BuiltInFunctionDefinition} yet.
- *
- * <p>Note: Try to keep usages of this class to a minimum and use Flink's {@link
- * BuiltInFunctionDefinition} stack instead.
- *
- * <p>For simple functions, use the provided builder. Otherwise, this class can also be extended.
+ * features that are not exposed via {@link SqlFunction} yet.
  */
 @Internal
 public class BuiltInScalarFunction extends SqlFunction {
 
-    private final @Nullable Integer version;
-
     private final boolean isDeterministic;
 
     private final boolean isInternal;
@@ -61,7 +49,6 @@ public class BuiltInScalarFunction extends SqlFunction {
 
     protected BuiltInScalarFunction(
             String name,
-            int version,
             SqlKind kind,
             @Nullable SqlReturnTypeInference returnTypeInference,
             @Nullable SqlOperandTypeInference operandTypeInference,
@@ -77,11 +64,9 @@ public class BuiltInScalarFunction extends SqlFunction {
                 operandTypeInference,
                 operandTypeChecker,
                 checkNotNull(category));
-        this.version = isInternal ? null : version;
         this.isDeterministic = isDeterministic;
         this.isInternal = isInternal;
         this.monotonicity = monotonicity;
-        validateFunction(name, version, isInternal);
     }
 
     protected BuiltInScalarFunction(
@@ -93,7 +78,6 @@ public class BuiltInScalarFunction extends SqlFunction {
             SqlFunctionCategory category) {
         this(
                 name,
-                DEFAULT_VERSION,
                 kind,
                 returnTypeInference,
                 operandTypeInference,
@@ -109,18 +93,6 @@ public class BuiltInScalarFunction extends SqlFunction {
         return new Builder();
     }
 
-    public final Optional<Integer> getVersion() {
-        return Optional.ofNullable(version);
-    }
-
-    public String getQualifiedName() {
-        if (isInternal) {
-            return getName();
-        }
-        assert version != null;
-        return qualifyFunctionName(getName(), version);
-    }
-
     @Override
     public boolean isDeterministic() {
         return isDeterministic;
@@ -144,8 +116,6 @@ public class BuiltInScalarFunction extends SqlFunction {
 
         private String name;
 
-        private int version = DEFAULT_VERSION;
-
         private SqlKind kind = SqlKind.OTHER_FUNCTION;
 
         private SqlReturnTypeInference returnTypeInference;
@@ -163,18 +133,11 @@ public class BuiltInScalarFunction extends SqlFunction {
         private Function<SqlOperatorBinding, SqlMonotonicity> monotonicity =
                 call -> SqlMonotonicity.NOT_MONOTONIC;
 
-        /** @see BuiltInFunctionDefinition.Builder#name(String) */
         public Builder name(String name) {
             this.name = name;
             return this;
         }
 
-        /** @see BuiltInFunctionDefinition.Builder#version(int) */
-        public Builder version(int version) {
-            this.version = version;
-            return this;
-        }
-
         public Builder kind(SqlKind kind) {
             this.kind = kind;
             return this;
@@ -205,7 +168,6 @@ public class BuiltInScalarFunction extends SqlFunction {
             return this;
         }
 
-        /** @see BuiltInFunctionDefinition.Builder#internal() */
         public Builder internal() {
             this.isInternal = true;
             return this;
@@ -224,7 +186,6 @@ public class BuiltInScalarFunction extends SqlFunction {
         public BuiltInScalarFunction build() {
             return new BuiltInScalarFunction(
                     name,
-                    version,
                     kind,
                     returnTypeInference,
                     operandTypeInference,
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
index 26a6276c0..bf61ad260 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.cdc.runtime.operators.schema;
 
-import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableMap;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
@@ -41,6 +40,7 @@ import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
 import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
 
 import org.apache.commons.collections.ListUtils;

Reply via email to