nbalajee commented on code in PR #9035:
URL: https://github.com/apache/hudi/pull/9035#discussion_r1242478376


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath, String dataFileName) {
-    WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
-        .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline());
+  protected void createInProgressMarkerFile(String partitionPath, String dataFileName, String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (!writeMarkers.doesMarkerDirExist()) {

Review Comment:
   If we allow the markerDir to be created on an as-needed basis, a stray executor that starts writing a file after finalizeWrite() would recreate the directory and leave behind a duplicate file.
   
   By creating the markerDir in startCommit() and deleting it at (or after) finalizeWrite(), we ensure that once the markerDir is missing (deleted by finalizeWrite()), executors can neither start a new write operation nor successfully close an ongoing one.
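To make the lifecycle above concrete, here is a minimal, hypothetical sketch using plain `java.nio.file` on a local filesystem. It is not Hudi's `WriteMarkers` API: the class `MarkerDirLifecycle`, its methods, and the `<file>.marker.CREATE` naming are illustrative assumptions. The point it shows is that only the driver creates the directory (at commit start) and deletes it (after reconciliation), while executors fail fast instead of recreating it on demand.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical sketch, not Hudi's WriteMarkers API: the driver owns the marker
// directory's lifecycle; executors may only add files to a directory that exists.
final class MarkerDirLifecycle {
  private final Path markerDir;

  MarkerDirLifecycle(Path markerDir) {
    this.markerDir = markerDir;
  }

  // Driver, at startCommit(): the only place the marker directory is created.
  void startCommit() {
    try {
      Files.createDirectories(markerDir);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Executor, before opening a data file: never create the directory on demand.
  Path createInProgressMarker(String dataFileName) {
    if (!Files.isDirectory(markerDir)) {
      throw new IllegalStateException("Marker root directory absent: " + markerDir);
    }
    try {
      // createFile does not create missing parents, so if finalizeWrite() deletes the
      // directory between the check above and this call, the call fails instead.
      return Files.createFile(markerDir.resolve(dataFileName + ".marker.CREATE"));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Driver, after reconciliation in finalizeWrite(): delete the directory recursively.
  void finalizeWrite() {
    if (!Files.exists(markerDir)) {
      return;
    }
    try (Stream<Path> paths = Files.walk(markerDir)) {
      // Reverse order deletes children before their parent directory.
      paths.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

With an as-needed `Files.createDirectories` in the executor path instead, a late executor would silently resurrect the directory after `finalizeWrite()`, which is exactly the duplicate-file case described above.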



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath, String dataFileName) {
-    WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
-        .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline());
+  protected void createInProgressMarkerFile(String partitionPath, String dataFileName, String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (!writeMarkers.doesMarkerDirExist()) {
+      throw new HoodieIOException(String.format("Marker root directory absent : %s/%s (%s)",
+          partitionPath, dataFileName, markerInstantTime));
+    }
+    if (config.enforceFinalizeWriteCheck()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath("", "FINALIZE_WRITE", markerInstantTime, IOType.CREATE))) {
+      throw new HoodieCorruptedDataException("Reconciliation for instant " + instantTime + " is completed, job is trying to re-write the data files.");
+    }
+    if (config.enforceCompletionMarkerCheck()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(partitionPath, fileId, markerInstantTime, getIOType()))) {
+      throw new HoodieIOException("Completed marker file exists for : " + dataFileName + " (" + instantTime + ")");
+    }
+    writeMarkers.create(partitionPath, dataFileName, getIOType());
+  }
+
+  // visible for testing
+  public void createCompletedMarkerFile(String partition, String markerInstantTime) throws IOException {
+    try {
+      WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
+          .createCompletionMarker(partition, fileId, markerInstantTime, getIOType(), true);
+    } catch (Exception e) {
+      // Clean up the data file, if the marker is already present or marker directories don't exist.
+      Path partitionPath = FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partition);

Review Comment:
   After finalizeWrite() reconciles the files, we delete the marker directory. If a stray executor completes its write operation and closes the file after the reconcile step, it finds the marker directory missing and cleans up the data file it created.
   
   ![Screenshot 2023-06-25 at 9 15 35 PM](https://github.com/apache/hudi/assets/47542891/f84e70f9-5f17-4454-8ff1-608c59056ef3)
   In the example, executor C tries to close its file after the finalizeWrite operation.
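The close-time rule can be sketched the same way. This is a hypothetical, local-filesystem illustration, not Hudi's `createCompletionMarker`: the class `CloseWithCompletionMarker`, the method `closeWrite`, and the `<fileId>.COMPLETED` naming are assumptions. An executor keeps its data file only if it can create a completion marker inside a marker directory that still exists; otherwise it deletes the file, mirroring the cleanup in the `catch` block of `createCompletedMarkerFile` above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Hypothetical sketch, not Hudi's API: an executor may keep its data file only if
// it can create a completion marker inside a marker directory that still exists.
final class CloseWithCompletionMarker {
  private CloseWithCompletionMarker() {}

  // Returns true if the data file was kept, false if it was cleaned up.
  static boolean closeWrite(Path markerDir, Path dataFile, String fileId) {
    try {
      // createFile never creates missing parents: NoSuchFileException if finalizeWrite()
      // already deleted markerDir, FileAlreadyExistsException on a duplicate completion.
      Files.createFile(markerDir.resolve(fileId + ".COMPLETED"));
      return true;
    } catch (NoSuchFileException | FileAlreadyExistsException e) {
      // Stray or duplicate writer: remove its data file so no orphan is left behind.
      try {
        Files.deleteIfExists(dataFile);
      } catch (IOException cleanup) {
        throw new UncheckedIOException(cleanup);
      }
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because marker creation is an atomic create-if-absent, the same check rejects both a late executor (directory gone) and a second attempt completing the same file ID (marker already present).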


