This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new efb9719018 [HUDI-4332] The current instant may be wrong under some extreme conditions in AppendWriteFunction. (#5988)
efb9719018 is described below

commit efb971901818479e37714fab3b655ed3dc57477b
Author: BruceLin <[email protected]>
AuthorDate: Tue Jun 28 20:42:26 2022 +0800

    [HUDI-4332] The current instant may be wrong under some extreme conditions in AppendWriteFunction. (#5988)
---
 .../org/apache/hudi/sink/append/AppendWriteFunction.java  | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index 7b40718b35..6e275dc8c8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -106,30 +106,29 @@ public class AppendWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
   //  Utilities
   // -------------------------------------------------------------------------
   private void initWriterHelper() {
-    this.currentInstant = instantToWrite(true);
-    if (this.currentInstant == null) {
+    final String instant = instantToWrite(true);
+    if (instant == null) {
       // in case there are empty checkpoints that has no input data
       throw new HoodieException("No inflight instant when flushing data!");
     }
     this.writerHelper = new BulkInsertWriterHelper(this.config, 
this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
-        this.currentInstant, this.taskID, 
getRuntimeContext().getNumberOfParallelSubtasks(), 
getRuntimeContext().getAttemptNumber(),
+        instant, this.taskID, 
getRuntimeContext().getNumberOfParallelSubtasks(), 
getRuntimeContext().getAttemptNumber(),
         this.rowType);
   }
 
   private void flushData(boolean endInput) {
     final List<WriteStatus> writeStatus;
-    final String instant;
     if (this.writerHelper != null) {
       writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
-      instant = this.writerHelper.getInstantTime();
+      this.currentInstant = this.writerHelper.getInstantTime();
     } else {
       writeStatus = Collections.emptyList();
-      instant = instantToWrite(false);
-      LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, 
instant);
+      this.currentInstant = instantToWrite(false);
+      LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, 
this.currentInstant);
     }
     final WriteMetadataEvent event = WriteMetadataEvent.builder()
         .taskID(taskID)
-        .instantTime(instant)
+        .instantTime(this.currentInstant)
         .writeStatus(writeStatus)
         .lastBatch(true)
         .endInput(endInput)

Reply via email to