danny0405 commented on code in PR #13285:
URL: https://github.com/apache/hudi/pull/13285#discussion_r2086653325


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -352,42 +367,38 @@ private static void 
initMetadataTable(HoodieFlinkWriteClient<?> writeClient) {
     writeClient.initMetadataTable();
   }
 
-  private static CkpMetadata initCkpMetadata(HoodieWriteConfig writeConfig, 
Configuration conf) throws IOException {
-    CkpMetadata ckpMetadata = CkpMetadataFactory.getCkpMetadata(writeConfig, 
conf);
-    ckpMetadata.bootstrap();
-    return ckpMetadata;
-  }
-
   private void initClientIds(Configuration conf) {
     this.clientIds = ClientIds.builder().conf(conf).build();
     this.clientIds.start();
   }
 
-  private void reset() {
-    this.eventBuffer = new WriteMetadataEvent[this.parallelism];
+  private void reset(long checkpointId) {
+    this.eventBuffers.remove(checkpointId);
   }
 
   /**
    * Checks the buffer is ready to commit.
    */
-  private boolean allEventsReceived() {
+  private boolean allEventsReceived(WriteMetadataEvent[] eventBuffer) {
     return Arrays.stream(eventBuffer)
         // we do not use event.isReady to check the instant
         // because the write task may send an event eagerly for empty
         // data set, the event may have a timestamp of the last committed instant.
         .allMatch(event -> event != null && event.isLastBatch());
   }
 
-  private void addEventToBuffer(WriteMetadataEvent event) {
-    if (this.eventBuffer[event.getTaskID()] != null
-        && 
this.eventBuffer[event.getTaskID()].getInstantTime().equals(event.getInstantTime()))
 {
-      this.eventBuffer[event.getTaskID()].mergeWith(event);
+  private WriteMetadataEvent[] addEventToBuffer(WriteMetadataEvent event) {
+    WriteMetadataEvent[] eventBuffer = 
this.eventBuffers.get(event.getCheckpointId()).getRight();
+    if (eventBuffer[event.getTaskID()] != null

Review Comment:
   No, the executor runs in single-threaded execution mode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to