This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new d3473de4d [FLINK-36076][minor][cdc-runtime] Set isSchemaChangeApplying
as volatile for thread safe consideration
d3473de4d is described below
commit d3473de4db92229eb09e1e4c698a2db5448c8805
Author: Hongshun Wang <[email protected]>
AuthorDate: Tue Aug 20 00:53:02 2024 +0800
[FLINK-36076][minor][cdc-runtime] Set isSchemaChangeApplying as volatile
for thread safe consideration
This closes #3556.
---
.../org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java | 3 ++-
.../flink/cdc/composer/flink/translator/DataSourceTranslator.java | 7 ++++---
.../operators/schema/coordinator/SchemaRegistryRequestHandler.java | 2 +-
3 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index bddd5fc00..b3f17b2e5 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -97,7 +97,8 @@ public class FlinkPipelineComposer implements PipelineComposer {
// Build Source Operator
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
DataStream<Event> stream =
- sourceTranslator.translate(pipelineDef.getSource(), env,
pipelineDef.getConfig());
+ sourceTranslator.translate(
+ pipelineDef.getSource(), env, pipelineDef.getConfig(),
parallelism);
// Build PreTransformOperator for processing Schema Event
TransformTranslator transformTranslator = new TransformTranslator();
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
index 7b631c6f1..90bbea73e 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
@@ -23,7 +23,6 @@ import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSourceFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
-import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.source.EventSourceProvider;
import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider;
@@ -41,12 +40,14 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class DataSourceTranslator {
public DataStreamSource<Event> translate(
- SourceDef sourceDef, StreamExecutionEnvironment env, Configuration
pipelineConfig) {
+ SourceDef sourceDef,
+ StreamExecutionEnvironment env,
+ Configuration pipelineConfig,
+ int sourceParallelism) {
// Create data source
DataSource dataSource = createDataSource(sourceDef, env,
pipelineConfig);
// Get source provider
- final int sourceParallelism =
pipelineConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
EventSourceProvider eventSourceProvider =
dataSource.getEventSourceProvider();
if (eventSourceProvider instanceof FlinkSourceProvider) {
// Source
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index 77360169e..da88753e5 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -94,7 +94,7 @@ public class SchemaRegistryRequestHandler implements Closeable {
private final Set<Integer> flushedSinkWriters;
/** Status of the execution of current schema change request. */
- private boolean isSchemaChangeApplying;
+ private volatile boolean isSchemaChangeApplying;
/** Executor service to execute schema change. */
private final ExecutorService schemaChangeThreadPool;