This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch FLINK-38729-2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 761e1a22f6ef16fc077bf026a77c6b6fed49b476
Author: lvyanquan <[email protected]>
AuthorDate: Mon Mar 2 19:06:00 2026 +0800

    [FLINK-38729] Add support for Flink 2.2.0
---
 .../cdc/runtime/operators/sink/DataSinkWriterOperator.java    |  1 +
 .../runtime/partitioning/DistributedPrePartitionOperator.java | 11 ++---------
 .../cdc/runtime/partitioning/RegularPrePartitionOperator.java | 11 ++---------
 .../flink/cdc/runtime/utils/FlinkCompatibilityUtils.java      |  2 +-
 4 files changed, 6 insertions(+), 19 deletions(-)

diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
index bdde1116c..eac359dd6 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
@@ -47,6 +47,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
 import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java
index 81debd40c..8b8fcf322 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java
@@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
 import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
+import org.apache.flink.cdc.runtime.utils.FlinkCompatibilityUtils;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -55,15 +56,7 @@ public class DistributedPrePartitionOperator extends 
AbstractStreamOperator<Part
 
     public DistributedPrePartitionOperator(
             int downstreamParallelism, HashFunctionProvider<DataChangeEvent> 
hashFunctionProvider) {
-        // Try to set chainingStrategy via reflection for backward 
compatibility with Flink 1.x
-        try {
-            java.lang.reflect.Field field =
-                    
AbstractStreamOperator.class.getDeclaredField("chainingStrategy");
-            field.setAccessible(true);
-            field.set(this, ChainingStrategy.ALWAYS);
-        } catch (Exception e) {
-            // Ignore if chainingStrategy doesn't exist in Flink 2.x
-        }
+        FlinkCompatibilityUtils.setChainingStrategyIfAvailable(this, 
ChainingStrategy.ALWAYS);
         this.downstreamParallelism = downstreamParallelism;
         this.hashFunctionProvider = hashFunctionProvider;
     }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperator.java
index b18b586d1..d65e3d755 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/RegularPrePartitionOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
 import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
 import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
+import org.apache.flink.cdc.runtime.utils.FlinkCompatibilityUtils;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -67,15 +68,7 @@ public class RegularPrePartitionOperator extends 
AbstractStreamOperator<Partitio
             OperatorID schemaOperatorId,
             int downstreamParallelism,
             HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
-        // Try to set chainingStrategy via reflection for backward 
compatibility with Flink 1.x
-        try {
-            java.lang.reflect.Field field =
-                    
AbstractStreamOperator.class.getDeclaredField("chainingStrategy");
-            field.setAccessible(true);
-            field.set(this, ChainingStrategy.ALWAYS);
-        } catch (Exception e) {
-            // Ignore if chainingStrategy doesn't exist in Flink 2.x
-        }
+        FlinkCompatibilityUtils.setChainingStrategyIfAvailable(this, 
ChainingStrategy.ALWAYS);
         this.schemaOperatorId = schemaOperatorId;
         this.downstreamParallelism = downstreamParallelism;
         this.hashFunctionProvider = hashFunctionProvider;
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/utils/FlinkCompatibilityUtils.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/utils/FlinkCompatibilityUtils.java
index c4b1cfa4a..cf0d24386 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/utils/FlinkCompatibilityUtils.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/utils/FlinkCompatibilityUtils.java
@@ -18,8 +18,8 @@
 package org.apache.flink.cdc.runtime.utils;
 
 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.runtime.operators.AbstractStreamOperator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

Reply via email to