nbalajee commented on code in PR #9035:
URL: https://github.com/apache/hudi/pull/9035#discussion_r1242478130
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -901,6 +901,9 @@ private void startCommit(String instantTime, String
actionType, HoodieTableMetaC
metaClient.getActiveTimeline().createNewInstant(new
HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
instantTime));
}
+
+ // populate marker directory for the commit.
+ WriteMarkersFactory.get(config.getMarkersType(), createTable(config,
hadoopConf), instantTime).createMarkerDir();
Review Comment:
That is the current behavior: the DoesMarkerDirExists() check ensures that an
executor can't start or complete the write operation after finalizeWrite().
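
To illustrate the guard described above, here is a minimal, self-contained
sketch. It is not Hudi code: the class `MarkerDirGuard`, its method bodies, and
the `.temp/<instantTime>` layout are assumptions made for illustration only.
The idea is that the marker directory is created when the commit starts, is
removed by finalizeWrite(), and any write attempt that arrives afterwards is
rejected because the directory no longer exists.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical illustration of a marker-directory guard; not the Hudi implementation. */
public class MarkerDirGuard {
    private final Path markerDir;

    public MarkerDirGuard(Path basePath, String instantTime) {
        // Assumed layout for this sketch: <basePath>/.temp/<instantTime>
        this.markerDir = basePath.resolve(".temp").resolve(instantTime);
    }

    /** Driver side: create the marker directory when the commit is started. */
    public void createMarkerDir() throws IOException {
        Files.createDirectories(markerDir);
    }

    /** Driver side: finalizing the write removes the marker directory and its contents. */
    public void finalizeWrite() throws IOException {
        if (!Files.exists(markerDir)) {
            return;
        }
        List<Path> toDelete;
        try (Stream<Path> paths = Files.walk(markerDir)) {
            // Delete children before parents.
            toDelete = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : toDelete) {
            Files.delete(p);
        }
    }

    /** Executor side: refuse to start a write once the marker directory is gone. */
    public Path createMarker(String dataFileName) throws IOException {
        if (!Files.isDirectory(markerDir)) {
            throw new IllegalStateException(
                "Marker directory is missing; the write was already finalized: " + markerDir);
        }
        return Files.createFile(markerDir.resolve(dataFileName + ".marker"));
    }
}
```

A late or retried task that calls createMarker() after finalizeWrite() fails
with an exception instead of producing a new data file.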
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -612,6 +612,20 @@ public class HoodieWriteConfig extends HoodieConfig {
.sinceVersion("0.10.0")
.withDocumentation("File Id Prefix provider class, that implements
`org.apache.hudi.fileid.FileIdPrefixProvider`");
+ public static final ConfigProperty<String> ENFORCE_COMPLETION_MARKER_CHECKS
= ConfigProperty
+ .key("hoodie.markers.enforce.completion.checks")
+ .defaultValue("false")
+ .sinceVersion("0.10.0")
+ .withDocumentation("Prevents the creation of duplicate data files, when
multiple spark tasks are racing to "
+ + "create data files and a completed data file is already present");
+
+ public static final ConfigProperty<String> ENFORCE_FINALIZE_WRITE_CHECK =
ConfigProperty
+ .key("hoodie.markers.enforce.finalize.write.check")
+ .defaultValue("false")
+ .sinceVersion("0.10.0")
+ .withDocumentation("When WriteStatus obj is lost due to engine related
failures, then recomputing would involve "
+ + "re-writing all the data files. When this check is enabled it
would block the rewrite from happening.");
Review Comment:
I will update the doc.
Context: This check was added to address the following scenario:
(1) As part of the insert/upsert operation, a set of files has been created
(p1/f1_w1_c1.parquet, p2/f2_w2_c1.parquet, corresponding to commit c1).
(2) FinalizeWrite() successfully purged the files that were created but are
not part of the writeStatus.
(3) As part of completing the commit c1, we update the MDT with fileListing
and RLI metadata. To update the record index we iterate over the
writeStatuses; if writeStatus RDD blocks are found to be missing, the
execution engine (Spark) re-triggers the write stage to recreate the write
statuses.
The above flag is used to avoid rewriting all the files as part of the stage
retry (which is more likely to fail during the second attempt). Instead, we
fail the job so that the next write attempt can be made in a new job (after
any required resource tuning). This is not an issue for small or medium-sized
tables; we have seen it only on large tables (50B+ records).
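
A rough sketch of the intended behavior, for illustration only. The class
`FinalizeWriteCheck` and its methods are hypothetical and model the write
stage in memory; only the config key and its default of "false" come from the
diff above. With the flag enabled, a write stage that is re-triggered after
finalizeWrite() fails fast; with the flag disabled, the retry rewrites the
files.

```java
import java.util.Properties;

/** Hypothetical in-memory model of the finalize-write check; not the Hudi implementation. */
public class FinalizeWriteCheck {
    // Config key proposed in this PR; the default is "false".
    static final String ENFORCE_KEY = "hoodie.markers.enforce.finalize.write.check";

    private final boolean enforce;
    private boolean finalized = false;
    private int filesWritten = 0;

    public FinalizeWriteCheck(Properties props) {
        this.enforce = Boolean.parseBoolean(props.getProperty(ENFORCE_KEY, "false"));
    }

    /** Models one run of the write stage, including an engine-triggered retry. */
    public void runWriteStage(int numFiles) {
        if (finalized && enforce) {
            // Fail the job instead of rewriting every data file during the retry.
            throw new IllegalStateException(
                "Write stage re-triggered after finalizeWrite(); failing the job");
        }
        filesWritten += numFiles;
    }

    /** Models finalizeWrite(): after this point the first attempt's files are settled. */
    public void finalizeWrite() {
        finalized = true;
    }

    public int filesWritten() {
        return filesWritten;
    }
}
```

With the check enabled, the second runWriteStage() call throws and the file
count stays at the first attempt's value, so the next write attempt can start
in a fresh job.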