lokeshj1703 commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2126255182


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java:
##########
@@ -63,6 +75,41 @@ public String createNewInstantTime() {
     return TimelineUtils.generateInstantTime(false, timeGenerator);
   }
 
+  @Override
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusHandlerCallback> 
writeStatusHandlerCallbackOpt) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " 
+ config.getTableName());
+    // Triggering the dag for writes to metadata table.
+    // When streaming writes are enabled, writes to metadata may not call this 
method as the caller tightly controls the dag de-referencing.
+    // Even then, to initialize a new partition in Metadata table and for non 
incremental operations like insert_overwrite, etc, writes to metadata table
+    // will invoke this commit method.
+    List<WriteStatus> writeStatusesList = writeStatuses.map(writeStatus -> 
writeStatus.removeMetadataIndexStatsAndErrorRecordsTracking()).collect();
+    // Compute stats for the writes and invoke callback
+    AtomicLong totalRecords = new AtomicLong(0);
+    AtomicLong totalErrorRecords = new AtomicLong(0);
+    writeStatusesList.forEach(entry -> {
+      totalRecords.getAndAdd(entry.getTotalRecords());
+      totalErrorRecords.getAndAdd(entry.getTotalErrorRecords());
+    });
+
+    // reason why we are passing RDD<WriteStatus> to the writeStatusHandler 
callback: We can't afford to collect all write status to dirver if there are 
errors, since write status will hold
+    // every error record. So, just incase if there are errors, caller might 
be interested to fetch error records. And so, we are passing the 
RDD<WriteStatus> as last argument to the write status
+    // handler callback.
+    boolean canProceed = writeStatusHandlerCallbackOpt.isEmpty() || 
writeStatusHandlerCallbackOpt.get().processWriteStatuses(totalRecords.get(), 
totalErrorRecords.get(),

Review Comment:
   Addressed



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java:
##########
@@ -63,6 +75,41 @@ public String createNewInstantTime() {
     return TimelineUtils.generateInstantTime(false, timeGenerator);
   }
 
+  @Override
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusHandlerCallback> 
writeStatusHandlerCallbackOpt) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " 
+ config.getTableName());
+    // Triggering the dag for writes to metadata table.
+    // When streaming writes are enabled, writes to metadata may not call this 
method as the caller tightly controls the dag de-referencing.
+    // Even then, to initialize a new partition in Metadata table and for non 
incremental operations like insert_overwrite, etc, writes to metadata table
+    // will invoke this commit method.
+    List<WriteStatus> writeStatusesList = writeStatuses.map(writeStatus -> 
writeStatus.removeMetadataIndexStatsAndErrorRecordsTracking()).collect();
+    // Compute stats for the writes and invoke callback
+    AtomicLong totalRecords = new AtomicLong(0);
+    AtomicLong totalErrorRecords = new AtomicLong(0);

Review Comment:
   These are used in lambda expressions, therefore had to use atomic references 
here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to