lvyanquan commented on code in PR #3801:
URL: https://github.com/apache/flink-cdc/pull/3801#discussion_r1891093486
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java:
##########
@@ -172,6 +184,12 @@ public void setup(
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
+ if (canContainDistributedTables) {
+ // In distributed mode, we don't have a globally consistent schema for each partition.
+ // It's not meaningful to persist them to state. Instead, we rely on each source
+ // partition to send fresh CreateTableEvent to instantiate each event flow.
Review Comment:
Considering that we do not provide any guarantee for this behavior at the API level, should we add some meaningful logs or exception messages, instead of a NullPointerException, in the `processDataChangeEvent` method when the Source does not comply with this behavior?
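A minimal sketch of the kind of guard this could mean (the helper name, the `schemaMap` parameter, and the message wording are illustrative assumptions, not the PR's actual code):

```java
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;

import java.util.Map;

final class SchemaPresenceCheck {
    private SchemaPresenceCheck() {}

    /** Fails with a descriptive message instead of surfacing a NullPointerException later. */
    static Schema requireKnownSchema(Map<TableId, Schema> schemaMap, DataChangeEvent event) {
        TableId tableId = event.tableId();
        Schema schema = schemaMap.get(tableId);
        if (schema == null) {
            throw new IllegalStateException(
                    String.format(
                            "Received a DataChangeEvent for table %s before its CreateTableEvent. "
                                    + "Each source partition is expected to emit a fresh CreateTableEvent "
                                    + "before emitting data changes when distributed tables are present.",
                            tableId));
        }
        return schema;
    }
}
```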
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.partitioning;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
+import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Operator for processing events from {@link SchemaOperator} before {@link EventPartitioner} with
+ * distributed topology.
+ */
+@Internal
+public class DistributedPrePartitionOperator extends AbstractStreamOperator<PartitioningEvent>
+ implements OneInputStreamOperator<Event, PartitioningEvent>, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final int downstreamParallelism;
+ private final HashFunctionProvider<DataChangeEvent> hashFunctionProvider;
+
+ // Schema and HashFunctionMap used in schema inferencing mode.
+ private transient Map<TableId, Schema> schemaMap;
+ private transient Map<TableId, HashFunction<DataChangeEvent>> hashFunctionMap;
+
+ private transient int subTaskId;
+
+ public DistributedPrePartitionOperator(
+ int downstreamParallelism, HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
+ this.chainingStrategy = ChainingStrategy.ALWAYS;
+ this.downstreamParallelism = downstreamParallelism;
+ this.hashFunctionProvider = hashFunctionProvider;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+ schemaMap = new HashMap<>();
+ hashFunctionMap = new HashMap<>();
+ }
+
+ @Override
+ public void processElement(StreamRecord<Event> element) throws Exception {
+ Event event = element.getValue();
+ if (event instanceof SchemaChangeEvent) {
+ SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
+ TableId tableId = schemaChangeEvent.tableId();
+
+ // Update schema map
+ schemaMap.compute(
+ tableId,
+ (tId, oldSchema) ->
+ SchemaUtils.applySchemaChangeEvent(oldSchema, schemaChangeEvent));
+
+ // For malformed dangling dropTableEvents, we simply ignore this event to avoid breaking
+ // the pipeline.
Review Comment:
I don't understand how this situation can happen. Partitioning happens before the Schema Operator, and the Source will resend the CreateTableEvent.
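If the dangling case is kept, a warning log (rather than a silent ignore) would at least surface non-compliant sources. A rough sketch of a guard placed before the schemaMap update in `processElement`, assuming the `LOG` field inherited from `AbstractStreamOperator` (not part of the PR):

```java
if (schemaChangeEvent instanceof DropTableEvent && !schemaMap.containsKey(tableId)) {
    // Dangling DropTableEvent: this subtask never saw a CreateTableEvent for the table.
    // Log and skip instead of failing the pipeline, so misbehaving sources stay visible.
    LOG.warn(
            "Subtask {} ignored a dangling DropTableEvent for unknown table {}.",
            subTaskId,
            tableId);
    return;
}
```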
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java:
##########
@@ -94,76 +97,124 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
int parallelism = pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
env.getConfig().setParallelism(parallelism);
+ translate(env, pipelineDef);
+
+ // Add framework JARs
+ addFrameworkJars();
+
+ return new FlinkPipelineExecution(
+ env, pipelineDefConfig.get(PipelineOptions.PIPELINE_NAME), isBlocking);
+ }
+
+ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef) {
+ Configuration pipelineDefConfig = pipelineDef.getConfig();
+ int parallelism = pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
SchemaChangeBehavior schemaChangeBehavior =
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
- // Build Source Operator
+ // Initialize translators
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
- DataStream<Event> stream =
- sourceTranslator.translate(
- pipelineDef.getSource(), env, pipelineDefConfig, parallelism);
-
- // Build PreTransformOperator for processing Schema Event
TransformTranslator transformTranslator = new TransformTranslator();
- stream =
- transformTranslator.translatePreTransform(
- stream,
- pipelineDef.getTransforms(),
- pipelineDef.getUdfs(),
- pipelineDef.getModels());
-
- // Schema operator
+ PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
SchemaOperatorTranslator schemaOperatorTranslator =
new SchemaOperatorTranslator(
schemaChangeBehavior,
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT),
pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
+ DistributedSchemaOperatorTranslator distributedSchemaOperatorTranslator =
+ new DistributedSchemaOperatorTranslator(
+ schemaChangeBehavior,
+ pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
+ pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT),
+ pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
+ DataSinkTranslator sinkTranslator = new DataSinkTranslator();
+
+ // And required constructors
OperatorIDGenerator schemaOperatorIDGenerator =
new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
-
- // Build PostTransformOperator for processing Data Event
- stream =
- transformTranslator.translatePostTransform(
- stream,
- pipelineDef.getTransforms(),
- pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
- pipelineDef.getUdfs(),
- pipelineDef.getModels());
-
- // Build DataSink in advance as schema operator requires MetadataApplier
- DataSinkTranslator sinkTranslator = new DataSinkTranslator();
+ DataSource dataSource =
+ sourceTranslator.createDataSource(pipelineDef.getSource(), pipelineDefConfig, env);
DataSink dataSink =
sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);
+ boolean canContainDistributedTables = dataSource.canContainDistributedTables();
+
+ // O ---> Source
+ DataStream<Event> stream =
+ sourceTranslator.translate(
+ pipelineDef.getSource(), dataSource, env, pipelineDefConfig, parallelism);
+
+ // Source ---> PreTransform
stream =
- schemaOperatorTranslator.translate(
+ transformTranslator.translatePreTransform(
stream,
- parallelism,
- dataSink.getMetadataApplier()
- .setAcceptedSchemaEvolutionTypes(
- pipelineDef.getSink().getIncludedSchemaEvolutionTypes()),
- pipelineDef.getRoute());
+ pipelineDef.getTransforms(),
+ pipelineDef.getUdfs(),
+ pipelineDef.getModels(),
+ canContainDistributedTables);
- // Build Partitioner used to shuffle Event
- PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
+ // PreTransform ---> PostTransform
stream =
- partitioningTranslator.translate(
+ transformTranslator.translatePostTransform(
stream,
- parallelism,
- parallelism,
- schemaOperatorIDGenerator.generate(),
- dataSink.getDataChangeEventHashFunctionProvider(parallelism));
-
- // Build Sink Operator
- sinkTranslator.translate(
- pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());
-
- // Add framework JARs
- addFrameworkJars();
+ pipelineDef.getTransforms(),
+ pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
+ pipelineDef.getUdfs(),
+ pipelineDef.getModels());
- return new FlinkPipelineExecution(
- env, pipelineDefConfig.get(PipelineOptions.PIPELINE_NAME), isBlocking);
+ if (canContainDistributedTables) {
+ // Translate a distributed topology for sources with distributed tables
+ // PostTransform -> Partitioning
+ DataStream<PartitioningEvent> partitionedStream =
+ partitioningTranslator.translateDistributed(
+ stream,
+ parallelism,
+ parallelism,
+ dataSink.getDataChangeEventHashFunctionProvider(parallelism));
+
+ // Partitioning -> Schema Operator
+ stream =
+ distributedSchemaOperatorTranslator.translate(
+ partitionedStream,
+ parallelism,
+ dataSink.getMetadataApplier()
+ .setAcceptedSchemaEvolutionTypes(
+ pipelineDef
+ .getSink()
+ .getIncludedSchemaEvolutionTypes()),
+ pipelineDef.getRoute());
+
+ // Schema Operator -> Sink
+ sinkTranslator.translate(
+ pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());
+ } else {
+ // Translate a regular topology for sources without distributed tables
+ // PostTransform ---> Schema Operator
+ stream =
+ schemaOperatorTranslator.translate(
+ stream,
+ parallelism,
+ dataSink.getMetadataApplier()
+ .setAcceptedSchemaEvolutionTypes(
+ pipelineDef
+ .getSink()
+ .getIncludedSchemaEvolutionTypes()),
+ pipelineDef.getRoute());
+
+ // Schema Operator ---(shuffled)---> Partitioning
+ stream =
+ partitioningTranslator.translateRegular(
+ stream,
+ parallelism,
+ parallelism,
+ schemaOperatorIDGenerator.generate(),
+ dataSink.getDataChangeEventHashFunctionProvider(parallelism));
+
+ // Partitioning ---> Sink ---> X
+ sinkTranslator.translate(
+ pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());
Review Comment:
I think the main difference between these two branches is the order of the Partitioning operator (and its event types) and the Schema Operator?
We could move `sinkTranslator.translate` out of the branches to highlight this.
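For illustration, a rough sketch of that refactoring, reusing the call shapes already in this diff; only the operator ordering stays inside the branches, and the sink translation moves below them (a sketch, not a definitive change):

```java
if (canContainDistributedTables) {
    // Distributed topology: PostTransform -> Partitioning -> Schema Operator
    DataStream<PartitioningEvent> partitionedStream =
            partitioningTranslator.translateDistributed(
                    stream,
                    parallelism,
                    parallelism,
                    dataSink.getDataChangeEventHashFunctionProvider(parallelism));
    stream =
            distributedSchemaOperatorTranslator.translate(
                    partitionedStream,
                    parallelism,
                    dataSink.getMetadataApplier()
                            .setAcceptedSchemaEvolutionTypes(
                                    pipelineDef.getSink().getIncludedSchemaEvolutionTypes()),
                    pipelineDef.getRoute());
} else {
    // Regular topology: PostTransform -> Schema Operator -> (shuffled) Partitioning
    stream =
            schemaOperatorTranslator.translate(
                    stream,
                    parallelism,
                    dataSink.getMetadataApplier()
                            .setAcceptedSchemaEvolutionTypes(
                                    pipelineDef.getSink().getIncludedSchemaEvolutionTypes()),
                    pipelineDef.getRoute());
    stream =
            partitioningTranslator.translateRegular(
                    stream,
                    parallelism,
                    parallelism,
                    schemaOperatorIDGenerator.generate(),
                    dataSink.getDataChangeEventHashFunctionProvider(parallelism));
}

// The sink translation is identical in both branches, so it can live outside the if/else.
sinkTranslator.translate(
        pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());
```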
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java:
##########
@@ -94,76 +97,124 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
int parallelism = pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
env.getConfig().setParallelism(parallelism);
+ translate(env, pipelineDef);
+
+ // Add framework JARs
+ addFrameworkJars();
+
+ return new FlinkPipelineExecution(
+ env, pipelineDefConfig.get(PipelineOptions.PIPELINE_NAME), isBlocking);
+ }
+
+ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef) {
+ Configuration pipelineDefConfig = pipelineDef.getConfig();
+ int parallelism = pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
SchemaChangeBehavior schemaChangeBehavior =
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
- // Build Source Operator
+ // Initialize translators
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
- DataStream<Event> stream =
- sourceTranslator.translate(
- pipelineDef.getSource(), env, pipelineDefConfig, parallelism);
-
- // Build PreTransformOperator for processing Schema Event
TransformTranslator transformTranslator = new TransformTranslator();
- stream =
- transformTranslator.translatePreTransform(
- stream,
- pipelineDef.getTransforms(),
- pipelineDef.getUdfs(),
- pipelineDef.getModels());
-
- // Schema operator
+ PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
SchemaOperatorTranslator schemaOperatorTranslator =
new SchemaOperatorTranslator(
schemaChangeBehavior,
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT),
pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
+ DistributedSchemaOperatorTranslator distributedSchemaOperatorTranslator =
+ new DistributedSchemaOperatorTranslator(
Review Comment:
Is it necessary to add another OperatorTranslator? Can we pass `canContainDistributedTables` to the constructor of `SchemaOperatorTranslator` to distinguish these two behaviors?
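A rough sketch of how that might look; the extra boolean constructor argument and the internal branching are hypothetical, not existing API:

```java
// FlinkPipelineComposer#translate: one translator, configured with the flag
// (the fifth constructor argument is hypothetical).
SchemaOperatorTranslator schemaOperatorTranslator =
        new SchemaOperatorTranslator(
                schemaChangeBehavior,
                pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
                pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT),
                pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
                canContainDistributedTables);

// Inside SchemaOperatorTranslator, the flag would select the operator variant:
//   if (canContainDistributedTables) { ... build the distributed schema operator ... }
//   else                             { ... build the regular schema operator ... }
```

Note that in this PR the distributed variant consumes `PartitioningEvent` rather than `Event`, so the translate signatures would still need to reconcile that difference.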