yuxiqian commented on code in PR #3812:
URL: https://github.com/apache/flink-cdc/pull/3812#discussion_r2017811453
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java:
##########
@@ -90,7 +94,16 @@ public MySqlPipelineRecordEmitter(
     protected void processElement(
             SourceRecord element, SourceOutput<Event> output, MySqlSplitState splitState)
             throws Exception {
-        if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
+        if (StartupOptions.snapshot().equals(sourceConfig.getStartupOptions())) {
+            // In snapshot mode, we simply emit all schemas at once.
+            if (!alreadySendAllCreateTableTables) {
Review Comment:
In snapshot mode, we will:
* obtain and check the startup mode, and check the flag
In any other mode, we will:
* obtain and check the startup mode
I would suggest renaming `alreadySendAllCreateTableTables` to
`shouldEmitAllCtesInSnapshotMode`, setting it to `true` in snapshot mode and
`false` in other modes. The check could then be simplified to:
```java
if (shouldEmitAllCtesInSnapshotMode) {
    createTableEventCache.forEach(
            (tableId, createTableEvent) -> output.collect(createTableEvent));
    shouldEmitAllCtesInSnapshotMode = false;
}
```
so we don't have to check the startup mode every time we receive a
SourceRecord.
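For illustration, a minimal sketch of how the renamed flag could be
initialized once in the emitter's constructor (the field and the abbreviated
constructor signature are assumptions for this sketch, not code from the PR):
```java
// Hypothetical field in MySqlPipelineRecordEmitter, named per the suggestion above.
private boolean shouldEmitAllCtesInSnapshotMode;

public MySqlPipelineRecordEmitter(MySqlSourceConfig sourceConfig /* , ... */) {
    this.sourceConfig = sourceConfig;
    // Decide once at construction time; processElement then only checks the flag.
    this.shouldEmitAllCtesInSnapshotMode =
            StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
}
```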
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaBatchOperator.java:
##########
Review Comment:
minor: use consistent names for the new operator classes: either
`BatchXXXOperator` (like `BatchPreTransformOp`) or `XXXBatchOperator` (like
`SchemaBatchOperator`).
Personally I prefer the former, since it is easier to distinguish from normal
streaming operators.
##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java:
##########
Review Comment:
I think TransformE2e and UdfE2e have nothing to do with batch mode. Shall we
refactor the original cases with `@ParameterizedTest`, or just remove these
cases to avoid code inflation and longer CI execution time?
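For reference, a rough sketch of the `@ParameterizedTest` shape (JUnit 5; the
base class and the pipeline-building/verification helpers here are
hypothetical placeholders, not methods from this PR):
```java
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class TransformE2eITCase extends PipelineTestEnvironment {

    @ParameterizedTest(name = "batchMode: {0}")
    @ValueSource(booleans = {false, true})
    void testBasicTransform(boolean batchMode) throws Exception {
        // Same pipeline definition; only the pipeline-level batch flag differs.
        String pipelineYaml = buildTransformPipelineYaml(batchMode); // hypothetical helper
        submitPipelineJob(pipelineYaml);
        // Both execution modes are expected to produce identical results.
        verifySinkResult(); // hypothetical helper
    }
}
```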
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/PartitioningTranslator.java:
##########
@@ -46,15 +47,44 @@ public DataStream<Event> translateRegular(
             int downstreamParallelism,
             OperatorID schemaOperatorID,
             HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
-        return input.transform(
-                        "PrePartition",
-                        new PartitioningEventTypeInfo(),
-                        new RegularPrePartitionOperator(
-                                schemaOperatorID, downstreamParallelism, hashFunctionProvider))
-                .setParallelism(upstreamParallelism)
-                .partitionCustom(new EventPartitioner(), new PartitioningEventKeySelector())
-                .map(new PostPartitionProcessor(), new EventTypeInfo())
-                .name("PostPartition");
+        return translateRegular(
+                input,
+                upstreamParallelism,
+                downstreamParallelism,
+                false,
+                schemaOperatorID,
+                hashFunctionProvider);
+    }
+
+    public DataStream<Event> translateRegular(
+            DataStream<Event> input,
+            int upstreamParallelism,
+            int downstreamParallelism,
+            boolean isBatchMode,
+            OperatorID schemaOperatorID,
+            HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
+        if (isBatchMode) {
+            return input.transform(
+                            "BatchPrePartition",
+                            new PartitioningEventTypeInfo(),
+                            new RegularPrePartitionBatchOperator(
+                                    downstreamParallelism, hashFunctionProvider))
+                    .setParallelism(upstreamParallelism)
+                    .partitionCustom(new EventPartitioner(), new PartitioningEventKeySelector())
+                    .map(new PostPartitionProcessor(), new EventTypeInfo())
+                    .name("BatchPostPartition")
+                    .setParallelism(downstreamParallelism);
+        } else {
+            return input.transform(
+                            "PrePartition",
+                            new PartitioningEventTypeInfo(),
+                            new RegularPrePartitionOperator(
+                                    schemaOperatorID, downstreamParallelism, hashFunctionProvider))
+                    .setParallelism(upstreamParallelism)
+                    .partitionCustom(new EventPartitioner(), new PartitioningEventKeySelector())
+                    .map(new PostPartitionProcessor(), new EventTypeInfo())
+                    .name("PostPartition");
Review Comment:
nit: the operations after PrePartition seem identical and could be extracted
from the if-else block
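For instance, something along these lines (a sketch only; it assumes both
PrePartition operators share a common
`OneInputStreamOperator<Event, PartitioningEvent>` supertype):
```java
OneInputStreamOperator<Event, PartitioningEvent> prePartitionOperator =
        isBatchMode
                ? new RegularPrePartitionBatchOperator(downstreamParallelism, hashFunctionProvider)
                : new RegularPrePartitionOperator(
                        schemaOperatorID, downstreamParallelism, hashFunctionProvider);

SingleOutputStreamOperator<Event> partitioned =
        input.transform(
                        isBatchMode ? "BatchPrePartition" : "PrePartition",
                        new PartitioningEventTypeInfo(),
                        prePartitionOperator)
                .setParallelism(upstreamParallelism)
                .partitionCustom(new EventPartitioner(), new PartitioningEventKeySelector())
                .map(new PostPartitionProcessor(), new EventTypeInfo())
                .name(isBatchMode ? "BatchPostPartition" : "PostPartition");

// Only the batch branch pins the downstream parallelism explicitly.
return isBatchMode ? partitioned.setParallelism(downstreamParallelism) : partitioned;
```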
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java:
##########
@@ -341,4 +355,57 @@ public Optional<DataChangeEvent> coerceDataRecord(
         return Optional.of(dataChangeEvent);
     }
+
+    /** Deduce merged CreateTableEvent in batch mode. */
+    public static List<CreateTableEvent> deduceMergedCreateTableEventInBatchMode(
Review Comment:
Yes, but `SchemaDerivator` itself shouldn't be aware of streaming / batch
execution mode. Similar initial deducing logic could be ported to streaming
mode later.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java:
##########
@@ -90,7 +94,16 @@ public MySqlPipelineRecordEmitter(
     protected void processElement(
             SourceRecord element, SourceOutput<Event> output, MySqlSplitState splitState)
             throws Exception {
-        if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
+        if (StartupOptions.snapshot().equals(sourceConfig.getStartupOptions())) {
+            // In snapshot mode, we simply emit all schemas at once.
+            if (!alreadySendAllCreateTableTables) {
+                createTableEventCache.forEach(
+                        (tableId, createTableEvent) -> {
+                            output.collect(createTableEvent);
+                            alreadySendAllCreateTableTables = true;
Review Comment:
nit: move `alreadySendAllCreateTableTables = true` out of the loop
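That is, roughly:
```java
if (!alreadySendAllCreateTableTables) {
    createTableEventCache.forEach(
            (tableId, createTableEvent) -> output.collect(createTableEvent));
    // Flip the flag once, after all cached CreateTableEvents have been emitted.
    alreadySendAllCreateTableTables = true;
}
```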
##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java:
##########
@@ -599,4 +600,113 @@ void testNormalizeSchemaChangeEventsInIgnoreMode() {
                                 new DropTableEvent(NORMALIZE_TEST_TABLE_ID)))
                 .isEmpty();
     }
+
+    @Test
+    void testDeduceMergedCreateTableEventInBatchMode() {
Review Comment:
ditto, remove `inBatchMode`
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java:
##########
@@ -129,6 +130,13 @@ public DataSource createDataSource(Context context) {
         ZoneId serverTimeZone = getServerTimeZone(config);
         StartupOptions startupOptions = getStartupOptions(config);
+        // Batch mode only supports StartupMode.SNAPSHOT.
+        Configuration pipelineConfiguration = context.getPipelineConfiguration();
+        if (pipelineConfiguration != null
+                && pipelineConfiguration.contains(PipelineOptions.PIPELINE_BATCH_MODE_ENABLED)) {
+            startupOptions = StartupOptions.snapshot();
+        }
Review Comment:
Just a suggestion: what about throwing an exception explicitly if one enables
batch mode with a non-snapshotting source? That would prevent a silent
behavior change of the `startupOptions` config.
Alternatively, we may add an interface method in `DataSourceFactory` like this
to make things clearer:
```java
@PublicEvolving
public interface DataSourceFactory extends Factory {

    /** Creates a {@link DataSource} instance. */
    DataSource createDataSource(Context context);

    /** Checks if this {@link DataSource} could be created in batch mode. */
    boolean supportsBatchPipeline(Context context);
}
```
and verify it while translating the pipeline job graph. WDYT?
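For the first option, a minimal sketch inside `createDataSource` (the config
accessors come from the diff above; the exception type and message are
illustrative):
```java
Configuration pipelineConfiguration = context.getPipelineConfiguration();
boolean batchModeEnabled =
        pipelineConfiguration != null
                && pipelineConfiguration.contains(PipelineOptions.PIPELINE_BATCH_MODE_ENABLED);
if (batchModeEnabled && !StartupOptions.snapshot().equals(startupOptions)) {
    // Fail fast instead of silently overriding the user's startup options.
    throw new IllegalArgumentException(
            "Batch mode only supports snapshot startup mode, but startup options were: "
                    + startupOptions);
}
```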
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]