lokeshj1703 commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2126255182
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java:
##########
@@ -63,6 +75,41 @@ public String createNewInstantTime() {
return TimelineUtils.generateInstantTime(false, timeGenerator);
}
+  @Override
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusHandlerCallback> writeStatusHandlerCallbackOpt) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
+    // Triggering the DAG for writes to the metadata table.
+    // When streaming writes are enabled, writes to the metadata table may not call this method, since the caller tightly controls the DAG de-referencing.
+    // Even then, writes to the metadata table will still invoke this commit method to initialize a new partition in the metadata table and for
+    // non-incremental operations like insert_overwrite, etc.
+    List<WriteStatus> writeStatusesList = writeStatuses.map(writeStatus -> writeStatus.removeMetadataIndexStatsAndErrorRecordsTracking()).collect();
+    // Compute stats for the writes and invoke the callback.
+    AtomicLong totalRecords = new AtomicLong(0);
+    AtomicLong totalErrorRecords = new AtomicLong(0);
+    writeStatusesList.forEach(entry -> {
+      totalRecords.getAndAdd(entry.getTotalRecords());
+      totalErrorRecords.getAndAdd(entry.getTotalErrorRecords());
+    });
+
+    // Why we pass RDD<WriteStatus> to the write status handler callback: we cannot afford to collect all write statuses to the driver when there
+    // are errors, since each write status holds every error record. So, just in case there are errors, the caller might be interested in fetching
+    // the error records, and hence we pass the RDD<WriteStatus> as the last argument to the write status handler callback.
+    boolean canProceed = writeStatusHandlerCallbackOpt.isEmpty() || writeStatusHandlerCallbackOpt.get().processWriteStatuses(totalRecords.get(), totalErrorRecords.get(),
Review Comment:
Addressed
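The comment in the hunk above explains a design choice: only cheap aggregate counts are materialized on the driver, while the full `RDD<WriteStatus>` is handed to the callback lazily so error records are fetched only if the caller actually wants them. Here is a minimal, standalone sketch of that contract; the `Status` and `Callback` types and a `Supplier` standing in for the RDD are illustrative assumptions, not Hudi or Spark APIs.

```java
import java.util.List;
import java.util.function.Supplier;

public class CallbackSketch {
  // Hypothetical stand-in for Hudi's WriteStatus: just the two counters the commit path aggregates.
  record Status(long totalRecords, long totalErrorRecords) {}

  // Hypothetical stand-in for WriteStatusHandlerCallback: receives cheap aggregate counts eagerly,
  // plus a lazy handle to the full statuses (the RDD in the real code) for on-demand error inspection.
  interface Callback {
    boolean processWriteStatuses(long totalRecords, long totalErrorRecords, Supplier<List<Status>> statuses);
  }

  static boolean commit(List<Status> statuses, Callback callback) {
    // Only small aggregates are computed up front; the heavy per-record data stays behind the Supplier.
    long totalRecords = statuses.stream().mapToLong(Status::totalRecords).sum();
    long totalErrors = statuses.stream().mapToLong(Status::totalErrorRecords).sum();
    return callback == null || callback.processWriteStatuses(totalRecords, totalErrors, () -> statuses);
  }

  public static void main(String[] args) {
    List<Status> statuses = List.of(new Status(100, 0), new Status(50, 2));
    // Callback vetoes the commit because two error records were reported.
    boolean proceed = commit(statuses, (records, errors, lazy) -> errors == 0);
    System.out.println(proceed); // prints "false"
  }
}
```

The callback never has to pay for the error records unless it invokes the supplier, mirroring why the PR passes the un-collected RDD rather than a driver-side list.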
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java:
##########
@@ -63,6 +75,41 @@ public String createNewInstantTime() {
return TimelineUtils.generateInstantTime(false, timeGenerator);
}
+  @Override
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusHandlerCallback> writeStatusHandlerCallbackOpt) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
+    // Triggering the DAG for writes to the metadata table.
+    // When streaming writes are enabled, writes to the metadata table may not call this method, since the caller tightly controls the DAG de-referencing.
+    // Even then, writes to the metadata table will still invoke this commit method to initialize a new partition in the metadata table and for
+    // non-incremental operations like insert_overwrite, etc.
+    List<WriteStatus> writeStatusesList = writeStatuses.map(writeStatus -> writeStatus.removeMetadataIndexStatsAndErrorRecordsTracking()).collect();
+    // Compute stats for the writes and invoke the callback.
+    AtomicLong totalRecords = new AtomicLong(0);
+    AtomicLong totalErrorRecords = new AtomicLong(0);
Review Comment:
These counters are mutated inside lambda expressions, and local variables captured by a lambda must be
effectively final, therefore we had to use atomic (AtomicLong) references here.
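For context on the reply above, here is a minimal standalone sketch of the Java constraint it refers to: a captured local variable must be effectively final, so a plain `long` accumulator will not compile inside a lambda, while an `AtomicLong` works because the reference itself never changes. The names are illustrative, not Hudi code.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class LambdaCaptureDemo {
  public static void main(String[] args) {
    List<Long> counts = List.of(3L, 5L, 7L);

    // A plain local variable cannot be reassigned from inside a lambda:
    //   long total = 0;
    //   counts.forEach(c -> total += c); // compile error: variable must be effectively final
    // An AtomicLong sidesteps this: the reference stays effectively final while its value mutates.
    AtomicLong total = new AtomicLong(0);
    counts.forEach(c -> total.getAndAdd(c));
    System.out.println(total.get()); // prints "15"

    // An alternative (not what the PR does): a stream reduction avoids mutable capture entirely.
    long sum = counts.stream().mapToLong(Long::longValue).sum();
    System.out.println(sum); // prints "15"
  }
}
```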
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]