nbalajee commented on code in PR #9035:
URL: https://github.com/apache/hudi/pull/9035#discussion_r1260374587


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -135,18 +155,42 @@ private String translateMarkerToDataPath(String markerPath) {
     return stripMarkerSuffix(rPath);
   }
 
+  public static String stripMarkerSuffix(String path) {
+    return path.substring(0, path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN));
+  }
+
+  public static String stripOldStyleMarkerSuffix(String path) {
+    // marker file was created by older version of Hudi, with INPROGRESS_MARKER_EXTN (f1_w1_c1.marker).
+    // Rename to data file by replacing .marker with .parquet.
+    return String.format("%s%s", path.substring(0, path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)),
+        HoodieFileFormat.PARQUET.getFileExtension());
+  }
+
   @Override
   public Set<String> allMarkerFilePaths() throws IOException {
     Set<String> markerFiles = new HashSet<>();
     if (doesMarkerDirExist()) {
       FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> {
-        markerFiles.add(MarkerUtils.stripMarkerFolderPrefix(fileStatus.getPath().toString(), basePath, instantTime));
+        // Only the in-progress markerFiles are to be included here
+        if (fileStatus.getPath().toString().contains(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)) {

Review Comment:
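   Explained above: only in-progress markers are included here; completion markers are not needed for reconciliation, because the write statuses are the source of truth.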
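   A minimal, dependency-free sketch of what the two suffix helpers and the new `allMarkerFilePaths` filter do to marker paths. It assumes `INPROGRESS_MARKER_EXTN` is `".marker"` (the real value lives in `HoodieTableMetaClient` and is not shown in this diff) and uses a made-up `.completed` suffix for completion markers; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical sketch of the marker-path helpers in the hunk above.
 * ASSUMPTION: INPROGRESS_MARKER_EXTN is ".marker"; completion markers use a
 * made-up ".completed" suffix. Neither value is confirmed by the diff.
 */
final class MarkerSuffixSketch {
  static final String INPROGRESS_MARKER_EXTN = ".marker"; // assumed value
  static final String PARQUET_EXTN = ".parquet";

  // Mirrors stripMarkerSuffix: drop everything from the marker extension onward.
  static String stripMarkerSuffix(String path) {
    return path.substring(0, path.indexOf(INPROGRESS_MARKER_EXTN));
  }

  // Mirrors stripOldStyleMarkerSuffix: "f1_w1_c1.marker" becomes "f1_w1_c1.parquet".
  static String stripOldStyleMarkerSuffix(String path) {
    return String.format("%s%s", stripMarkerSuffix(path), PARQUET_EXTN);
  }

  // Mirrors the new filter in allMarkerFilePaths: keep only in-progress markers.
  static Set<String> inProgressMarkers(List<String> markerDirListing) {
    Set<String> result = new TreeSet<>();
    for (String p : markerDirListing) {
      if (p.contains(INPROGRESS_MARKER_EXTN)) {
        result.add(p);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> listing = List.of(
        "p1/f1_w1_c1.parquet.marker.CREATE", // in-progress marker (assumed naming)
        "p1/f1_w1_c1.parquet.completed",     // completion marker (hypothetical naming)
        "p1/f2_w1_c1.marker");               // old-style marker
    System.out.println(inProgressMarkers(listing));
    System.out.println(stripMarkerSuffix("p1/f1_w1_c1.parquet.marker.CREATE"));
    System.out.println(stripOldStyleMarkerSuffix("p1/f2_w1_c1.marker"));
  }
}
```

   Note that both helpers rely on `indexOf`, so a path without the extension would make `substring` throw; the filter in `allMarkerFilePaths` is what keeps such paths away from them.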



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -119,7 +139,7 @@ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int pa
         while (itr.hasNext()) {
           FileStatus status = itr.next();
           String pathStr = status.getPath().toString();
-          if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
+          if (pathStr.contains(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
             result.add(translateMarkerToDataPath(pathStr));

Review Comment:
   1. Every attempt to create a data file has a corresponding in-progress marker file.
   2. A successful attempt at creating a data file creates a completion marker (if the flag is enabled).
   3. If a second or subsequent attempt is made to recreate the file and a completion marker already exists when the write handle is being created, then, if the flag is enabled, the old write status is returned and the attempt is considered successful.
   4. If a second attempt starts while an earlier write is still in progress (the file write has not completed yet), a new in-progress marker is created for the second/subsequent attempt. When closing the write handle/file, the first attempt creates the completion marker. Second/subsequent attempts that then try to close their write handles see the completion marker and clean up their data files.
   5. When reconciling data files, all in-progress markers are read and the list is pruned by removing entries that have a write status. Data files that have been created but have no corresponding write status are candidates for deletion during finalizeWrite.
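   Steps 1 to 4 can be modeled roughly as follows. This is a hypothetical in-memory sketch, not Hudi's `WriteMarkers` API: markers are map/set entries instead of files, `enforceCompletionMarkers` stands in for "the flag", and `ConcurrentHashMap.putIfAbsent` provides the atomic "first closer wins" behavior that this model assumes the marker backend offers.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical in-memory model of steps 1-4 (not Hudi's WriteMarkers API).
 * Markers are map/set entries here instead of files on storage.
 */
final class MarkerLifecycleSketch {
  private final boolean enforceCompletionMarkers; // stands in for "the flag"
  final Set<String> inProgressMarkers = ConcurrentHashMap.newKeySet();
  final Map<String, String> completionMarkers = new ConcurrentHashMap<>(); // fileId -> write status
  final Set<String> dataFiles = ConcurrentHashMap.newKeySet();

  MarkerLifecycleSketch(boolean enforceCompletionMarkers) {
    this.enforceCompletionMarkers = enforceCompletionMarkers;
  }

  /** Opens a write handle; returns an earlier write status if the file is already complete, else null. */
  String open(String fileId, String attempt) {
    String prior = completionMarkers.get(fileId);
    if (enforceCompletionMarkers && prior != null) {
      return prior;                                  // step 3: reuse the old write status
    }
    inProgressMarkers.add(fileId + "_" + attempt);   // step 1: one in-progress marker per attempt
    dataFiles.add(fileId + "_" + attempt);           // the attempt starts writing its own data file
    return null;
  }

  /** Closes a write handle; returns the write status that stands for this fileId. */
  String close(String fileId, String attempt, String status) {
    if (!enforceCompletionMarkers) {
      return status;                                 // no completion markers without the flag
    }
    String winner = completionMarkers.putIfAbsent(fileId, status); // step 2: atomic create
    if (winner != null) {
      dataFiles.remove(fileId + "_" + attempt);      // step 4: a later closer deletes its data file
      return winner;
    }
    return status;
  }

  public static void main(String[] args) {
    MarkerLifecycleSketch m = new MarkerLifecycleSketch(true);
    m.open("f1", "a1");                                   // first attempt
    m.open("f1", "a2");                                   // retry starts while a1 is still writing
    System.out.println(m.close("f1", "a1", "status-a1")); // a1 closes first and creates the completion marker
    System.out.println(m.close("f1", "a2", "status-a2")); // a2 sees the marker and cleans up
    System.out.println(m.open("f1", "a3"));               // a late retry gets a1's status back
    System.out.println(m.dataFiles);                      // only a1's data file remains
  }
}
```

   The losing attempt's in-progress marker is left in place on purpose: it has no write status of its own, so reconciliation still treats its path as a deletion candidate.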
   
   During step 5, only in-progress markers are considered. Completion markers are not, since the write status is the source of truth.
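   Step 5 reduces to a set difference. A minimal sketch, assuming marker paths have already been translated to data paths (as `translateMarkerToDataPath` does) and that each write status exposes its data-file path as a string; `ReconcileSketch` and `deletionCandidates` are hypothetical names.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical sketch of step 5: data paths recovered from in-progress markers,
 * minus the paths reported by write statuses, are deletion candidates for finalizeWrite.
 * Completion markers are deliberately ignored; write statuses are the source of truth.
 */
final class ReconcileSketch {
  static Set<String> deletionCandidates(Set<String> inProgressMarkerDataPaths,
                                        List<String> writeStatusPaths) {
    Set<String> candidates = new TreeSet<>(inProgressMarkerDataPaths); // copy; inputs stay untouched
    candidates.removeAll(writeStatusPaths);                            // prune entries with a write status
    return candidates;
  }

  public static void main(String[] args) {
    Set<String> fromMarkers = Set.of("p1/f1_a1.parquet", "p1/f1_a2.parquet", "p1/f2_a1.parquet");
    List<String> fromWriteStatuses = List.of("p1/f1_a1.parquet", "p1/f2_a1.parquet");
    System.out.println(deletionCandidates(fromMarkers, fromWriteStatuses));
  }
}
```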



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath, String dataFileName) {
-    WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
-        .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline());
+  protected void createInProgressMarkerFile(String partitionPath, String dataFileName, String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (!writeMarkers.doesMarkerDirExist()) {
+      throw new HoodieIOException(String.format("Marker root directory absent : %s/%s (%s)",
+          partitionPath, dataFileName, markerInstantTime));
+    }
+    if (config.enforceFinalizeWriteCheck()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath("", "FINALIZE_WRITE", markerInstantTime, IOType.CREATE))) {

Review Comment:
   > In general, instead of exactly match of the state, I prefer the final consistency of the files diff we have on master.
   
   +1 on performing final consistency of files based on the write statuses; this is our preferred approach. However, this change addresses extreme corner cases that could leave extra files in the dataset's partition folder, which might appear as duplicates.
   
   For datasets with global indexes enabled, it is difficult to tell whether duplicates come from extra leftover data files created by retries or from a genuine issue with the global index (for example, an HBase or record-level index). This change helps close the race windows that produce duplicate copies of data when a file-creation attempt is retried because of infrastructure-related issues or instabilities.
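   One way to read the `FINALIZE_WRITE` check in `createInProgressMarkerFile` is as a fence: once finalizeWrite has started, a straggling retry should not be able to create a new in-progress marker, because its data file would appear after reconciliation and survive as a duplicate. The sketch below models that reading in memory. It rests on assumptions: the hunk is cut off before the body of the `if`, so rejecting with an `IllegalStateException` is a stand-in for whatever the PR actually does, `FinalizeFenceSketch` and its methods are hypothetical, and `synchronized` replaces whatever atomicity the real marker backend provides.

```java
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical in-memory model of the guards at the top of createInProgressMarkerFile.
 * ASSUMPTION: the real branch body is not shown in the diff; rejecting with an
 * exception is a stand-in for whatever Hudi actually does.
 */
final class FinalizeFenceSketch {
  private final boolean enforceFinalizeWriteCheck; // stands in for config.enforceFinalizeWriteCheck()
  private final Set<String> inProgressMarkers = new TreeSet<>();
  private boolean markerDirExists = true;
  private boolean finalizeWriteStarted = false;    // stands in for the FINALIZE_WRITE completion marker

  FinalizeFenceSketch(boolean enforceFinalizeWriteCheck) {
    this.enforceFinalizeWriteCheck = enforceFinalizeWriteCheck;
  }

  // synchronized: the checks and the marker creation are atomic with respect to startFinalizeWrite.
  synchronized void createInProgressMarker(String partitionPath, String dataFileName, String instant) {
    if (!markerDirExists) {
      throw new IllegalStateException(String.format(
          "Marker root directory absent : %s/%s (%s)", partitionPath, dataFileName, instant));
    }
    if (enforceFinalizeWriteCheck && finalizeWriteStarted) {
      throw new IllegalStateException("finalizeWrite already started for " + instant); // assumed behavior
    }
    inProgressMarkers.add(partitionPath + "/" + dataFileName);
  }

  // Driver side: sets the fence and snapshots the markers used for reconciliation.
  synchronized Set<String> startFinalizeWrite() {
    finalizeWriteStarted = true;
    return new TreeSet<>(inProgressMarkers);
  }

  synchronized void deleteMarkerDir() {
    markerDirExists = false;
  }

  public static void main(String[] args) {
    FinalizeFenceSketch fence = new FinalizeFenceSketch(true);
    fence.createInProgressMarker("p1", "f1_a1.parquet", "001");   // normal attempt
    System.out.println(fence.startFinalizeWrite());               // driver snapshots the markers
    try {
      fence.createInProgressMarker("p1", "f1_a2.parquet", "001"); // straggling retry
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

   Without the fence (flag off), the straggler's marker would be created after the snapshot, so its data file would never be considered during reconciliation.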



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
