nbalajee commented on code in PR #9035:
URL: https://github.com/apache/hudi/pull/9035#discussion_r1259981790


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath, String dataFileName) {
-    WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
-        .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline());
+  protected void createInProgressMarkerFile(String partitionPath, String dataFileName, String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (!writeMarkers.doesMarkerDirExist()) {
+      throw new HoodieIOException(String.format("Marker root directory absent : %s/%s (%s)",
+          partitionPath, dataFileName, markerInstantTime));
+    }
+    if (config.enforceFinalizeWriteCheck()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath("", "FINALIZE_WRITE", markerInstantTime, IOType.CREATE))) {
+      throw new HoodieCorruptedDataException("Reconciliation for instant " + instantTime + " is completed, job is trying to re-write the data files.");
+    }
+    if (config.enforceCompletionMarkerCheck()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(partitionPath, fileId, markerInstantTime, getIOType()))) {
+      throw new HoodieIOException("Completed marker file exists for : " + dataFileName + " (" + instantTime + ")");
+    }
+    writeMarkers.create(partitionPath, dataFileName, getIOType());
+  }
+
+  // visible for testing
+  public void createCompletedMarkerFile(String partition, String markerInstantTime) throws IOException {
+    try {
+      WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
+          .createCompletionMarker(partition, fileId, markerInstantTime, getIOType(), true);
+    } catch (Exception e) {
+      // Clean up the data file, if the marker is already present or marker directories don't exist.
+      Path partitionPath = FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partition);

Review Comment:
   > In which case might we hit this? Can you help clarify, please?
   @nsivabalan - A task is running to create a data file. The execution engine retries the operation on a different container/task (say, stage attempt 1, due to a stage failure) while the previous task is still running in the background (unresponsive to the driver).
   
   If the second task (the retry attempt) were to finish the data write and create the completion marker, then when closing its writeHandle, the first task (in this example) would clean up its data file after noticing the presence of the completion marker.
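A minimal sketch of the race above, assuming a local filesystem where `java.nio.file.Files.createFile` performs an atomic create-if-absent (HDFS and object stores may offer different guarantees). The class, method, and marker naming (`CompletionMarkerRace`, `closeAttempt`, `<fileId>.COMPLETED`) are hypothetical and are not Hudi's `WriteMarkers` API; the sketch only shows how a single completion marker per fileId lets the losing attempt delete its own data file when it closes.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Hypothetical, simplified model (not Hudi code): two attempts of the same task each
 * wrote a data file for one fileId and then race to create that fileId's completion marker.
 */
final class CompletionMarkerRace {

  /**
   * Called when an attempt closes its write handle. Returns true if this attempt's data
   * file is kept, false if another attempt already completed and this file was deleted.
   */
  static boolean closeAttempt(Path partitionDir, Path markerDir, String fileId, String attemptDataFile)
      throws IOException {
    Path marker = markerDir.resolve(fileId + ".COMPLETED"); // hypothetical marker naming
    try {
      // createFile checks for existence and creates the file as one atomic operation,
      // so at most one attempt can register completion for this fileId.
      Files.createFile(marker);
      return true;
    } catch (FileAlreadyExistsException e) {
      // On the default filesystem provider a pre-existing marker surfaces as this exception:
      // another attempt already completed, so remove our own copy of the data.
      Files.deleteIfExists(partitionDir.resolve(attemptDataFile));
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    Path partitionDir = Files.createTempDirectory("partition");
    Path markerDir = Files.createTempDirectory("markers");
    // Both attempts wrote a data file for the same fileId.
    Files.createFile(partitionDir.resolve("f1_attempt1.parquet"));
    Files.createFile(partitionDir.resolve("f1_attempt0.parquet"));

    // The retry (attempt 1) closes first; the slow original attempt (attempt 0) closes later.
    boolean retryKept = closeAttempt(partitionDir, markerDir, "f1", "f1_attempt1.parquet");
    boolean originalKept = closeAttempt(partitionDir, markerDir, "f1", "f1_attempt0.parquet");

    System.out.println(retryKept + " " + originalKept); // prints "true false"
  }
}
```

Running `main` prints `true false`: the retry attempt keeps its file, and the slow original attempt deletes `f1_attempt0.parquet` when it closes.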
   
   If the writeHandle were closed after the finalizeWrite/reconcileMarkers step, we would end up with a duplicate file, because the data files created by both attempts would be present (the reconcile step would not remove them).
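The duplicate-file case can be sketched with a hypothetical in-memory model (not Hudi's reconcile implementation): reconciliation deletes only the uncommitted, marked files that exist when it runs, so a data file that appears afterwards survives next to the committed one. The `enforceFinalizeCheck` flag loosely mirrors the `enforceFinalizeWriteCheck()` branch in the diff by refusing to start a write once the instant has been reconciled; `ReconcileSketch` and its methods are names assumed for illustration.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

/**
 * Hypothetical in-memory model (not Hudi code): reconciliation can only remove
 * uncommitted files that already exist at the moment it runs.
 */
final class ReconcileSketch {
  private final Set<String> dataFiles = new LinkedHashSet<>();
  private final Set<String> markers = new LinkedHashSet<>();
  private final boolean enforceFinalizeCheck; // stands in for a finalize-write guard
  private boolean finalized = false;

  ReconcileSketch(boolean enforceFinalizeCheck) {
    this.enforceFinalizeCheck = enforceFinalizeCheck;
  }

  /** An attempt records an in-progress marker and writes its data file. */
  void write(String file) {
    if (enforceFinalizeCheck && finalized) {
      // Analogous in intent to the FINALIZE_WRITE check: refuse writes after reconciliation.
      throw new IllegalStateException("Reconciliation completed; refusing to write " + file);
    }
    markers.add(file);
    dataFiles.add(file);
  }

  /** Deletes every marked data file the commit does not reference, then seals the instant. */
  void reconcile(Set<String> committedFiles) {
    for (String file : markers) {
      if (!committedFiles.contains(file)) {
        dataFiles.remove(file);
      }
    }
    finalized = true;
  }

  Set<String> dataFiles() {
    return Collections.unmodifiableSet(dataFiles);
  }

  public static void main(String[] args) {
    Set<String> committed = Set.of("f1_attempt1.parquet");

    // Without the guard: the slow original attempt's file appears after reconciliation.
    ReconcileSketch unguarded = new ReconcileSketch(false);
    unguarded.write("f1_attempt1.parquet");
    unguarded.reconcile(committed);
    unguarded.write("f1_attempt0.parquet");
    System.out.println(unguarded.dataFiles()); // [f1_attempt1.parquet, f1_attempt0.parquet]

    // With the guard: the late write is rejected, so only the committed file remains.
    ReconcileSketch guarded = new ReconcileSketch(true);
    guarded.write("f1_attempt1.parquet");
    guarded.reconcile(committed);
    try {
      guarded.write("f1_attempt0.parquet");
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
    System.out.println(guarded.dataFiles()); // [f1_attempt1.parquet]
  }
}
```

The guard in this model only blocks writes that start after reconciliation; a write that began earlier still relies on the completion-marker cleanup at close time described in the previous paragraph.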
   
   @danny0405 - Updated to HoodieException



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
