This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 104209558a30dc12d300bc23009cf59be105671c Author: yuxiqian <[email protected]> AuthorDate: Mon Aug 12 14:10:35 2024 +0800 [FLINK-36034][cdc-runtime] Get rid of Flink table planner dependency in cdc runtime module This closes #3513. --- flink-cdc-runtime/pom.xml | 28 -------------- .../runtime/functions/BuiltInScalarFunction.java | 43 +--------------------- .../runtime/operators/schema/SchemaEvolveTest.java | 2 +- 3 files changed, 3 insertions(+), 70 deletions(-) diff --git a/flink-cdc-runtime/pom.xml b/flink-cdc-runtime/pom.xml index e278fd669..deb297509 100644 --- a/flink-cdc-runtime/pom.xml +++ b/flink-cdc-runtime/pom.xml @@ -44,34 +44,6 @@ limitations under the License. <groupId>org.apache.calcite</groupId> <artifactId>calcite-core</artifactId> </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_2.12</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <artifactId>value</artifactId> - <groupId>org.immutables</groupId> - </exclusion> - <exclusion> - <artifactId>value-annotations</artifactId> - <groupId>org.immutables</groupId> - </exclusion> - <exclusion> - <artifactId>commons-compiler</artifactId> - <groupId>org.codehaus.janino</groupId> - </exclusion> - <exclusion> - <artifactId>janino</artifactId> - <groupId>org.codehaus.janino</groupId> - </exclusion> - <exclusion> - <artifactId>flink-scala_2.12</artifactId> - <groupId>org.apache.flink</groupId> - </exclusion> - </exclusions> - </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cdc-common</artifactId> diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java index d1052cf39..80141a13e 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java @@ -18,7 +18,6 @@ package org.apache.flink.cdc.runtime.functions; import org.apache.flink.annotation.Internal; -import org.apache.flink.table.functions.BuiltInFunctionDefinition; import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlFunctionCategory; @@ -31,28 +30,17 @@ import org.apache.calcite.sql.validate.SqlMonotonicity; import javax.annotation.Nullable; -import java.util.Optional; import java.util.function.Function; -import static org.apache.flink.table.functions.BuiltInFunctionDefinition.DEFAULT_VERSION; -import static org.apache.flink.table.functions.BuiltInFunctionDefinition.qualifyFunctionName; -import static org.apache.flink.table.functions.BuiltInFunctionDefinition.validateFunction; -import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; /** * This is the case when the operator has a special parsing syntax or uses other Calcite-specific - * features that are not exposed via {@link BuiltInFunctionDefinition} yet. - * - * <p>Note: Try to keep usages of this class to a minimum and use Flink's {@link - * BuiltInFunctionDefinition} stack instead. - * - * <p>For simple functions, use the provided builder. Otherwise, this class can also be extended. + * features that are not exposed via {@link SqlFunction} yet. */ @Internal public class BuiltInScalarFunction extends SqlFunction { - private final @Nullable Integer version; - private final boolean isDeterministic; private final boolean isInternal; @@ -61,7 +49,6 @@ public class BuiltInScalarFunction extends SqlFunction { protected BuiltInScalarFunction( String name, - int version, SqlKind kind, @Nullable SqlReturnTypeInference returnTypeInference, @Nullable SqlOperandTypeInference operandTypeInference, @@ -77,11 +64,9 @@ public class BuiltInScalarFunction extends SqlFunction { operandTypeInference, operandTypeChecker, checkNotNull(category)); - this.version = isInternal ? null : version; this.isDeterministic = isDeterministic; this.isInternal = isInternal; this.monotonicity = monotonicity; - validateFunction(name, version, isInternal); } protected BuiltInScalarFunction( @@ -93,7 +78,6 @@ public class BuiltInScalarFunction extends SqlFunction { SqlFunctionCategory category) { this( name, - DEFAULT_VERSION, kind, returnTypeInference, operandTypeInference, @@ -109,18 +93,6 @@ public class BuiltInScalarFunction extends SqlFunction { return new Builder(); } - public final Optional<Integer> getVersion() { - return Optional.ofNullable(version); - } - - public String getQualifiedName() { - if (isInternal) { - return getName(); - } - assert version != null; - return qualifyFunctionName(getName(), version); - } - @Override public boolean isDeterministic() { return isDeterministic; @@ -144,8 +116,6 @@ public class BuiltInScalarFunction extends SqlFunction { private String name; - private int version = DEFAULT_VERSION; - private SqlKind kind = SqlKind.OTHER_FUNCTION; private SqlReturnTypeInference returnTypeInference; @@ -163,18 +133,11 @@ public class BuiltInScalarFunction extends SqlFunction { private Function<SqlOperatorBinding, SqlMonotonicity> monotonicity = call -> SqlMonotonicity.NOT_MONOTONIC; - /** @see BuiltInFunctionDefinition.Builder#name(String) */ public Builder name(String name) { this.name = name; return this; } - /** @see BuiltInFunctionDefinition.Builder#version(int) */ - public Builder version(int version) { - this.version = version; - return this; - } - public Builder kind(SqlKind kind) { this.kind = kind; return this; @@ -205,7 +168,6 @@ public class BuiltInScalarFunction extends SqlFunction { return this; } - /** @see BuiltInFunctionDefinition.Builder#internal() */ public Builder internal() { this.isInternal = true; return this; @@ -224,7 +186,6 @@ public class BuiltInScalarFunction extends SqlFunction { public BuiltInScalarFunction build() { return new BuiltInScalarFunction( name, - version, kind, returnTypeInference, operandTypeInference, diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java index 26a6276c0..bf61ad260 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java @@ -17,7 +17,6 @@ package org.apache.flink.cdc.runtime.operators.schema; -import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableMap; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.AddColumnEvent; @@ -41,6 +40,7 @@ import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; import org.apache.commons.collections.ListUtils;
