JingsongLi commented on code in PR #1261:
URL: https://github.com/apache/incubator-paimon/pull/1261#discussion_r1222697554
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java:
##########
@@ -54,6 +60,16 @@ default String parseTableName() {
*/
List<CdcRecord> parseRecords();
+ /**
+ * Parse newly added table schema from event.
+ *
+ * @param databaseName
Review Comment:
Remove this `@param` line if you have no description to add for it.
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java:
##########
@@ -42,32 +50,103 @@
public class CdcMultiTableParsingProcessFunction<T> extends ProcessFunction<T,
Void> {
Review Comment:
Can you split this into separate functions for the static and dynamic modes?
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java:
##########
@@ -42,32 +50,103 @@
public class CdcMultiTableParsingProcessFunction<T> extends ProcessFunction<T,
Void> {
private final EventParser.Factory<T> parserFactory;
+ private final Set<String> initialTables;
+ private final String database;
+ private final Catalog.Loader catalogLoader;
+ private final MySqlDatabaseSyncMode mode;
private transient EventParser<T> parser;
+ private transient Catalog catalog;
private transient Map<String, OutputTag<List<DataField>>>
updatedDataFieldsOutputTags;
private transient Map<String, OutputTag<CdcRecord>> recordOutputTags;
+ public static final OutputTag<CdcMultiplexRecord> DYNAMIC_OUTPUT_TAG =
+ new OutputTag<>("paimon-dynamic-table",
TypeInformation.of(CdcMultiplexRecord.class));
+ public static final OutputTag<Tuple2<Identifier, List<DataField>>>
+ DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG =
+ new OutputTag<>(
+ "paimon-dynamic-table-schema-change",
+ TypeInformation.of(
+ new TypeHint<Tuple2<Identifier,
List<DataField>>>() {}));
- public CdcMultiTableParsingProcessFunction(EventParser.Factory<T>
parserFactory) {
+ public CdcMultiTableParsingProcessFunction(
+ String database,
+ Catalog.Loader catalogLoader,
+ List<FileStoreTable> tables,
+ EventParser.Factory<T> parserFactory) {
+ this(database, catalogLoader, tables, parserFactory,
MySqlDatabaseSyncMode.STATIC);
+ }
+
+ public CdcMultiTableParsingProcessFunction(
+ String database,
+ Catalog.Loader catalogLoader,
+ List<FileStoreTable> tables,
+ EventParser.Factory<T> parserFactory,
+ MySqlDatabaseSyncMode mode) {
+ // for now, only support single database
+ this.database = database;
+ this.catalogLoader = catalogLoader;
+ this.initialTables =
tables.stream().map(FileStoreTable::name).collect(Collectors.toSet());
this.parserFactory = parserFactory;
+ this.mode = mode;
}
@Override
public void open(Configuration parameters) throws Exception {
parser = parserFactory.create();
updatedDataFieldsOutputTags = new HashMap<>();
recordOutputTags = new HashMap<>();
+ catalog = catalogLoader.load();
}
@Override
public void processElement(T raw, Context context, Collector<Void>
collector) throws Exception {
parser.setRawEvent(raw);
+
+ // CDC Ingestion only supports single database at this time being.
+ // In the future, there will be a mapping between source databases
+ // and target paimon databases
+ String databaseName = parser.parseDatabaseName();
Review Comment:
Is the result of `parseDatabaseName` used anywhere? `databaseName` appears to be unused.
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java:
##########
@@ -84,10 +99,54 @@ public void build() {
Preconditions.checkNotNull(parserFactory);
StreamExecutionEnvironment env = input.getExecutionEnvironment();
+ if (mode == MySqlDatabaseSyncMode.DYNAMIC) {
+ buildDynamicCdcSink(env);
+ } else {
+ buildStaticCdcSink(env);
+ }
+ }
+
+ private void buildDynamicCdcSink(StreamExecutionEnvironment env) {
+ SingleOutputStreamOperator<Void> parsed =
+ input.forward()
+ .process(
+ new CdcMultiTableParsingProcessFunction<>(
+ database, catalogLoader, tables,
parserFactory, mode))
+ .setParallelism(input.getParallelism());
+
+ // for newly-added tables, create a multiplexing operator that handles
all their records
+ // and writes to multiple tables
+ DataStream<CdcMultiplexRecord> newlyAddedTableStream =
+ SingleOutputStreamOperatorUtils.getSideOutput(
+ parsed,
CdcMultiTableParsingProcessFunction.DYNAMIC_OUTPUT_TAG);
+ // handles schema change for newly added tables
+ SingleOutputStreamOperator<Void> schemaChangeProcessFunction =
Review Comment:
`schemaChangeProcessFunction` is never used.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]