yuxiqian commented on code in PR #3802:
URL: https://github.com/apache/flink-cdc/pull/3802#discussion_r1906259219
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java:
##########
@@ -17,25 +17,49 @@
package org.apache.flink.cdc.common.event;
+import java.util.Collections;
+import java.util.List;
import java.util.Objects;
/**
* An {@link Event} from {@code SchemaOperator} to notify {@code DataSinkWriterOperator} that it
* start flushing.
*/
public class FlushEvent implements Event {
+ /** The schema changes from which table. */
+ private final List<TableId> tableIds;
/** Which subTask ID this FlushEvent was initiated from. */
private final int sourceSubTaskId;
- public FlushEvent(int sourceSubTaskId) {
+ /** Flag indicating whether the FlushEvent is sent before a create table event. */
Review Comment:
Please modify JavaDocs, too
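For illustration, the JavaDoc above still describes the field as a boolean flag, while the field is now a `SchemaChangeEventType`. A minimal sketch of a matching field-level JavaDoc (wording is assumed, not taken from the PR):
```java
/** The type of the schema change event that triggered this flush, e.g. CREATE_TABLE. */
private final SchemaChangeEventType schemaChangeEventType;
```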
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java:
##########
@@ -82,14 +84,19 @@ public void serialize(Event event, DataOutputView dataOutputView) throws IOExcep
BucketWrapperFlushEvent bucketWrapperFlushEvent =
(BucketWrapperFlushEvent) event;
dataOutputView.writeInt(bucketWrapperFlushEvent.getBucket());
dataOutputView.writeInt(bucketWrapperFlushEvent.getSourceSubTaskId());
+ schemaChangeEventTypeEnumSerializer.serialize(
+ bucketWrapperFlushEvent.getSchemaChangeEventType(), dataOutputView);
Review Comment:
Similar concern here: should we keep the original `tableIds` field when wrapping FlushEvents?
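For illustration only, a hedged sketch of how the wrapped event's `tableIds` could be persisted alongside the bucket and subtask id, so the field survives a sink with an extra bucket-assigning topology. The `getTableIds()` accessor and the `toString()`/`TableId.parse(...)` round trip are assumptions, not the PR's actual API; the project's `TableIdSerializer`, if preferred, would replace the manual loop:
```java
// write side (inside serialize), next to the existing bucket / subtask id fields
List<TableId> tableIds = bucketWrapperFlushEvent.getTableIds();
dataOutputView.writeInt(tableIds.size());
for (TableId tableId : tableIds) {
    dataOutputView.writeUTF(tableId.toString());
}

// read side (inside deserialize(DataInputView dataInputView)), mirroring the write order
int size = dataInputView.readInt();
List<TableId> restoredTableIds = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
    restoredTableIds.add(TableId.parse(dataInputView.readUTF()));
}
```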
##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializerTest.java:
##########
@@ -44,7 +45,12 @@ protected Class<Event> getTypeClass() {
@Override
protected Event[] getTestData() {
- Event[] flushEvents = new Event[] {new FlushEvent(1), new FlushEvent(2), new FlushEvent(3)};
+ Event[] flushEvents =
+ new Event[] {
+ new FlushEvent(1, SchemaChangeEventType.CREATE_TABLE),
+ new FlushEvent(2, SchemaChangeEventType.CREATE_TABLE),
+ new FlushEvent(3, SchemaChangeEventType.CREATE_TABLE)
+ };
Review Comment:
Please also test if FlushEvents with extra TableIds could be correctly
[de]serialized.
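A minimal sketch of such a test entry, assuming the PR also adds a constructor that accepts the `tableIds` list (the argument order and the `TableId.tableId(...)` factories are assumptions):
```java
// FlushEvent carrying explicit table ids; a round trip through the serializer
// under test should restore both ids unchanged.
new FlushEvent(
        4,
        Arrays.asList(
                TableId.tableId("namespace", "schema", "table_1"),
                TableId.tableId("schema", "table_2")),
        SchemaChangeEventType.CREATE_TABLE)
```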
##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/PartitioningEventSerializerTest.java:
##########
@@ -48,7 +49,12 @@ protected Class<PartitioningEvent> getTypeClass() {
@Override
protected PartitioningEvent[] getTestData() {
- Event[] flushEvents = new Event[] {new FlushEvent(1), new FlushEvent(2), new FlushEvent(3)};
+ Event[] flushEvents =
+ new Event[] {
+ new FlushEvent(1, SchemaChangeEventType.CREATE_TABLE),
+ new FlushEvent(2, SchemaChangeEventType.CREATE_TABLE),
+ new FlushEvent(3, SchemaChangeEventType.CREATE_TABLE)
+ };
Review Comment:
Ditto
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java:
##########
@@ -17,25 +17,49 @@
package org.apache.flink.cdc.common.event;
+import java.util.Collections;
+import java.util.List;
import java.util.Objects;
/**
* An {@link Event} from {@code SchemaOperator} to notify {@code DataSinkWriterOperator} that it
* start flushing.
*/
public class FlushEvent implements Event {
+ /** The schema changes from which table. */
+ private final List<TableId> tableIds;
/** Which subTask ID this FlushEvent was initiated from. */
private final int sourceSubTaskId;
- public FlushEvent(int sourceSubTaskId) {
+ /** Flag indicating whether the FlushEvent is sent before a create table event. */
+ private final SchemaChangeEventType schemaChangeEventType;
+
+ public FlushEvent(int sourceSubTaskId, SchemaChangeEventType schemaChangeEventType) {
Review Comment:
Seems this constructor should be `@VisibleForTesting`. But I'm not quite
sure why `BucketWrapperFlushEvent` is also calling this, since it might lose
track of the `tableIds` field if the sink declares an extra bucket-assigning
topology.
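A hedged sketch of how the constructors could be arranged so that the two-argument form stays test-only while the full form keeps `tableIds` (field names follow the diff above; the exact signatures and the `@VisibleForTesting` import path are assumptions, not the PR's code):
```java
@VisibleForTesting
public FlushEvent(int sourceSubTaskId, SchemaChangeEventType schemaChangeEventType) {
    // test-only shortcut: no affected tables
    this(sourceSubTaskId, Collections.emptyList(), schemaChangeEventType);
}

public FlushEvent(
        int sourceSubTaskId,
        List<TableId> tableIds,
        SchemaChangeEventType schemaChangeEventType) {
    this.sourceSubTaskId = sourceSubTaskId;
    this.tableIds = tableIds;
    this.schemaChangeEventType = schemaChangeEventType;
}
```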
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java:
##########
@@ -17,25 +17,49 @@
package org.apache.flink.cdc.common.event;
+import java.util.Collections;
+import java.util.List;
import java.util.Objects;
/**
* An {@link Event} from {@code SchemaOperator} to notify {@code DataSinkWriterOperator} that it
* start flushing.
*/
public class FlushEvent implements Event {
+ /** The schema changes from which table. */
Review Comment:
If I understand correctly, this `TableId` set is the "affected" tables that
need to be flushed, not the "original table that initiates the schema change".
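If that reading is right, a field-level JavaDoc along these lines would make it explicit (wording assumed):
```java
/**
 * The tables whose pending data must be flushed before the schema change is applied,
 * i.e. the affected tables, not only the table that initiated the change.
 */
private final List<TableId> tableIds;
```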