This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 9f358ab1d7c2693d475560dd02d62a75ebe659ef Author: Hongshun Wang <[email protected]> AuthorDate: Tue Aug 20 00:53:02 2024 +0800 [FLINK-36076][minor][cdc-runtime] Set isSchemaChangeApplying as volatile for thread safe consideration This closes #3556. (cherry picked from commit d3473de4db92229eb09e1e4c698a2db5448c8805) --- .../org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java | 3 ++- .../flink/cdc/composer/flink/translator/DataSourceTranslator.java | 7 ++++--- .../operators/schema/coordinator/SchemaRegistryRequestHandler.java | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index bddd5fc00..b3f17b2e5 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -97,7 +97,8 @@ public class FlinkPipelineComposer implements PipelineComposer { // Build Source Operator DataSourceTranslator sourceTranslator = new DataSourceTranslator(); DataStream<Event> stream = - sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig()); + sourceTranslator.translate( + pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism); // Build PreTransformOperator for processing Schema Event TransformTranslator transformTranslator = new TransformTranslator(); diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java index 7b631c6f1..90bbea73e 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java @@ -23,7 +23,6 @@ import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.factories.DataSourceFactory; import org.apache.flink.cdc.common.factories.FactoryHelper; -import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.source.DataSource; import org.apache.flink.cdc.common.source.EventSourceProvider; import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider; @@ -41,12 +40,14 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class DataSourceTranslator { public DataStreamSource<Event> translate( - SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) { + SourceDef sourceDef, + StreamExecutionEnvironment env, + Configuration pipelineConfig, + int sourceParallelism) { // Create data source DataSource dataSource = createDataSource(sourceDef, env, pipelineConfig); // Get source provider - final int sourceParallelism = pipelineConfig.get(PipelineOptions.PIPELINE_PARALLELISM); EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider(); if (eventSourceProvider instanceof FlinkSourceProvider) { // Source diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 77360169e..da88753e5 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -94,7 +94,7 @@ public class SchemaRegistryRequestHandler implements Closeable { private final Set<Integer> flushedSinkWriters; /** Status of the execution of current schema change request. */ - private boolean isSchemaChangeApplying; + private volatile boolean isSchemaChangeApplying; /** Executor service to execute schema change. */ private final ExecutorService schemaChangeThreadPool;
