vinothchandar commented on code in PR #9035:
URL: https://github.com/apache/hudi/pull/9035#discussion_r1695675856


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -201,6 +201,18 @@ private void init(HoodieRecord record) {
             new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
             hoodieTable.getPartitionMetafileFormat());
         partitionMetadata.trySave(getPartitionId());
+        // If this is a second or subsequent attempt to create the data file, try to recover existing version.
+        if (recoverWriteStatusIfAvailable(partitionPath,FSUtils.makeBaseFileName(this.instantTime, this.writeToken,
+            this.fileId, hoodieTable.getBaseFileExtension()), this.instantTime)) {
+          this.writer = null;

Review Comment:
   Do we need to close the `writer` as well?
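A minimal sketch of what closing it could look like. It assumes the handle's writer is `Closeable` and that recovery should release it before the reference is dropped. `RecoverableHandle` and `releaseWriterAfterRecovery` are hypothetical names used for illustration, not Hudi APIs.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for an append handle; not Hudi's HoodieAppendHandle.
public class RecoverableHandle {
  // The handle's open writer; any Closeable stands in for it here.
  private Closeable writer;

  public RecoverableHandle(Closeable writer) {
    this.writer = writer;
  }

  // Called when a previous attempt's write status was recovered: release the
  // writer before dropping the reference so its stream is not leaked.
  public void releaseWriterAfterRecovery() {
    if (writer == null) {
      return;
    }
    try {
      writer.close();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close writer after recovery", e);
    } finally {
      writer = null; // clear the reference even if close() fails
    }
  }

  public boolean hasWriter() {
    return writer != null;
  }

  public static void main(String[] args) {
    boolean[] closed = {false};
    RecoverableHandle handle = new RecoverableHandle(() -> closed[0] = true);
    handle.releaseWriterAfterRecovery();
    System.out.println("closed=" + closed[0] + ", hasWriter=" + handle.hasWriter());
  }
}
```

The `finally` block clears the reference even when `close()` fails. A later call is therefore a no-op and does not attempt a second close.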



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -140,10 +157,99 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
    * Creates an empty marker file corresponding to storage writer path.
    *
    * @param partitionPath Partition path
+   * @param dataFileName data file for which inprogress marker creation is requested
+   * @param markerInstantTime - instantTime associated with the request
+   * returns true - inprogress marker successfully created,
+   *         false - inprogress marker was not created.
+   */
+  protected void createInProgressMarkerFile(String partitionPath, String dataFileName, String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (!writeMarkers.doesMarkerDirExist()) {
+      throw new HoodieIOException(String.format("Marker root directory absent : %s/%s (%s)",
+          partitionPath, dataFileName, markerInstantTime));
+    }
+    writeMarkers.create(partitionPath, dataFileName, getIOType());
+  }
+
+  protected boolean recoverWriteStatusIfAvailable(String partitionPath, String dataFileName,
+                                                  String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (config.isFailRetriesAfterFinalizeWrite()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(StringUtils.EMPTY_STRING,
+        FINALIZE_WRITE_COMPLETED, markerInstantTime, IOType.CREATE))) {
+      throw new HoodieCorruptedDataException(" Failing retry attempt for instant " + instantTime
+          + " as the job is trying to re-write the data files, after writes have been finalized.");
+    }
+    if (config.optimizeTaskRetriesWithMarkers()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(partitionPath, fileId, markerInstantTime, getIOType()))) {
+      // read the writeStatuses for the previously completed successful attempt(s) from the completed marker file(s) and
+      // return false to indicate inprogress marker was not created (for the new snapshot version).
+      try {
+        if (recoverWriteStatuses(writeMarkers, dataFileName)) {
+          return true;
+        }
+      } catch (HoodieException | IOException e) {
+        // failed to read the contents of an existing completed marker. (one or more files)
+        // fall through to recreate the files.
+      }
+    }
+    return false;
+  }
+
+  /**
+   * If a single writer/executor wrote to multiple files (records split into multiple parts) then all the files would have
+   * the same write token identifying the task and fileId prefix. When recovering the writestatus for the requested file,
+   * we are trying to identify all the files created by the same executor by looking for file with the same fileId prefix
+   * and returning write statuses for all of them.
+   *
+   * @param writeMarkers -  to get marker related information
+   * @param dataFileName - first of the multiple files created by the writeHandle.
+   * @return true if one or more write statuses were recovered.  false, otherwise.
+   * @throws IOException
    */
-  protected void createMarkerFile(String partitionPath, String dataFileName) {
-    WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
-        .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline());
+  private boolean recoverWriteStatuses(WriteMarkers writeMarkers, String dataFileName) throws IOException {

Review Comment:
   Note to self: need to review this deeply to ensure the DFS accesses are optimal.
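For reference, here is a hypothetical sketch of the prefix matching that the Javadoc above describes. It filters a single directory listing in memory instead of issuing one existence check per candidate file. It assumes file names of the form `{fileIdPrefix}-{n}_{writeToken}_{instantTime}{extension}`. That naming and `SiblingFileMatcher` are assumptions for illustration, not Hudi's actual recovery code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not a Hudi API. Assumes data file names shaped like
// {fileIdPrefix}-{n}_{writeToken}_{instantTime}{extension}.
public final class SiblingFileMatcher {
  private SiblingFileMatcher() {}

  // Filters one directory listing in memory instead of issuing one
  // existence check per candidate file.
  public static List<String> sameTaskFiles(List<String> fileNames, String fileIdPrefix,
                                           String writeToken, String instantTime, String extension) {
    String prefix = fileIdPrefix + "-";
    String suffix = "_" + writeToken + "_" + instantTime + extension;
    return fileNames.stream()
        .filter(name -> name.startsWith(prefix) && name.endsWith(suffix))
        .sorted()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> listing = Arrays.asList(
        "f1-0_1-0-1_001.parquet", "f1-1_1-0-1_001.parquet",
        "f1-1_1-0-2_001.parquet", "f2-0_1-0-1_001.parquet");
    // prints [f1-0_1-0-1_001.parquet, f1-1_1-0-1_001.parquet]
    System.out.println(sameTaskFiles(listing, "f1", "1-0-1", "001", ".parquet"));
  }
}
```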



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -766,6 +767,10 @@ protected void reconcileAgainstMarkers(HoodieEngineContext context,
           waitForAllFiles(context, invalidPathsByPartition, FileVisibility.DISAPPEAR);
         }
       }
+      if (config.isFailRetriesAfterFinalizeWrite()) {

Review Comment:
   In general, I feel there is no good reason for us to finalize the write of the data table and then write to the MT (metadata table)? Wouldn't that cut down the number of times you hit these issues, with the MT `files` partition filtering out files written by any stray executors?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -140,10 +157,99 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
    * Creates an empty marker file corresponding to storage writer path.
    *
    * @param partitionPath Partition path
+   * @param dataFileName data file for which inprogress marker creation is requested
+   * @param markerInstantTime - instantTime associated with the request
+   * returns true - inprogress marker successfully created,
+   *         false - inprogress marker was not created.
+   */
+  protected void createInProgressMarkerFile(String partitionPath, String dataFileName, String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (!writeMarkers.doesMarkerDirExist()) {
+      throw new HoodieIOException(String.format("Marker root directory absent : %s/%s (%s)",
+          partitionPath, dataFileName, markerInstantTime));
+    }
+    writeMarkers.create(partitionPath, dataFileName, getIOType());
+  }
+
+  protected boolean recoverWriteStatusIfAvailable(String partitionPath, String dataFileName,
+                                                  String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (config.isFailRetriesAfterFinalizeWrite()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(StringUtils.EMPTY_STRING,
+        FINALIZE_WRITE_COMPLETED, markerInstantTime, IOType.CREATE))) {
+      throw new HoodieCorruptedDataException(" Failing retry attempt for instant " + instantTime

Review Comment:
   Isn't this actually preventing a re-attempt of the write by failing the stray task, i.e., preventing corruption? Throwing a "CorruptedData" exception makes sense if the table is deemed corrupted at this point; if not, let's please rename the exception.
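To make the rename concrete, here is a sketch with a hypothetical name. `RetryAfterFinalizeException` is not an existing Hudi class. Its message describes what actually happens: the stray retry is rejected so that finalized files are not rewritten. In Hudi it would presumably extend `HoodieException`. `RuntimeException` is used here to keep the sketch self-contained.

```java
// Hypothetical exception name, not an existing Hudi class. In Hudi it would
// presumably extend HoodieException; RuntimeException keeps the sketch self-contained.
public class RetryAfterFinalizeException extends RuntimeException {
  public RetryAfterFinalizeException(String message) {
    super(message);
  }

  // Rejects a stray task's retry once the finalize-write completion marker exists,
  // so already-finalized data files are never rewritten.
  public static void checkRetryAllowed(boolean finalizeMarkerExists, String instantTime) {
    if (finalizeMarkerExists) {
      throw new RetryAfterFinalizeException("Rejecting retry attempt for instant " + instantTime
          + ": writes were already finalized, so the stray task is failed instead of rewriting data files.");
    }
  }

  public static void main(String[] args) {
    checkRetryAllowed(false, "20240101000000"); // no marker: retry proceeds
    try {
      checkRetryAllowed(true, "20240101000000");
    } catch (RetryAfterFinalizeException e) {
      System.out.println(e.getMessage());
    }
  }
}
```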



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -203,4 +238,49 @@ private Option<Path> create(Path markerPath, boolean checkIfExists) {
         + " in " + timer.endTimer() + " ms");
     return Option.of(markerPath);
   }
+
+  @Override
+  public Option<Path> createCompletionMarker(String partitionPath, String fileId, String instantTime, IOType type,
+                                                boolean ignoreExisting, Option<byte[]> serializedData)
+      throws HoodieException, IOException {
+    Path markerPath = getCompletionMarkerPath(partitionPath, fileId, instantTime, type);
+    if (!fs.exists(markerPath.getParent())) {
+      throw new HoodieException("Marker directory is absent." + markerPath.getParent());
+    }
+    try {
+      FSDataOutputStream outputStream = fs.create(markerPath, ignoreExisting);
+      if (serializedData.isPresent()) {
+        outputStream.writeLong(serializedData.get().length);
+        outputStream.write(serializedData.get());
+        outputStream.writeLong(generateChecksum(serializedData.get()));
+      }
+      outputStream.close();
+    } catch (IOException e) {
+      throw new HoodieException("Failed to create completed marker " + markerPath);
+    }
+    return Option.of(markerPath);
+  }
+
+  public static long generateChecksum(byte[] data) {

Review Comment:
   Is there nothing we can reuse from IOUtils? Can we move this to some other utils class that is more generic across the code base?
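If nothing suitable exists, a generic helper could be as small as the sketch below, which is built on the JDK's `java.util.zip.CRC32`. This hunk does not show the body of `generateChecksum`, so it is an assumption that CRC-32 matches what that method computes. `ChecksumUtils` is a hypothetical class name.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical generic utility; the class name is illustrative, not an existing Hudi class.
public final class ChecksumUtils {
  private ChecksumUtils() {}

  // CRC-32 over the whole array, using the JDK's java.util.zip.CRC32.
  public static long crc32(byte[] data) {
    CRC32 crc = new CRC32();
    crc.update(data, 0, data.length);
    return crc.getValue();
  }

  public static void main(String[] args) {
    // "123456789" is the standard CRC-32 check input; expected value 0xCBF43926.
    long value = crc32("123456789".getBytes(StandardCharsets.US_ASCII));
    System.out.println(Long.toHexString(value)); // cbf43926
  }
}
```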



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java:
##########
@@ -132,6 +148,25 @@ public Set<String> allMarkerFilePaths() {
     }
   }
 
+  @Override
+  public void createMarkerDir() throws HoodieIOException {

Review Comment:
   IMO, we should make timeline-based markers work really well. In cloud storage, direct markers can have the same access-scaling issues that the MT solves for, e.g.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -135,18 +152,36 @@ private String translateMarkerToDataPath(String markerPath) {
     return stripMarkerSuffix(rPath);
   }
 
+  public static String stripMarkerSuffix(String path) {
+    return path.substring(0, path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN));
+  }
+
+  public static String stripOldStyleMarkerSuffix(String path) {

Review Comment:
   If we change marker file naming, should we see if we can introduce versioning for those (like we do in other places where bits are written to storage)?
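One possible shape, sketched under assumptions, is to prefix the completed-marker payload with a format version. The sketch keeps the length/payload/checksum layout that `createCompletionMarker` writes above. `VersionedMarkerCodec`, the `int` version field, and the use of CRC-32 as the checksum are all hypothetical, not an existing Hudi format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.CRC32;

// Hypothetical versioned marker payload; not an existing Hudi format.
public final class VersionedMarkerCodec {
  // Bump whenever the on-storage layout changes.
  public static final int CURRENT_VERSION = 1;

  private VersionedMarkerCodec() {}

  private static long checksum(byte[] data) {
    CRC32 crc = new CRC32();
    crc.update(data, 0, data.length);
    return crc.getValue();
  }

  // Layout: [int version][long length][payload bytes][long checksum].
  public static byte[] encode(byte[] payload) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(CURRENT_VERSION);
      out.writeLong(payload.length);
      out.write(payload);
      out.writeLong(checksum(payload));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Returns the payload; throws on an unknown version, bad length, truncation, or checksum mismatch.
  public static byte[] decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      int version = in.readInt();
      if (version != CURRENT_VERSION) {
        throw new IllegalStateException("Unsupported marker version: " + version);
      }
      long length = in.readLong();
      if (length < 0 || length > encoded.length) {
        throw new IllegalStateException("Invalid payload length: " + length);
      }
      byte[] payload = new byte[(int) length];
      in.readFully(payload); // EOFException (an IOException) if truncated
      if (in.readLong() != checksum(payload)) {
        throw new IllegalStateException("Checksum mismatch");
      }
      return payload;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] payload = "write-status".getBytes(StandardCharsets.UTF_8);
    System.out.println(Arrays.equals(payload, decode(encode(payload)))); // true
  }
}
```

A reader that encounters an unknown version fails loudly instead of misparsing the payload. A caller could treat that failure like any other unreadable completed marker.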



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -140,10 +157,99 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
    * Creates an empty marker file corresponding to storage writer path.
    *
    * @param partitionPath Partition path
+   * @param dataFileName data file for which inprogress marker creation is requested
+   * @param markerInstantTime - instantTime associated with the request
+   * returns true - inprogress marker successfully created,
+   *         false - inprogress marker was not created.
+   */
+  protected void createInProgressMarkerFile(String partitionPath, String dataFileName, String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (!writeMarkers.doesMarkerDirExist()) {
+      throw new HoodieIOException(String.format("Marker root directory absent : %s/%s (%s)",
+          partitionPath, dataFileName, markerInstantTime));
+    }
+    writeMarkers.create(partitionPath, dataFileName, getIOType());
+  }
+
+  protected boolean recoverWriteStatusIfAvailable(String partitionPath, String dataFileName,
+                                                  String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (config.isFailRetriesAfterFinalizeWrite()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(StringUtils.EMPTY_STRING,
+        FINALIZE_WRITE_COMPLETED, markerInstantTime, IOType.CREATE))) {
+      throw new HoodieCorruptedDataException(" Failing retry attempt for instant " + instantTime
+          + " as the job is trying to re-write the data files, after writes have been finalized.");
+    }
+    if (config.optimizeTaskRetriesWithMarkers()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(partitionPath, fileId, markerInstantTime, getIOType()))) {

Review Comment:
   IIUC, it should skip and move forward with `p1/f1-1_...`. @nbalajee, is this right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
