This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new dab5114  [HUDI-1804] Continue to write when Flink write task restart 
because of container killing (#2843)
dab5114 is described below

commit dab5114f168112f6e195af8257c01a7c7c8ee082
Author: Danny Chan <[email protected]>
AuthorDate: Mon Apr 19 19:43:41 2021 +0800

    [HUDI-1804] Continue to write when Flink write task restart because of 
container killing (#2843)
    
    The `FlinkMergeHandle` creates a marker file under the metadata path
    each time it initializes; when a write task restarts after being killed, it
    tries to create the already-existing marker file and reports an error.
    
    To solve this problem, skip the creation and use the original data file
    as base file to merge.
---
 .../java/org/apache/hudi/table/MarkerFiles.java    | 49 ++++++++++++++++++----
 .../java/org/apache/hudi/io/FlinkCreateHandle.java | 30 ++++++++-----
 .../java/org/apache/hudi/io/FlinkMergeHandle.java  | 43 +++++++++++++++----
 3 files changed, 97 insertions(+), 25 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/MarkerFiles.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/MarkerFiles.java
index 40be741..d777db4 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/MarkerFiles.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/MarkerFiles.java
@@ -189,23 +189,56 @@ public class MarkerFiles implements Serializable {
    * The marker path will be 
<base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.
    */
   public Path create(String partitionPath, String dataFileName, IOType type) {
-    Path path = FSUtils.getPartitionPath(markerDirPath, partitionPath);
+    Path markerPath = getMarkerPath(partitionPath, dataFileName, type);
     try {
-      if (!fs.exists(path)) {
-        fs.mkdirs(path); // create a new partition as needed.
-      }
+      LOG.info("Creating Marker Path=" + markerPath);
+      fs.create(markerPath, false).close();
     } catch (IOException e) {
-      throw new HoodieIOException("Failed to make dir " + path, e);
+      throw new HoodieException("Failed to create marker file " + markerPath, 
e);
     }
-    String markerFileName = String.format("%s%s.%s", dataFileName, 
HoodieTableMetaClient.MARKER_EXTN, type.name());
-    Path markerPath = new Path(path, markerFileName);
+    return markerPath;
+  }
+
+  /**
+   * The marker path will be 
<base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType.
+   *
+   * @return true if the marker file creates successfully,
+   * false if it already exists
+   */
+  public boolean createIfNotExists(String partitionPath, String dataFileName, 
IOType type) {
+    Path markerPath = getMarkerPath(partitionPath, dataFileName, type);
     try {
+      if (fs.exists(markerPath)) {
+        LOG.warn("Marker Path=" + markerPath + " already exists, cancel 
creation");
+        return false;
+      }
       LOG.info("Creating Marker Path=" + markerPath);
       fs.create(markerPath, false).close();
     } catch (IOException e) {
       throw new HoodieException("Failed to create marker file " + markerPath, 
e);
     }
-    return markerPath;
+    return true;
+  }
+
+  /**
+   * Returns the marker path. Would create the partition path first if not 
exists.
+   *
+   * @param partitionPath The partition path
+   * @param dataFileName  The data file name
+   * @param type          The IO type
+   * @return path of the marker file
+   */
+  private Path getMarkerPath(String partitionPath, String dataFileName, IOType 
type) {
+    Path path = FSUtils.getPartitionPath(markerDirPath, partitionPath);
+    try {
+      if (!fs.exists(path)) {
+        fs.mkdirs(path); // create a new partition as needed.
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to make dir " + path, e);
+    }
+    String markerFileName = String.format("%s%s.%s", dataFileName, 
HoodieTableMetaClient.MARKER_EXTN, type.name());
+    return new Path(path, markerFileName);
   }
 
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
index 2abefa9..556d98e 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
@@ -20,13 +20,14 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
 
 import org.apache.avro.Schema;
 import org.apache.log4j.LogManager;
@@ -35,7 +36,6 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 /**
  * A {@link HoodieCreateHandle} that supports create write 
incrementally(mini-batches).
@@ -68,13 +68,23 @@ public class FlinkCreateHandle<T extends 
HoodieRecordPayload, I, K, O>
         taskContextSupplier);
   }
 
-  /**
-   * Called by the compactor code path.
-   */
-  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
-                           String partitionPath, String fileId, Map<String, 
HoodieRecord<T>> recordMap,
-                           TaskContextSupplier taskContextSupplier) {
-    super(config, instantTime, hoodieTable, partitionPath, fileId, recordMap, 
taskContextSupplier);
+  @Override
+  protected void createMarkerFile(String partitionPath, String dataFileName) {
+    MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
+    boolean created = markerFiles.createIfNotExists(partitionPath, 
dataFileName, getIOType());
+    if (!created) {
+      // If the marker file already exists, that means the write task
+      // was pulled up again with the same data file name, removes the legacy
+      // data file first.
+      try {
+        if (fs.exists(path)) {
+          fs.delete(path, false);
+          LOG.warn("Legacy data file: " + path + " delete success");
+        }
+      } catch (IOException e) {
+        throw new HoodieException("Error while deleting legacy data file: " + 
path, e);
+      }
+    }
   }
 
   /**
@@ -109,7 +119,7 @@ public class FlinkCreateHandle<T extends 
HoodieRecordPayload, I, K, O>
   }
 
   @Override
-  protected long computeFileSizeInBytes() throws IOException {
+  protected long computeFileSizeInBytes() {
     return fileWriter.getBytesWritten();
   }
 
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index 244c56b..4da6404 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
@@ -70,13 +71,16 @@ public class FlinkMergeHandle<T extends 
HoodieRecordPayload, I, K, O>
   /**
    * Records the rolled over file paths.
    */
-  private final List<Path> rolloverPaths;
+  private List<Path> rolloverPaths;
 
   public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
                           Iterator<HoodieRecord<T>> recordItr, String 
partitionPath, String fileId,
                           TaskContextSupplier taskContextSupplier) {
     super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, 
taskContextSupplier);
-    rolloverPaths = new ArrayList<>();
+    if (rolloverPaths == null) {
+      // #createMarkerFile may have initialized it already
+      rolloverPaths = new ArrayList<>();
+    }
   }
 
   /**
@@ -104,6 +108,25 @@ public class FlinkMergeHandle<T extends 
HoodieRecordPayload, I, K, O>
     return false;
   }
 
+  @Override
+  protected void createMarkerFile(String partitionPath, String dataFileName) {
+    MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
+    boolean created = markerFiles.createIfNotExists(partitionPath, 
dataFileName, getIOType());
+    if (!created) {
+      // If the marker file already exists, that means the write task
+      // was pulled up again with the same data file name, performs rolling over 
action here:
+      // use the new file path as the base file path (file1),
+      // and generates new file path with roll over number (file2).
+      // the incremental data set would merge into the file2 instead of file1.
+      //
+      // When the task does finalization in #finishWrite, the intermediate files 
would be cleaned up.
+      oldFilePath = newFilePath;
+      rolloverPaths = new ArrayList<>();
+      rolloverPaths.add(oldFilePath);
+      newFilePath = makeNewFilePathWithRollover();
+    }
+  }
+
   /**
    *
    * Rolls over the write handle to prepare for the next batch write.
@@ -132,11 +155,7 @@ public class FlinkMergeHandle<T extends 
HoodieRecordPayload, I, K, O>
 
     rolloverPaths.add(newFilePath);
     oldFilePath = newFilePath;
-    // Use the fileId + "-" + rollNumber as the new fileId of a mini-batch 
write.
-    String newFileName = generatesDataFileNameWithRollover();
-    String relativePath = new Path((partitionPath.isEmpty() ? "" : 
partitionPath + "/")
-        + newFileName).toString();
-    newFilePath = new Path(config.getBasePath(), relativePath);
+    newFilePath = makeNewFilePathWithRollover();
 
     try {
       fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, 
config, writerSchemaWithMetafields, taskContextSupplier);
@@ -148,6 +167,16 @@ public class FlinkMergeHandle<T extends 
HoodieRecordPayload, I, K, O>
         newFilePath.toString()));
   }
 
+  /**
+   * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
+   */
+  private Path makeNewFilePathWithRollover() {
+    String newFileName = generatesDataFileNameWithRollover();
+    String relativePath = new Path((partitionPath.isEmpty() ? "" : 
partitionPath + "/")
+        + newFileName).toString();
+    return new Path(config.getBasePath(), relativePath);
+  }
+
   public void finishWrite() {
     // The file visibility should be kept by the configured ConsistencyGuard 
instance.
     rolloverPaths.add(newFilePath);

Reply via email to