JingsongLi commented on code in PR #1261:
URL: https://github.com/apache/incubator-paimon/pull/1261#discussion_r1236217464
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java:
##########
@@ -85,55 +99,94 @@ public void build() {
Preconditions.checkNotNull(parserFactory);
StreamExecutionEnvironment env = input.getExecutionEnvironment();
+ if (mode == MySqlDatabaseSyncMode.DYNAMIC) {
+ buildDynamicCdcSink(env);
+ } else {
+ buildStaticCdcSink(env);
+ }
+ }
+ private void buildDynamicCdcSink(StreamExecutionEnvironment env) {
+ SingleOutputStreamOperator<Void> parsed =
+ input.forward()
+ .process(
+ new CdcDynamicTableParsingProcessFunction<>(
+ database, catalogLoader, tables, parserFactory, mode))
+ .setParallelism(input.getParallelism());
+
+ // for newly-added tables, create a multiplexing operator that handles all their records
+ // and writes to multiple tables
+ DataStream<CdcMultiplexRecord> newlyAddedTableStream =
+ SingleOutputStreamOperatorUtils.getSideOutput(
+ parsed, CdcDynamicTableParsingProcessFunction.DYNAMIC_OUTPUT_TAG);
+ // handles schema change for newly added tables
+ SingleOutputStreamOperatorUtils.getSideOutput(
+ parsed,
+ CdcDynamicTableParsingProcessFunction.DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG)
+ .process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader));
+
+ FlinkStreamPartitioner<CdcMultiplexRecord> partitioner =
+ new FlinkStreamPartitioner<>(new CdcMultiplexRecordChannelComputer(catalogLoader));
+ PartitionTransformation<CdcMultiplexRecord> partitioned =
Review Comment:
I mean what does it do? I see it even has bugs in it, and I'm pretty sure no
test can cover it.