yuxiqian commented on code in PR #3802:
URL: https://github.com/apache/flink-cdc/pull/3802#discussion_r1906259219


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java:
##########
@@ -17,25 +17,49 @@
 
 package org.apache.flink.cdc.common.event;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 
 /**
 * An {@link Event} from {@code SchemaOperator} to notify {@code DataSinkWriterOperator} that it
  * start flushing.
  */
 public class FlushEvent implements Event {
+    /** The schema changes from which table. */
+    private final List<TableId> tableIds;
 
     /** Which subTask ID this FlushEvent was initiated from. */
     private final int sourceSubTaskId;
 
-    public FlushEvent(int sourceSubTaskId) {
+    /** Flag indicating whether the FlushEvent is sent before a create table event. */

Review Comment:
   Please modify JavaDocs, too
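
   For reference, a possible wording (just a sketch, assuming the new `tableIds` and `schemaChangeEventType` fields from this diff survive into the final revision):

   ```java
   /**
    * An {@link Event} from {@code SchemaOperator} to notify {@code DataSinkWriterOperator} that it
    * should start flushing.
    *
    * <p>Carries the {@link TableId}s of all tables affected by the upcoming schema change, plus the
    * type of that change, so that downstream operators know which pending data to flush before the
    * change is applied.
    */
   ```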



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java:
##########
@@ -82,14 +84,19 @@ public void serialize(Event event, DataOutputView dataOutputView) throws IOExcep
             BucketWrapperFlushEvent bucketWrapperFlushEvent = (BucketWrapperFlushEvent) event;
             dataOutputView.writeInt(bucketWrapperFlushEvent.getBucket());
             dataOutputView.writeInt(bucketWrapperFlushEvent.getSourceSubTaskId());
+            schemaChangeEventTypeEnumSerializer.serialize(
+                    bucketWrapperFlushEvent.getSchemaChangeEventType(), dataOutputView);

Review Comment:
   Similar concern here: should we keep the original `tableIds` field when wrapping FlushEvents?
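
   Something along these lines might work on the write side (a rough sketch only: the `getTableIds()` accessor on the wrapper and the use of `TableId#identifier()` / `TableId.parse(String)` for round-tripping are my assumptions, not code from this PR):

   ```java
   // Hypothetical: also carry the tableIds through the bucket wrapper serialization.
   List<TableId> tableIds = bucketWrapperFlushEvent.getTableIds();
   dataOutputView.writeInt(tableIds.size());
   for (TableId tableId : tableIds) {
       // Write the fully-qualified name; the deserializer can rebuild it with TableId.parse(...).
       dataOutputView.writeUTF(tableId.identifier());
   }
   ```

   The deserialize path would then read the count back and re-parse each identifier before reconstructing the wrapped FlushEvent.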



##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializerTest.java:
##########
@@ -44,7 +45,12 @@ protected Class<Event> getTypeClass() {
 
     @Override
     protected Event[] getTestData() {
-        Event[] flushEvents = new Event[] {new FlushEvent(1), new FlushEvent(2), new FlushEvent(3)};
+        Event[] flushEvents =
+                new Event[] {
+                    new FlushEvent(1, SchemaChangeEventType.CREATE_TABLE),
+                    new FlushEvent(2, SchemaChangeEventType.CREATE_TABLE),
+                    new FlushEvent(3, SchemaChangeEventType.CREATE_TABLE)
+                };

Review Comment:
   Please also test that FlushEvents with extra TableIds can be correctly [de]serialized.
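
   For example (assuming a constructor variant that also takes a `List<TableId>` is kept; the exact signatures may differ in the final revision):

   ```java
   Event[] flushEventsWithTableIds =
           new Event[] {
               new FlushEvent(
                       1,
                       Collections.singletonList(TableId.tableId("namespace", "schema", "table_1")),
                       SchemaChangeEventType.CREATE_TABLE)
           };
   ```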



##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/PartitioningEventSerializerTest.java:
##########
@@ -48,7 +49,12 @@ protected Class<PartitioningEvent> getTypeClass() {
 
     @Override
     protected PartitioningEvent[] getTestData() {
-        Event[] flushEvents = new Event[] {new FlushEvent(1), new FlushEvent(2), new FlushEvent(3)};
+        Event[] flushEvents =
+                new Event[] {
+                    new FlushEvent(1, SchemaChangeEventType.CREATE_TABLE),
+                    new FlushEvent(2, SchemaChangeEventType.CREATE_TABLE),
+                    new FlushEvent(3, SchemaChangeEventType.CREATE_TABLE)
+                };

Review Comment:
   Ditto



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java:
##########
@@ -17,25 +17,49 @@
 
 package org.apache.flink.cdc.common.event;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 
 /**
 * An {@link Event} from {@code SchemaOperator} to notify {@code DataSinkWriterOperator} that it
  * start flushing.
  */
 public class FlushEvent implements Event {
+    /** The schema changes from which table. */
+    private final List<TableId> tableIds;
 
     /** Which subTask ID this FlushEvent was initiated from. */
     private final int sourceSubTaskId;
 
-    public FlushEvent(int sourceSubTaskId) {
+    /** Flag indicating whether the FlushEvent is sent before a create table event. */
+    private final SchemaChangeEventType schemaChangeEventType;
+
+    public FlushEvent(int sourceSubTaskId, SchemaChangeEventType schemaChangeEventType) {

Review Comment:
   Seems this constructor should be `@VisibleForTesting`. But I'm not quite sure why `BucketWrapperFlushEvent` is also calling it, since that might lose track of the `tableIds` field if the sink declares an extra bucket-assigning topology.
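
   Roughly what I had in mind (a sketch only; the three-argument constructor and the field assignments are assumed from the rest of this diff, and `@VisibleForTesting` refers to whichever annotation the project already uses):

   ```java
   /** Test-only convenience constructor that leaves {@code tableIds} empty. */
   @VisibleForTesting
   public FlushEvent(int sourceSubTaskId, SchemaChangeEventType schemaChangeEventType) {
       this(sourceSubTaskId, Collections.emptyList(), schemaChangeEventType);
   }

   public FlushEvent(
           int sourceSubTaskId, List<TableId> tableIds, SchemaChangeEventType schemaChangeEventType) {
       this.sourceSubTaskId = sourceSubTaskId;
       this.tableIds = tableIds;
       this.schemaChangeEventType = schemaChangeEventType;
   }
   ```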



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java:
##########
@@ -17,25 +17,49 @@
 
 package org.apache.flink.cdc.common.event;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 
 /**
 * An {@link Event} from {@code SchemaOperator} to notify {@code DataSinkWriterOperator} that it
  * start flushing.
  */
 public class FlushEvent implements Event {
+    /** The schema changes from which table. */

Review Comment:
   If I understand correctly, this `TableId` set is the set of "affected" tables that need to be flushed, not the "original table that initiates the schema change".
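
   If so, a field-level comment along these lines might avoid the confusion (wording is only a suggestion):

   ```java
   /** All tables affected by the schema change, i.e. the tables whose pending data must be flushed. */
   private final List<TableId> tableIds;
   ```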


