This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 05606708fa [HUDI-4393] Add marker file for target file when flink merge handle rolls over (#6103)
05606708fa is described below

commit 05606708fabdd3b6414cce04802ac85617976d04
Author: Danny Chan <[email protected]>
AuthorDate: Thu Jul 14 16:00:08 2022 +0800

    [HUDI-4393] Add marker file for target file when flink merge handle rolls over (#6103)
---
 .../java/org/apache/hudi/table/HoodieTable.java    |  2 +-
 .../apache/hudi/io/FlinkMergeAndReplaceHandle.java |  2 +-
 .../java/org/apache/hudi/io/FlinkMergeHandle.java  | 28 +++++-----------------
 3 files changed, 8 insertions(+), 24 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 57e816619f..1e68f820d9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -655,7 +655,7 @@ public abstract class HoodieTable<T extends 
HoodieRecordPayload, I, K, O> implem
       invalidDataPaths.removeAll(validDataPaths);
 
       if (!invalidDataPaths.isEmpty()) {
-        LOG.info("Removing duplicate data files created due to spark retries 
before committing. Paths=" + invalidDataPaths);
+        LOG.info("Removing duplicate data files created due to task retries 
before committing. Paths=" + invalidDataPaths);
         Map<String, List<Pair<String, String>>> invalidPathsByPartition = 
invalidDataPaths.stream()
             .map(dp -> Pair.of(new Path(basePath, dp).getParent().toString(), 
new Path(basePath, dp).toString()))
             .collect(Collectors.groupingBy(Pair::getKey));
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
index cf912f620a..9fea0a9718 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
@@ -167,7 +167,7 @@ public class FlinkMergeAndReplaceHandle<T extends 
HoodieRecordPayload, I, K, O>
     try {
       fs.rename(newFilePath, oldFilePath);
     } catch (IOException e) {
-      throw new HoodieIOException("Error while renaming the temporary roll 
file: "
+      throw new HoodieIOException("Error while renaming the temporary rollover 
file: "
           + newFilePath + " to old base file name: " + oldFilePath, e);
     }
   }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index 1bff89713b..99f111c82f 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -43,7 +43,7 @@ import java.util.List;
 /**
  * A {@link HoodieMergeHandle} that supports MERGE write incrementally(small 
data buffers).
  *
- * <p>For a new data buffer, it initialize and set up the next file path to 
write,
+ * <p>For a new data buffer, it initializes and set up the next file path to 
write,
  * and closes the file path when the data buffer write finish. When next data 
buffer
  * write starts, it rolls over to another new file. If all the data buffers 
write finish
  * for a checkpoint round, it renames the last new file path as the desired 
file name
@@ -143,8 +143,7 @@ public class FlinkMergeHandle<T extends 
HoodieRecordPayload, I, K, O>
           break;
         }
 
-        oldFilePath = newFilePath; // override the old file name
-        rolloverPaths.add(oldFilePath);
+        rolloverPaths.add(newFilePath);
         newFileName = newFileNameWithRollover(rollNumber++);
         newFilePath = makeNewFilePath(partitionPath, newFileName);
         LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath 
+ ", rolls over to new path: " + newFilePath);
@@ -162,13 +161,6 @@ public class FlinkMergeHandle<T extends 
HoodieRecordPayload, I, K, O>
         this.fileId, hoodieTable.getBaseFileExtension());
   }
 
-  @Override
-  protected void setWriteStatusPath() {
-    // if there was rollover, should set up the path as the initial new file 
path.
-    Path path = rolloverPaths.size() > 0 ? rolloverPaths.get(0) : newFilePath;
-    writeStatus.getStat().setPath(new Path(config.getBasePath()), path);
-  }
-
   @Override
   public List<WriteStatus> close() {
     try {
@@ -188,27 +180,19 @@ public class FlinkMergeHandle<T extends 
HoodieRecordPayload, I, K, O>
 
   public void finalizeWrite() {
     // The file visibility should be kept by the configured ConsistencyGuard 
instance.
-    rolloverPaths.add(newFilePath);
-    if (rolloverPaths.size() == 1) {
+    if (rolloverPaths.size() == 0) {
       // only one flush action, no need to roll over
       return;
     }
 
-    for (int i = 0; i < rolloverPaths.size() - 1; i++) {
-      Path path = rolloverPaths.get(i);
+    for (Path path : rolloverPaths) {
       try {
         fs.delete(path, false);
+        LOG.info("Delete the rollover data file: " + path + " success!");
       } catch (IOException e) {
-        throw new HoodieIOException("Error when clean the temporary roll file: 
" + path, e);
+        throw new HoodieIOException("Error when clean the temporary rollover 
data file: " + path, e);
       }
     }
-    final Path lastPath = rolloverPaths.get(rolloverPaths.size() - 1);
-    final Path desiredPath = rolloverPaths.get(0);
-    try {
-      fs.rename(lastPath, desiredPath);
-    } catch (IOException e) {
-      throw new HoodieIOException("Error when rename the temporary roll file: 
" + lastPath + " to: " + desiredPath, e);
-    }
   }
 
   @Override

Reply via email to