This is an automated email from the ASF dual-hosted git repository. kunni pushed a commit to branch FLINK-38729-2 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 761e1a22f6ef16fc077bf026a77c6b6fed49b476 Author: lvyanquan <[email protected]> AuthorDate: Mon Mar 2 19:06:00 2026 +0800 [FLINK-38729] Add support for Flink 2.2.0 --- .../cdc/runtime/operators/sink/DataSinkWriterOperator.java | 1 + .../runtime/partitioning/DistributedPrePartitionOperator.java | 11 ++--------- .../cdc/runtime/partitioning/RegularPrePartitionOperator.java | 11 ++--------- .../flink/cdc/runtime/utils/FlinkCompatibilityUtils.java | 2 +- 4 files changed, 6 insertions(+), 19 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java index bdde1116c..eac359dd6 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java @@ -47,6 +47,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import java.lang.reflect.Constructor; +import java.lang.reflect.Field; import java.util.HashSet; import java.util.Optional; import java.util.Set; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java index 81debd40c..8b8fcf322 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java @@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator; import org.apache.flink.cdc.runtime.serializer.event.EventSerializer; +import org.apache.flink.cdc.runtime.utils.FlinkCompatibilityUtils; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -55,15 +56,7 @@ public class DistributedPrePartitionOperator extends AbstractStreamOperator<Part public DistributedPrePartitionOperator( int downstreamParallelism, HashFunctionProvider<DataChangeEvent> hashFunctionProvider) { - // Try to set chainingStrategy via reflection for backward compatibility with Flink 1.x - try { - java.lang.reflect.Field field = - AbstractStreamOperator.class.getDeclaredField("chainingStrategy"); - field.setAccessible(true); - field.set(this, ChainingStrategy.ALWAYS); - } catch (Exception e) { - // Ignore if chainingStrategy doesn't exist in Flink 2.x - } + FlinkCompatibilityUtils.setChainingStrategyIfAvailable(this, ChainingStrategy.ALWAYS); this.downstreamParallelism = downstreamParallelism; this.hashFunctionProvider = hashFunctionProvider; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperator.java index b18b586d1..d65e3d755 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperator.java @@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator; import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; import org.apache.flink.cdc.runtime.serializer.event.EventSerializer; +import org.apache.flink.cdc.runtime.utils.FlinkCompatibilityUtils; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -67,15 +68,7 @@ public class RegularPrePartitionOperator extends AbstractStreamOperator<Partitio OperatorID schemaOperatorId, int downstreamParallelism, HashFunctionProvider<DataChangeEvent> hashFunctionProvider) { - // Try to set chainingStrategy via reflection for backward compatibility with Flink 1.x - try { - java.lang.reflect.Field field = - AbstractStreamOperator.class.getDeclaredField("chainingStrategy"); - field.setAccessible(true); - field.set(this, ChainingStrategy.ALWAYS); - } catch (Exception e) { - // Ignore if chainingStrategy doesn't exist in Flink 2.x - } + FlinkCompatibilityUtils.setChainingStrategyIfAvailable(this, ChainingStrategy.ALWAYS); this.schemaOperatorId = schemaOperatorId; this.downstreamParallelism = downstreamParallelism; this.hashFunctionProvider = hashFunctionProvider; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/utils/FlinkCompatibilityUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/utils/FlinkCompatibilityUtils.java index c4b1cfa4a..cf0d24386 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/utils/FlinkCompatibilityUtils.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/utils/FlinkCompatibilityUtils.java @@ -18,8 +18,8 @@ package org.apache.flink.cdc.runtime.utils; import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.runtime.operators.AbstractStreamOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
