nsivabalan commented on code in PR #9035:
URL: https://github.com/apache/hudi/pull/9035#discussion_r1240377379


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath, String dataFileName) {
-    WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
-        .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline());
+  protected void createInProgressMarkerFile(String partitionPath, String dataFileName, String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (!writeMarkers.doesMarkerDirExist()) {

Review Comment:
   May I know why this additional check is needed? Was there a bug we encountered, and hence we are eagerly creating the marker dir upfront?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath, String dataFileName) {
-    WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
-        .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline());
+  protected void createInProgressMarkerFile(String partitionPath, String dataFileName, String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (!writeMarkers.doesMarkerDirExist()) {
+      throw new HoodieIOException(String.format("Marker root directory absent : %s/%s (%s)",
+          partitionPath, dataFileName, markerInstantTime));
+    }
+    if (config.enforceFinalizeWriteCheck()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath("", "FINALIZE_WRITE", markerInstantTime, IOType.CREATE))) {

Review Comment:
   Can we create a constant for "FINALIZE_WRITE"?
   Also, use StringUtils.EMPTY_STRING instead of "".
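   A minimal, self-contained sketch of the suggestion. The constant and method names below are placeholders for illustration only, not existing Hudi identifiers; `completionMarkerName` is a hypothetical stand-in for `getCompletionMarkerPath`:

```java
// Sketch: hoist the magic strings into named constants so call sites read clearly.
// Names here are suggestions, not existing Hudi identifiers.
public class MarkerNames {
  public static final String EMPTY_STRING = "";            // stand-in for StringUtils.EMPTY_STRING
  public static final String FINALIZE_WRITE = "FINALIZE_WRITE";

  // Illustrative stand-in for getCompletionMarkerPath(partition, name, instant, ioType)
  public static String completionMarkerName(String partition, String name, String instant) {
    String prefix = partition.isEmpty() ? "" : partition + "/";
    return prefix + name + "_" + instant + ".completed";
  }

  public static void main(String[] args) {
    // The FINALIZE_WRITE check from the diff, now without inline string literals
    System.out.println(completionMarkerName(EMPTY_STRING, FINALIZE_WRITE, "001"));
  }
}
```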
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -612,6 +612,20 @@ public class HoodieWriteConfig extends HoodieConfig {
      .sinceVersion("0.10.0")
      .withDocumentation("File Id Prefix provider class, that implements `org.apache.hudi.fileid.FileIdPrefixProvider`");
 
+  public static final ConfigProperty<String> ENFORCE_COMPLETION_MARKER_CHECKS = ConfigProperty
+      .key("hoodie.markers.enforce.completion.checks")
+      .defaultValue("false")
+      .sinceVersion("0.10.0")
+      .withDocumentation("Prevents the creation of duplicate data files, when multiple spark tasks are racing to "
+          + "create data files and a completed data file is already present");
+
+  public static final ConfigProperty<String> ENFORCE_FINALIZE_WRITE_CHECK = ConfigProperty
+      .key("hoodie.markers.enforce.finalize.write.check")
+      .defaultValue("false")
+      .sinceVersion("0.10.0")
+      .withDocumentation("When WriteStatus obj is lost due to engine related failures, then recomputing would involve "
+          + "re-writing all the data files. When this check is enabled it would block the rewrite from happening.");

Review Comment:
   The docs are not very clear to me. Can you enhance them further, please?
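   For illustration, one possible rewording, as a sketch only; the exact wording should be confirmed against the final behavior of the checks:

```java
// Sketch of clearer documentation strings; wording is a suggestion only.
public static final ConfigProperty<String> ENFORCE_COMPLETION_MARKER_CHECKS = ConfigProperty
    .key("hoodie.markers.enforce.completion.checks")
    .defaultValue("false")
    .sinceVersion("0.10.0")
    .withDocumentation("When enabled, a write handle fails fast if a completion marker already exists "
        + "for the same file, preventing speculative or retried tasks from creating duplicate data files "
        + "for a file slice that has already been completed.");

public static final ConfigProperty<String> ENFORCE_FINALIZE_WRITE_CHECK = ConfigProperty
    .key("hoodie.markers.enforce.finalize.write.check")
    .defaultValue("false")
    .sinceVersion("0.10.0")
    .withDocumentation("When enabled, once finalizeWrite() has completed for an instant (recorded via a "
        + "FINALIZE_WRITE completion marker), any late task attempting to re-write data files for that "
        + "instant fails instead of silently rewriting already-finalized files.");
```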



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -612,6 +612,20 @@ public class HoodieWriteConfig extends HoodieConfig {
      .sinceVersion("0.10.0")
      .withDocumentation("File Id Prefix provider class, that implements `org.apache.hudi.fileid.FileIdPrefixProvider`");
 
+  public static final ConfigProperty<String> ENFORCE_COMPLETION_MARKER_CHECKS = ConfigProperty
+      .key("hoodie.markers.enforce.completion.checks")

Review Comment:
   hoodie.optimize.task.retries might be a more generic name.
   Markers are just one way of achieving this.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -441,6 +441,9 @@ public List<WriteStatus> close() {
 
       performMergeDataValidationCheck(writeStatus);
 
+      // createCompletedMarkerFile throws a HoodieException if the marker directory is not present.
+      createCompletedMarkerFile(partitionPath, this.instantTime);

Review Comment:
   +1 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -512,6 +512,7 @@ public List<WriteStatus> close() {
         status.getStat().setFileSizeInBytes(logFileSize);
       }
 
+      createCompletedMarkerFile(partitionPath, baseInstantTime);

Review Comment:
   Yes. Why are we creating completion markers by default? I was also expecting this to be guarded by the new configs added.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -119,7 +139,7 @@ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int pa
        while (itr.hasNext()) {
          FileStatus status = itr.next();
          String pathStr = status.getPath().toString();
-          if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
+          if (pathStr.contains(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
            result.add(translateMarkerToDataPath(pathStr));

Review Comment:
   Actually, we are interested only in in-progress markers here. The crux is this: if Spark retries, and in the 2nd attempt a data file was created but never completed (possible in HDFS), it may not have a completion marker created.
   
   From the reconcile standpoint, we need to get hold of all in-progress files; it does not matter whether they are completed or not.
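   The intended filtering can be sketched in a self-contained way. The extension and suffix values below are assumptions standing in for the real `HoodieTableMetaClient` constants, and the completion suffix is hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class InProgressMarkerFilter {
  // Assumed values, mirroring HoodieTableMetaClient.INPROGRESS_MARKER_EXTN in spirit
  static final String INPROGRESS_MARKER_EXTN = ".marker";
  static final String COMPLETION_SUFFIX = ".completed";   // hypothetical completion-marker suffix

  // Keep only in-progress markers: reconciliation needs every file a task started
  // writing, whether or not a completion marker was ever created for it.
  static Set<String> inProgressMarkers(List<String> markerPaths) {
    return markerPaths.stream()
        .filter(p -> p.contains(INPROGRESS_MARKER_EXTN) && !p.endsWith(COMPLETION_SUFFIX))
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    List<String> paths = Arrays.asList(
        "p1/f1_w1_c1.parquet.marker.CREATE",
        "p1/f1_c1.completed");
    System.out.println(inProgressMarkers(paths));  // only the in-progress marker survives
  }
}
```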



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -151,6 +178,24 @@ public Schema getWriterSchema() {
     return writeSchema;
   }
 
+  protected void cleanupDataFile(Path dataFile) throws IOException {

Review Comment:
   Why protected? The base write handle class is the only class that accesses this, right?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -901,6 +901,9 @@ private void startCommit(String instantTime, String actionType, HoodieTableMetaC
      metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
              instantTime));
    }
+
+    // populate marker directory for the commit.
+    WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime).createMarkerDir();

Review Comment:
   Maybe we can move it within
   ```
   public final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
   ```
   in BaseHoodieWriteClient.
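   One property that makes such a move safe is that marker-dir creation is idempotent, so invoking it during table initialization (even across retries) is harmless. A self-contained sketch of that property using plain java.nio rather than the Hudi API; the directory layout below is illustrative only:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class MarkerDirInit {
  // Stand-in for WriteMarkers.createMarkerDir(): creating the per-instant marker
  // directory is idempotent, so it can be hoisted into table initialization.
  static Path createMarkerDir(Path tableBase, String instantTime) throws IOException {
    return Files.createDirectories(tableBase.resolve(".hoodie").resolve(".temp").resolve(instantTime));
  }

  public static void main(String[] args) throws IOException {
    Path base = Files.createTempDirectory("hudi-table");
    Path first = createMarkerDir(base, "20230620101530");
    Path second = createMarkerDir(base, "20230620101530");  // retry: no error, same dir
    System.out.println(Files.isDirectory(first) && first.equals(second));  // true
  }
}
```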



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -135,18 +155,42 @@ private String translateMarkerToDataPath(String markerPath) {
    return stripMarkerSuffix(rPath);
  }

+  public static String stripMarkerSuffix(String path) {
+    return path.substring(0, path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN));
+  }
+
+  public static String stripOldStyleMarkerSuffix(String path) {
+    // marker file was created by older version of Hudi, with INPROGRESS_MARKER_EXTN (f1_w1_c1.marker).
+    // Rename to data file by replacing .marker with .parquet.
+    return String.format("%s%s", path.substring(0, path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)),
+        HoodieFileFormat.PARQUET.getFileExtension());
+  }
+
  @Override
  public Set<String> allMarkerFilePaths() throws IOException {
    Set<String> markerFiles = new HashSet<>();
    if (doesMarkerDirExist()) {
      FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> {
-        markerFiles.add(MarkerUtils.stripMarkerFolderPrefix(fileStatus.getPath().toString(), basePath, instantTime));
+        // Only the in-progress marker files are to be included here
+        if (fileStatus.getPath().toString().contains(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)) {

Review Comment:
   Same here. We call allMarkerFilePaths() for rollback purposes, so it makes sense to fetch only in-progress markers.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath, String dataFileName) {
-    WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
-        .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline());
+  protected void createInProgressMarkerFile(String partitionPath, String dataFileName, String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (!writeMarkers.doesMarkerDirExist()) {

Review Comment:
   In cloud stores, we should try to avoid direct fs calls if feasible.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath, String dataFileName) {
-    WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
-        .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline());
+  protected void createInProgressMarkerFile(String partitionPath, String dataFileName, String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (!writeMarkers.doesMarkerDirExist()) {
+      throw new HoodieIOException(String.format("Marker root directory absent : %s/%s (%s)",
+          partitionPath, dataFileName, markerInstantTime));
+    }
+    if (config.enforceFinalizeWriteCheck()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath("", "FINALIZE_WRITE", markerInstantTime, IOType.CREATE))) {
+      throw new HoodieCorruptedDataException("Reconciliation for instant " + instantTime + " is completed, job is trying to re-write the data files.");
+    }
+    if (config.enforceCompletionMarkerCheck()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath(partitionPath, fileId, markerInstantTime, getIOType()))) {
+      throw new HoodieIOException("Completed marker file exists for : " + dataFileName + " (" + instantTime + ")");
+    }
+    writeMarkers.create(partitionPath, dataFileName, getIOType());
+  }
+
+  // visible for testing
+  public void createCompletedMarkerFile(String partition, String markerInstantTime) throws IOException {
+    try {
+      WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
+          .createCompletionMarker(partition, fileId, markerInstantTime, getIOType(), true);
+    } catch (Exception e) {
+      // Clean up the data file, if the marker is already present or marker directories don't exist.
+      Path partitionPath = FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partition);
Review Comment:
   In which case might we hit this? Can you help clarify, please? I.e., the marker dir is not present, but a write handle is still running and is invoking code to create the completion marker.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath, String dataFileName) {
-    WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
-        .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline());
+  protected void createInProgressMarkerFile(String partitionPath, String dataFileName, String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (!writeMarkers.doesMarkerDirExist()) {
+      throw new HoodieIOException(String.format("Marker root directory absent : %s/%s (%s)",
+          partitionPath, dataFileName, markerInstantTime));
+    }
+    if (config.enforceFinalizeWriteCheck()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath("", "FINALIZE_WRITE", markerInstantTime, IOType.CREATE))) {

Review Comment:
   I feel we are starting to use markers as a state store. So far, they have been used only as per-data-file creation markers; now we are starting to use them for overall write status, to understand whether FINALIZE_WRITE has completed or not.
   
   Did we consider any other options before narrowing down to this approach?
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -901,6 +901,9 @@ private void startCommit(String instantTime, String actionType, HoodieTableMetaC
      metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
              instantTime));
    }
+
+    // populate marker directory for the commit.
+    WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime).createMarkerDir();

Review Comment:
   What's the purpose of this? In this patch we are adding success markers, right? So I don't see a reason to add this additional step. Also, we are creating a new table (which might load the timeline) that is never used anywhere else. Can we avoid this call if it's not really necessary?
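   A pseudocode sketch of this suggestion; method names are assumed from the surrounding diff and the earlier comment, not verified against the codebase:

```java
// Hypothetical: reuse the table handle that the commit path already obtains,
// instead of calling createTable(config, hadoopConf) just for createMarkerDir().
HoodieTable table = initTable(operationType, Option.of(instantTime)); // loads the timeline once
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime).createMarkerDir();
```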



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to