JingsongLi commented on code in PR #1261:
URL: https://github.com/apache/incubator-paimon/pull/1261#discussion_r1236217464


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java:
##########
@@ -85,55 +99,94 @@ public void build() {
         Preconditions.checkNotNull(parserFactory);
 
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
+        if (mode == MySqlDatabaseSyncMode.DYNAMIC) {
+            buildDynamicCdcSink(env);
+        } else {
+            buildStaticCdcSink(env);
+        }
+    }
 
+    private void buildDynamicCdcSink(StreamExecutionEnvironment env) {
+        SingleOutputStreamOperator<Void> parsed =
+                input.forward()
+                        .process(
+                                new CdcDynamicTableParsingProcessFunction<>(
+                                        database, catalogLoader, tables, parserFactory, mode))
+                        .setParallelism(input.getParallelism());
+
+        // for newly-added tables, create a multiplexing operator that handles all their records
+        //     and writes to multiple tables
+        DataStream<CdcMultiplexRecord> newlyAddedTableStream =
+                SingleOutputStreamOperatorUtils.getSideOutput(
+                        parsed, CdcDynamicTableParsingProcessFunction.DYNAMIC_OUTPUT_TAG);
+        // handles schema change for newly added tables
+        SingleOutputStreamOperatorUtils.getSideOutput(
+                        parsed,
+                        CdcDynamicTableParsingProcessFunction.DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG)
+                .process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader));
+
+        FlinkStreamPartitioner<CdcMultiplexRecord> partitioner =
+                new FlinkStreamPartitioner<>(new CdcMultiplexRecordChannelComputer(catalogLoader));
+        PartitionTransformation<CdcMultiplexRecord> partitioned =

Review Comment:
   I mean, what does it do? I see it even has bugs in it, and I'm pretty sure no
test can cover it.


