This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 9f358ab1d7c2693d475560dd02d62a75ebe659ef
Author: Hongshun Wang <[email protected]>
AuthorDate: Tue Aug 20 00:53:02 2024 +0800

    [FLINK-36076][minor][cdc-runtime] Set isSchemaChangeApplying as volatile for thread safe consideration
    
    This closes #3556.
    
    (cherry picked from commit d3473de4db92229eb09e1e4c698a2db5448c8805)
---
 .../org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java | 3 ++-
 .../flink/cdc/composer/flink/translator/DataSourceTranslator.java  | 7 ++++---
 .../operators/schema/coordinator/SchemaRegistryRequestHandler.java | 2 +-
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index bddd5fc00..b3f17b2e5 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -97,7 +97,8 @@ public class FlinkPipelineComposer implements PipelineComposer {
         // Build Source Operator
         DataSourceTranslator sourceTranslator = new DataSourceTranslator();
         DataStream<Event> stream =
-                sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig());
+                sourceTranslator.translate(
+                        pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);
 
         // Build PreTransformOperator for processing Schema Event
         TransformTranslator transformTranslator = new TransformTranslator();
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
index 7b631c6f1..90bbea73e 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
@@ -23,7 +23,6 @@ import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.factories.DataSourceFactory;
 import org.apache.flink.cdc.common.factories.FactoryHelper;
-import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 import org.apache.flink.cdc.common.source.DataSource;
 import org.apache.flink.cdc.common.source.EventSourceProvider;
 import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider;
@@ -41,12 +40,14 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 public class DataSourceTranslator {
 
     public DataStreamSource<Event> translate(
-            SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
+            SourceDef sourceDef,
+            StreamExecutionEnvironment env,
+            Configuration pipelineConfig,
+            int sourceParallelism) {
         // Create data source
         DataSource dataSource = createDataSource(sourceDef, env, pipelineConfig);
 
         // Get source provider
-        final int sourceParallelism = pipelineConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
         EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider();
         if (eventSourceProvider instanceof FlinkSourceProvider) {
             // Source
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index 77360169e..da88753e5 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -94,7 +94,7 @@ public class SchemaRegistryRequestHandler implements Closeable {
     private final Set<Integer> flushedSinkWriters;
 
     /** Status of the execution of current schema change request. */
-    private boolean isSchemaChangeApplying;
+    private volatile boolean isSchemaChangeApplying;
     /** Executor service to execute schema change. */
     private final ExecutorService schemaChangeThreadPool;
 

Reply via email to