This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b98c9ab  [HUDI-1895] Close the file handles gracefully for flink write 
function to avoid corrupted files (#2938)
b98c9ab is described below

commit b98c9ab4392727b6b9731aea88deaa5337a91dcf
Author: Danny Chan <[email protected]>
AuthorDate: Wed May 12 18:44:10 2021 +0800

    [HUDI-1895] Close the file handles gracefully for flink write function to 
avoid corrupted files (#2938)
---
 .../apache/hudi/client/HoodieFlinkWriteClient.java | 22 +++++++-----
 .../java/org/apache/hudi/io/FlinkAppendHandle.java | 41 ++++++++++++++++------
 .../java/org/apache/hudi/io/FlinkCreateHandle.java | 16 +++++++++
 .../java/org/apache/hudi/io/FlinkMergeHandle.java  | 16 +++++++++
 .../java/org/apache/hudi/io/MiniBatchHandle.java   | 17 ++++++++-
 .../delta/BaseFlinkDeltaCommitActionExecutor.java  |  2 +-
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  2 +-
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  2 +-
 .../org/apache/hudi/table/HoodieTableFactory.java  | 14 ++++----
 .../apache/hudi/table/HoodieDataSourceITCase.java  |  5 ++-
 10 files changed, 103 insertions(+), 34 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index a84e116..6cbbeec 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -380,9 +380,19 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
    * would close the underneath file handles.
    */
   public void cleanHandles() {
-    this.bucketToHandles.values().forEach(handle -> {
-      ((MiniBatchHandle) handle).finishWrite();
-    });
+    this.bucketToHandles.values()
+        .forEach(handle -> ((MiniBatchHandle) handle).finishWrite());
+    this.bucketToHandles.clear();
+  }
+
+  /**
+   * Clean the write handles within a checkpoint interval, this operation
+   * would close the underneath file handles, if any error happens, clean the
+   * corrupted data file.
+   */
+  public void cleanHandlesGracefully() {
+    this.bucketToHandles.values()
+        .forEach(handle -> ((MiniBatchHandle) handle).closeGracefully());
     this.bucketToHandles.clear();
   }
 
@@ -438,12 +448,6 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
     return table;
   }
 
-  public List<String> getInflightsAndRequestedInstants(String commitType) {
-    HoodieTimeline unCompletedTimeline = 
getHoodieTable().getMetaClient().getCommitsTimeline().filterInflightsAndRequested();
-    return unCompletedTimeline.getInstants().filter(x -> 
x.getAction().equals(commitType)).map(HoodieInstant::getTimestamp)
-        .collect(Collectors.toList());
-  }
-
   public String getLastPendingInstant(HoodieTableType tableType) {
     final String actionType = CommitUtils.getCommitActionType(tableType);
     return getLastPendingInstant(actionType);
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index bc6eba8..c859d26 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,12 +47,12 @@ import java.util.List;
  * @param <K> Key type
  * @param <O> Output type
  */
-public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O> extends 
HoodieAppendHandle<T, I, K, O> implements MiniBatchHandle {
+public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O>
+    extends HoodieAppendHandle<T, I, K, O> implements MiniBatchHandle {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkAppendHandle.class);
-  private boolean needBootStrap = true;
-  // Total number of bytes written to file
-  private long sizeInBytes = 0;
+
+  private boolean shouldRollover = false;
 
   public FlinkAppendHandle(
       HoodieWriteConfig config,
@@ -65,6 +66,17 @@ public class FlinkAppendHandle<T extends 
HoodieRecordPayload, I, K, O> extends H
   }
 
   @Override
+  protected void createMarkerFile(String partitionPath, String dataFileName) {
+    // In some rare cases, the task was pulled up again with the same write
+    // file name, e.g. to reuse the small log files from the last commit instant.
+
+    // Just skip the marker file creation if it already exists, the new data 
would append to
+    // the file directly.
+    MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
+    markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
+  }
+
+  @Override
   protected boolean needsUpdateLocation() {
     return false;
   }
@@ -80,12 +92,8 @@ public class FlinkAppendHandle<T extends 
HoodieRecordPayload, I, K, O> extends H
     return true;
   }
 
-  /**
-   * Returns whether there is need to bootstrap this file handle.
-   * E.G. the first time that the handle is created.
-   */
-  public boolean isNeedBootStrap() {
-    return this.needBootStrap;
+  public boolean shouldRollover() {
+    return this.shouldRollover;
   }
 
   /**
@@ -98,7 +106,7 @@ public class FlinkAppendHandle<T extends 
HoodieRecordPayload, I, K, O> extends H
 
   @Override
   public List<WriteStatus> close() {
-    needBootStrap = false;
+    shouldRollover = true;
     // flush any remaining records to disk
     appendDataAndDeleteBlocks(header);
     // need to fix that the incremental write size in bytes may be lost
@@ -118,4 +126,15 @@ public class FlinkAppendHandle<T extends 
HoodieRecordPayload, I, K, O> extends H
       throw new HoodieUpsertException("Failed to close append handle", e);
     }
   }
+
+  @Override
+  public void closeGracefully() {
+    try {
+      finishWrite();
+    } catch (Throwable throwable) {
+      // The intermediate log file can still append based on the incremental 
MERGE semantics,
+      // there is no need to delete the file.
+      LOG.warn("Error while trying to dispose the APPEND handle", throwable);
+    }
+  }
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
index fc0f8ef..fe5bf99 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
@@ -185,6 +185,22 @@ public class FlinkCreateHandle<T extends 
HoodieRecordPayload, I, K, O>
     }
   }
 
+  @Override
+  public void closeGracefully() {
+    try {
+      finishWrite();
+    } catch (Throwable throwable) {
+      LOG.warn("Error while trying to dispose the CREATE handle", throwable);
+      try {
+        fs.delete(path, false);
+        LOG.info("Deleting the intermediate CREATE data file: " + path + " 
success!");
+      } catch (IOException e) {
+        // logging a warning and ignore the exception.
+        LOG.warn("Deleting the intermediate CREATE data file: " + path + " 
failed", e);
+      }
+    }
+  }
+
   /**
    * Performs actions to durably, persist the current changes and returns a 
WriteStatus object.
    */
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index 023ee5f..518ea69 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -238,4 +238,20 @@ public class FlinkMergeHandle<T extends 
HoodieRecordPayload, I, K, O>
       throw new HoodieIOException("Error when rename the temporary roll file: 
" + lastPath + " to: " + desiredPath, e);
     }
   }
+
+  @Override
+  public void closeGracefully() {
+    try {
+      finishWrite();
+    } catch (Throwable throwable) {
+      LOG.warn("Error while trying to dispose the MERGE handle", throwable);
+      try {
+        fs.delete(newFilePath, false);
+        LOG.info("Deleting the intermediate MERGE data file: " + newFilePath + 
" success!");
+      } catch (IOException e) {
+        // logging a warning and ignore the exception.
+        LOG.warn("Deleting the intermediate MERGE data file: " + newFilePath + 
" failed", e);
+      }
+    }
+  }
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
index 2cae807..30ac317 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
@@ -22,9 +22,24 @@ package org.apache.hudi.io;
  * Hoodie write handle that supports write as mini-batch.
  */
 public interface MiniBatchHandle {
+
+  /**
+   * Returns whether the handle should roll over to a new one,
+   * e.g. when the handle has already written some intermediate data buffer.
+   */
+  default boolean shouldRollover() {
+    return false;
+  }
+
   /**
   * Finish the write of multiple mini-batches. Usually these mini-batches
-   * come from a checkpoint interval.
+   * come from one checkpoint interval.
    */
   void finishWrite();
+
+  /**
+   * Close the file handle gracefully, if any error happens during the file 
handle close,
+   * clean the file to not left corrupted file.
+   */
+  void closeGracefully();
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java
index 8d0857e..674a8fd 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java
@@ -49,7 +49,7 @@ public abstract class BaseFlinkDeltaCommitActionExecutor<T 
extends HoodieRecordP
   @Override
   public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String 
fileId, Iterator<HoodieRecord<T>> recordItr) {
     FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle;
-    if (!appendHandle.isNeedBootStrap()) {
+    if (appendHandle.shouldRollover()) {
       appendHandle.appendNewRecords(recordItr);
     }
     appendHandle.doAppend();
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 5a04a35..d9cb389 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -200,7 +200,7 @@ public class StreamWriteFunction<K, I, O>
   @Override
   public void close() {
     if (this.writeClient != null) {
-      this.writeClient.cleanHandles();
+      this.writeClient.cleanHandlesGracefully();
       this.writeClient.close();
     }
   }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 6244d65..ddbb3dd 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -249,7 +249,7 @@ public class StreamWriteOperatorCoordinator
               "The coordinator can only handle BatchWriteSuccessEvent");
           BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) 
operatorEvent;
           // the write task does not block after checkpointing(and before it 
receives a checkpoint success event),
-          // if it it checkpoints succeed then flushes the data buffer again 
before this coordinator receives a checkpoint
+          // if it checkpoints succeed then flushes the data buffer again 
before this coordinator receives a checkpoint
           // success event, the data buffer would flush with an older instant 
time.
           ValidationUtils.checkState(
               HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java 
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 02ab280..808e9c7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -102,7 +102,7 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
   //  Utilities
   // -------------------------------------------------------------------------
 
-  /** Validate required options. e.g record key and pre combine key.
+  /** Validate required options. For e.g, record key and pre_combine key.
    *
    * @param conf The table options
    * @param schema The table schema
@@ -115,17 +115,17 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
       Arrays.stream(conf.get(FlinkOptions.RECORD_KEY_FIELD).split(","))
           .filter(field -> !fields.contains(field))
           .findAny()
-          .ifPresent(e -> {
-            throw new ValidationException("The " + e + " field not exists in 
table schema."
-                + "Please define primary key or modify 
hoodie.datasource.write.recordkey.field option.");
+          .ifPresent(f -> {
+            throw new ValidationException("Field '" + f + "' does not exist in 
the table schema."
+                + "Please define primary key or modify 
'hoodie.datasource.write.recordkey.field' option.");
           });
     }
 
-    // validate pre combine key
+    // validate pre_combine key
     String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD);
     if (!fields.contains(preCombineField)) {
-      throw new ValidationException("The " + preCombineField + " field not 
exists in table schema."
-          + "Please check write.precombine.field option.");
+      throw new ValidationException("Field " + preCombineField + " does not 
exist in the table schema."
+          + "Please check 'write.precombine.field' option.");
     }
   }
 
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java 
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 69627f2..41c587c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -392,9 +392,8 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
     assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]");
   }
 
-  @ParameterizedTest
-  @EnumSource(value = ExecMode.class)
-  void testStreamReadEmptyTablePath(ExecMode execMode) throws Exception {
+  @Test
+  void testStreamReadEmptyTablePath() throws Exception {
     // create an empty table
     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     StreamerUtil.initTableIfNotExists(conf);

Reply via email to