danny0405 commented on code in PR #9035:
URL: https://github.com/apache/hudi/pull/9035#discussion_r1243097851


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath, 
String fileName) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath, String dataFileName) {
-    WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
-        .create(partitionPath, dataFileName, getIOType(), config, fileId, 
hoodieTable.getMetaClient().getActiveTimeline());
+  protected void createInProgressMarkerFile(String partitionPath, String 
dataFileName, String markerInstantTime) {
+    WriteMarkers writeMarkers = 
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (!writeMarkers.doesMarkerDirExist()) {
+      throw new HoodieIOException(String.format("Marker root directory absent 
: %s/%s (%s)",
+          partitionPath, dataFileName, markerInstantTime));
+    }
+    if (config.enforceFinalizeWriteCheck()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath("", 
"FINALIZE_WRITE", markerInstantTime, IOType.CREATE))) {
+      throw new HoodieCorruptedDataException("Reconciliation for instant " + 
instantTime + " is completed, job is trying to re-write the data files.");
+    }
+    if (config.enforceCompletionMarkerCheck()
+        && 
writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(partitionPath, 
fileId, markerInstantTime, getIOType()))) {
+      throw new HoodieIOException("Completed marker file exists for : " + 
dataFileName + " (" + instantTime + ")");
+    }
+    writeMarkers.create(partitionPath, dataFileName, getIOType());
+  }
+
+  // visible for testing
+  public void createCompletedMarkerFile(String partition, String 
markerInstantTime) throws IOException {
+    try {
+      WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, 
instantTime)
+          .createCompletionMarker(partition, fileId, markerInstantTime, 
getIOType(), true);
+    } catch (Exception e) {
+      // Clean up the data file, if the marker is already present or marker 
directories don't exist.
+      Path partitionPath = 
FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partition);

Review Comment:
   The literals are illegible to see clearly, would try to understand the 
workflow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to