yuxiqian commented on code in PR #3818:
URL: https://github.com/apache/flink-cdc/pull/3818#discussion_r1904956775


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java:
##########
@@ -50,8 +69,17 @@ public void open() throws Exception {
 
     @Override
     public void 
processElement(StreamRecord<CommittableMessage<MultiTableCommittable>> element) 
{
+        if (catalog == null) {
+            this.catalog = 
FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+            // flinkMetricGroup could be passed after FLIP-371.

Review Comment:
   Our baseline is Flink 1.19 where FLIP-371 has been resolved. Can we remove 
this TODO now?



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java:
##########
@@ -101,6 +101,9 @@ public static Schema applySchemaChangeEvent(Schema schema, 
SchemaChangeEvent eve
     private static Schema applyAddColumnEvent(AddColumnEvent event, Schema 
oldSchema) {
         LinkedList<Column> columns = new LinkedList<>(oldSchema.getColumns());
         for (AddColumnEvent.ColumnWithPosition columnWithPosition : 
event.getAddedColumns()) {
+            if 
(oldSchema.getColumnNames().contains(columnWithPosition.getAddColumn().getName()))
 {
+                continue;
+            }

Review Comment:
   Seems we don't need this check after 
`SchemaUtils#isSchemaChangeEventRedundant` has guarded us from duplicately 
applying non-idempotent schema change events.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java:
##########
@@ -63,7 +64,12 @@ public DataStream<Event> 
addPreWriteTopology(DataStream<Event> dataStream) {
                 // All Events after BucketAssignOperator are decorated with 
BucketWrapper.
                 .partitionCustom(
                         (bucket, numPartitions) -> bucket % numPartitions,
-                        (event) -> ((BucketWrapper) event).getBucket());
+                        (event) -> ((BucketWrapper) event).getBucket())
+                // Avoid disorder of FlushEvent and DataChangeEvent.
+                .transform(
+                        "RemoveDuplicateEvents",

Review Comment:
   Make operatorName consistent with class name



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java:
##########
@@ -183,19 +189,24 @@ public void processElement(StreamRecord<Event> 
streamRecord) throws Exception {
             schemaMaps.put(
                     createTableEvent.tableId(),
                     new TableSchemaInfo(createTableEvent.getSchema(), zoneId));
-            output.collect(
-                    new StreamRecord<>(
-                            new BucketWrapperChangeEvent(currentTaskNumber, 
(ChangeEvent) event)));
+            for (int index = 0; index < totalTasksNumber; index++) {
+                output.collect(
+                        new StreamRecord<>(
+                                new BucketWrapperChangeEvent(index, 
(ChangeEvent) event)));
+            }
         } else if (event instanceof SchemaChangeEvent) {
             SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
             Schema schema =
                     SchemaUtils.applySchemaChangeEvent(
                             
schemaMaps.get(schemaChangeEvent.tableId()).getSchema(),
                             schemaChangeEvent);
             schemaMaps.put(schemaChangeEvent.tableId(), new 
TableSchemaInfo(schema, zoneId));
-            output.collect(
-                    new StreamRecord<>(
-                            new BucketWrapperChangeEvent(currentTaskNumber, 
(ChangeEvent) event)));
+            // Broadcast SchemachangeEvent.
+            for (int index = 0; index < totalTasksNumber; index++) {
+                output.collect(
+                        new StreamRecord<>(
+                                new BucketWrapperChangeEvent(index, 
(ChangeEvent) event)));
+            }

Review Comment:
   Seems we can merge `event instanceof CreateTableEvent` and `event instanceof 
SchemaChangeEvent`?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java:
##########
@@ -64,34 +92,44 @@ public void finish() {
 
     @Override
     public void prepareSnapshotPreBarrier(long checkpointId) {
-        // CommittableSummary should be sent before all CommittableWithLineage.
-        CommittableMessage<MultiTableCommittable> summary =
-                new CommittableSummary<>(
-                        getRuntimeContext().getIndexOfThisSubtask(),
-                        getRuntimeContext().getNumberOfParallelSubtasks(),
-                        checkpointId,
-                        results.size(),
-                        results.size(),
-                        0);
-        output.collect(new StreamRecord<>(summary));
-
-        results.forEach(
-                committable -> {
-                    // update the right checkpointId for MultiTableCommittable
-                    MultiTableCommittable committableWithCheckPointId =
-                            new MultiTableCommittable(
-                                    committable.getDatabase(),
-                                    committable.getTable(),
-                                    checkpointId,
-                                    committable.kind(),
-                                    committable.wrappedCommittable());
-                    CommittableMessage<MultiTableCommittable> message =
-                            new CommittableWithLineage<>(
-                                    committableWithCheckPointId,
-                                    checkpointId,
-                                    
getRuntimeContext().getIndexOfThisSubtask());
-                    output.collect(new StreamRecord<>(message));
-                });
-        results.clear();
+        for (int i = 0; i < multiTableCommittables.size(); i++) {
+            MultiTableCommittable multiTableCommittable = 
multiTableCommittables.get(i);
+            multiTableCommittables.set(
+                    i,
+                    new MultiTableCommittable(
+                            multiTableCommittable.getDatabase(),
+                            multiTableCommittable.getTable(),
+                            checkpointId,
+                            multiTableCommittable.kind(),
+                            multiTableCommittable.wrappedCommittable()));
+        }
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        long checkpointId = context.getCheckpointId();
+        if (!multiTableCommittables.isEmpty()) {
+            multiTableCommittables.forEach(
+                    (multiTableCommittable) ->
+                            LOGGER.debug(
+                                    "Try to commit: "
+                                            + multiTableCommittable
+                                            + " in checkpoint "
+                                            + checkpointId));

Review Comment:
   ```suggestion
   LOGGER.debug("Try to commit: {} in checkpoint {}", ...);
   ```
   
   Thus, we don't need to pay for string concatenation with higher log level.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java:
##########
@@ -210,6 +221,7 @@ public void processElement(StreamRecord<Event> 
streamRecord) throws Exception {
         }
         long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
         Integer numAssigners = 
table.coreOptions().dynamicBucketInitialBuckets();
+        LOGGER.debug("Succeed to get table info " + table);

Review Comment:
   ```suggestion
           LOGGER.debug("Succeed to get table info {}", table);
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/FlushEventAlignmentOperator.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** Remove duplicate {@link FlushEvent} created by {@link 
BucketAssignOperator}. */
+public class FlushEventAlignmentOperator extends AbstractStreamOperator<Event>
+        implements OneInputStreamOperator<Event, Event> {
+
+    private transient int totalTasksNumber;
+
+    /**
+     * Key: subtask id of {@link SchemaOperator}, Value: subtask ids of {@link
+     * BucketAssignOperator}.
+     */
+    private transient Map<Integer, Set<Integer>> 
sourceTaskIdToAssignBucketSubTaskIds;

Review Comment:
   I'm not sure if it's enough to distinguish `FlushEvent`s?
   
   Notice that each upstream partition might emit `FlushEvent`s individually, 
and all of them might be broadcast again in the `BucketAssignOperator`.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/FlushEventAlignmentOperator.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** Remove duplicate {@link FlushEvent} created by {@link 
BucketAssignOperator}. */
+public class FlushEventAlignmentOperator extends AbstractStreamOperator<Event>
+        implements OneInputStreamOperator<Event, Event> {
+
+    private transient int totalTasksNumber;
+
+    /**
+     * Key: subtask id of {@link SchemaOperator}, Value: subtask ids of {@link
+     * BucketAssignOperator}.
+     */
+    private transient Map<Integer, Set<Integer>> 
sourceTaskIdToAssignBucketSubTaskIds;
+
+    private transient int currentSubTaskId;
+
+    public FlushEventAlignmentOperator() {
+        // It's necessary to avoid unpredictable outcomes of Event shuffling.
+        this.chainingStrategy = ChainingStrategy.ALWAYS;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.totalTasksNumber = 
getRuntimeContext().getNumberOfParallelSubtasks();
+        this.currentSubTaskId = getRuntimeContext().getIndexOfThisSubtask();
+        sourceTaskIdToAssignBucketSubTaskIds = new HashMap<>();
+    }
+
+    @Override
+    public void processElement(StreamRecord<Event> streamRecord) {
+        Event event = streamRecord.getValue();
+        if (event instanceof BucketWrapperFlushEvent) {
+            BucketWrapperFlushEvent bucketWrapperFlushEvent = 
(BucketWrapperFlushEvent) event;
+            int sourceSubTaskId = bucketWrapperFlushEvent.getSourceSubTaskId();
+            Set<Integer> subTaskIds =
+                    sourceTaskIdToAssignBucketSubTaskIds.getOrDefault(
+                            sourceSubTaskId, new HashSet<>());
+            int subtaskId = bucketWrapperFlushEvent.getBucketAssignTaskId();
+            subTaskIds.add(subtaskId);
+            if (subTaskIds.size() == totalTasksNumber) {
+                LOG.info(currentSubTaskId + " send FlushEvent of " + 
sourceSubTaskId);
+                output.collect(new StreamRecord<>(new 
FlushEvent(sourceSubTaskId)));
+                sourceTaskIdToAssignBucketSubTaskIds.remove(sourceSubTaskId);
+            } else {
+                LOG.info(
+                        currentSubTaskId
+                                + " collect FlushEvent of "
+                                + sourceSubTaskId
+                                + " with subtask "
+                                + subtaskId);

Review Comment:
   Ditto, avoid eager string concatenation



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/FlushEventAlignmentOperator.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** Remove duplicate {@link FlushEvent} created by {@link 
BucketAssignOperator}. */

Review Comment:
   ```suggestion
   /** Align {@link FlushEvent}s broadcasted by {@link BucketAssignOperator}. */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to