yuxiqian commented on code in PR #3812:
URL: https://github.com/apache/flink-cdc/pull/3812#discussion_r2002273866


##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java:
##########
@@ -158,8 +176,7 @@ private <CommT> void addCommittingTopology(
                     ((WithPreCommitTopology<Event, CommT>) 
sink).addPreCommitTopology(written);
         }
 
-        // TODO: Hard coding stream mode and checkpoint
-        boolean isBatchMode = false;
+        // TODO: Hard coding checkpoint
         boolean isCheckpointingEnabled = true;

Review Comment:
   Just curious, is it possible to enable checkpointing in batch mode?



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java:
##########
@@ -134,6 +135,19 @@ public static Set<Schema> 
reverseLookupDependingUpstreamSchemas(
                 .collect(Collectors.toSet());
     }
 
+    /** For an evolved table ID, reverse lookup all upstream schemas that 
needs to be fit in. */
+    public static Set<Schema> reverseLookupDependingUpstreamSchemas(
+            final TableIdRouter tableIdRouter,
+            final TableId evolvedTableId,
+            final Set<TableId> allOriginalTables,
+            final Schema originalSchema) {
+        return reverseLookupDependingUpstreamTables(
+                        tableIdRouter, evolvedTableId, allOriginalTables)
+                .stream()
+                .map(utid -> originalSchema)
+                .collect(Collectors.toSet());
+    }
+

Review Comment:
   It seems this method always returns either `Set{originalSchema}` or `Set{}`.
What is it used for?



##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/PartitioningTranslator.java:
##########
@@ -46,6 +47,34 @@ public DataStream<Event> translateRegular(
             int downstreamParallelism,
             OperatorID schemaOperatorID,
             HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
+        return translateRegular(
+                input,
+                upstreamParallelism,
+                downstreamParallelism,
+                false,
+                schemaOperatorID,
+                hashFunctionProvider);
+    }
+
+    public DataStream<Event> translateRegular(
+            DataStream<Event> input,
+            int upstreamParallelism,
+            int downstreamParallelism,
+            boolean isBatchMode,
+            OperatorID schemaOperatorID,
+            HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
+        if (isBatchMode) {
+            return input.transform(
+                            "BatchPrePartition",
+                            new PartitioningEventTypeInfo(),
+                            new RegularPrePartitionBatchOperator(
+                                    downstreamParallelism, 
hashFunctionProvider))
+                    .setParallelism(upstreamParallelism)
+                    .partitionCustom(new EventPartitioner(), new 
PartitioningEventKeySelector())
+                    .map(new PostPartitionProcessor(), new EventTypeInfo())
+                    .name("BatchPostPartition")
+                    .setParallelism(downstreamParallelism);
+        }

Review Comment:
   ditto: prefer
   
   ```java
   if (isBatchMode) {
       // batch code
   } else {
       // streaming code
   }
   ```



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java:
##########
@@ -59,6 +79,9 @@ public <T extends StreamOperator<CommittableMessage<CommT>>> 
T createStreamOpera
 
     @Override
     public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader 
classLoader) {
+        if (isBatchMode) {
+            return DataBatchSinkWriterOperator.class;
+        }
         return DataSinkWriterOperator.class;

Review Comment:
   nit: wrap streaming case in `else { ... }`



##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java:
##########
@@ -111,6 +112,12 @@ private void translate(StreamExecutionEnvironment env, 
PipelineDef pipelineDef)
         SchemaChangeBehavior schemaChangeBehavior =
                 
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
 
+        boolean isBatchMode = false;
+        if 
(pipelineDefConfig.get(PipelineOptions.PIPELINE_BATCH_MODE_ENABLED)) {
+            isBatchMode = true;
+            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        }

Review Comment:
   ```suggestion
           boolean isBatchMode =
               
pipelineDefConfig.get(PipelineOptions.PIPELINE_BATCH_MODE_ENABLED);
           if (isBatchMode) {
               env.setRuntimeMode(RuntimeExecutionMode.BATCH);
           } else {
               env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
           }
   ```
   
   nit: though `STREAMING` is the default option for now, manually specifying 
it would be clearer.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java:
##########
@@ -341,4 +355,57 @@ public Optional<DataChangeEvent> coerceDataRecord(
 
         return Optional.of(dataChangeEvent);
     }
+
+    /** Deduce merged CreateTableEvent in batch mode. */

Review Comment:
   ```suggestion
       /** Deduce initial downstream schemas based on upstream tables and 
routing rules. */
   ```
   
   
   
   
   



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperatorFactory.java:
##########
@@ -36,17 +36,37 @@ public class DataSinkWriterOperatorFactory<CommT>
                 YieldingOperatorFactory<CommittableMessage<CommT>> {
 
     private final Sink<Event> sink;
+    private final boolean isBatchMode;
     private final OperatorID schemaOperatorID;
 
     public DataSinkWriterOperatorFactory(Sink<Event> sink, OperatorID 
schemaOperatorID) {
         this.sink = sink;
+        this.isBatchMode = false;
+        this.schemaOperatorID = schemaOperatorID;
+    }

Review Comment:
   Better to remove this constructor and have callers specify streaming / batch mode explicitly.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java:
##########
@@ -341,4 +355,57 @@ public Optional<DataChangeEvent> coerceDataRecord(
 
         return Optional.of(dataChangeEvent);
     }
+
+    /** Deduce merged CreateTableEvent in batch mode. */
+    public static List<CreateTableEvent> 
deduceMergedCreateTableEventInBatchMode(

Review Comment:
   ```suggestion
       public static List<CreateTableEvent> deduceInitialCreateTableEvents(
   ```
   
   Removed the `InBatchMode` suffix, since this method might be helpful in streaming mode, too.



##########
flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml:
##########
@@ -58,6 +58,7 @@ pipeline:
   parallelism: 4
   schema.change.behavior: evolve
   schema-operator.rpc-timeout: 1 h
+  batch-mode.enabled: false

Review Comment:
   minor: since batch mode is disabled by default, maybe we can turn it on here
to verify that it can be enabled correctly?



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java:
##########
@@ -76,7 +77,7 @@ public List<TableId> route(TableId sourceTableId) {
         return routingCache.getUnchecked(sourceTableId);
     }
 
-    private List<TableId> calculateRoute(TableId sourceTableId) {
+    public List<TableId> calculateRoute(TableId sourceTableId) {

Review Comment:
   Don't modify this; it's an internal method. Call `TableIdRouter#route`
externally so that the results can be cached.



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##########
@@ -45,6 +45,12 @@ public class PipelineOptions {
                     .defaultValue(1)
                     .withDescription("Parallelism of the pipeline");
 
+    public static final ConfigOption<Boolean> PIPELINE_BATCH_MODE_ENABLED =
+            ConfigOptions.key("batch-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Enabled batch mode of the pipeline");

Review Comment:
   ```suggestion
                       .withDescription("Run pipeline job in batch mode instead 
of streaming mode");
   ```



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java:
##########
@@ -95,4 +96,23 @@ private TableId resolveReplacement(
         }
         return TableId.parse(route.f1);
     }
+
+    public List<Set<TableId>> groupSourceTablesByRouteRule(Set<TableId> 
tableIdSet) {

Review Comment:
   I doubt this is really a generic, reusable method, and it has no corresponding
Javadocs or test cases. Maybe just write it as a `for` loop in
`SchemaDerivator`?



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java:
##########
@@ -341,4 +355,57 @@ public Optional<DataChangeEvent> coerceDataRecord(
 
         return Optional.of(dataChangeEvent);
     }
+
+    /** Deduce merged CreateTableEvent in batch mode. */
+    public static List<CreateTableEvent> 
deduceMergedCreateTableEventInBatchMode(
+            TableIdRouter router, List<CreateTableEvent> createTableEvents) {
+        Set<TableId> originalTables =
+                createTableEvents.stream()
+                        .map(CreateTableEvent::tableId)
+                        .collect(Collectors.toSet());
+
+        List<Set<TableId>> sourceTablesByRouteRule =
+                router.groupSourceTablesByRouteRule(originalTables);
+        Map<TableId, Schema> sourceTableIdToSchemaMap =
+                createTableEvents.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        CreateTableEvent::tableId, 
CreateTableEvent::getSchema));
+        Map<TableId, Schema> sinkTableIdToSchemaMap = new HashMap<>();
+        Set<TableId> routedTables = new HashSet<>();
+        for (Set<TableId> sourceTables : sourceTablesByRouteRule) {
+            List<Schema> toBeMergedSchemas = new ArrayList<>();
+            for (TableId tableId : sourceTables) {
+                toBeMergedSchemas.add(sourceTableIdToSchemaMap.get(tableId));
+                routedTables.add(tableId);
+            }
+            if (toBeMergedSchemas.isEmpty()) {
+                continue;
+            }
+            Schema mergedSchema = null;
+            for (Schema toBeMergedSchema : toBeMergedSchemas) {
+                if (mergedSchema == null) {
+                    mergedSchema = toBeMergedSchema;
+                    continue;
+                }
+                mergedSchema =
+                        SchemaMergingUtils.getLeastCommonSchema(mergedSchema, 
toBeMergedSchema);
+            }

Review Comment:
   Call `SchemaUtils#getCommonSchema` here instead of folding the schemas manually in a loop.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreBatchTransformOperator.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.runtime.parser.TransformParser;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * A data process function that filters out columns which aren't (directly & 
indirectly) referenced
+ * in batch mode. There is no need to consider the task failover scenario in 
batch mode, as it
+ * should be re - emitted from the source.
+ */
+public class PreBatchTransformOperator extends AbstractStreamOperator<Event>

Review Comment:
   "PreTransform" is a phrase. Call it `BatchPreTransformOperator`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to