the-other-tim-brown commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2125364788


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java:
##########
@@ -63,6 +75,41 @@ public String createNewInstantTime() {
     return TimelineUtils.generateInstantTime(false, timeGenerator);
   }
 
+  @Override
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusHandlerCallback> 
writeStatusHandlerCallbackOpt) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " 
+ config.getTableName());
+    // Triggering the dag for writes to metadata table.
+    // When streaming writes are enabled, writes to metadata may not call this 
method as the caller tightly controls the dag de-referencing.
+    // Even then, to initialize a new partition in Metadata table and for non 
incremental operations like insert_overwrite, etc, writes to metadata table
+    // will invoke this commit method.
+    List<WriteStatus> writeStatusesList = writeStatuses.map(writeStatus -> 
writeStatus.removeMetadataIndexStatsAndErrorRecordsTracking()).collect();
+    // Compute stats for the writes and invoke callback
+    AtomicLong totalRecords = new AtomicLong(0);
+    AtomicLong totalErrorRecords = new AtomicLong(0);
+    writeStatusesList.forEach(entry -> {
+      totalRecords.getAndAdd(entry.getTotalRecords());
+      totalErrorRecords.getAndAdd(entry.getTotalErrorRecords());
+    });
+
+    // reason why we are passing RDD<WriteStatus> to the writeStatusHandler 
callback: We can't afford to collect all write status to dirver if there are 
errors, since write status will hold
+    // every error record. So, just incase if there are errors, caller might 
be interested to fetch error records. And so, we are passing the 
RDD<WriteStatus> as last argument to the write status
+    // handler callback.
+    boolean canProceed = writeStatusHandlerCallbackOpt.isEmpty() || 
writeStatusHandlerCallbackOpt.get().processWriteStatuses(totalRecords.get(), 
totalErrorRecords.get(),

Review Comment:
   nitpick: use `map(..).orElse(false)`?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -114,6 +119,10 @@ public class HoodieWriteStat extends HoodieReadStats {
   @Nullable
   private RuntimeStats runtimeStats;
 
+  @JsonIgnore

Review Comment:
   The annotation was not on the 
[HoodieDeltaWriteStat.java](https://github.com/apache/hudi/pull/13307/files#diff-d4fa29e0b797c44385dccd4fe35b8f36f120a786c522837a4a3305a8c5001774)
 Will this have side-effects?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java:
##########
@@ -63,6 +75,41 @@ public String createNewInstantTime() {
     return TimelineUtils.generateInstantTime(false, timeGenerator);
   }
 
+  @Override
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusHandlerCallback> 
writeStatusHandlerCallbackOpt) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " 
+ config.getTableName());
+    // Triggering the dag for writes to metadata table.
+    // When streaming writes are enabled, writes to metadata may not call this 
method as the caller tightly controls the dag de-referencing.
+    // Even then, to initialize a new partition in Metadata table and for non 
incremental operations like insert_overwrite, etc, writes to metadata table
+    // will invoke this commit method.
+    List<WriteStatus> writeStatusesList = writeStatuses.map(writeStatus -> 
writeStatus.removeMetadataIndexStatsAndErrorRecordsTracking()).collect();
+    // Compute stats for the writes and invoke callback
+    AtomicLong totalRecords = new AtomicLong(0);
+    AtomicLong totalErrorRecords = new AtomicLong(0);

Review Comment:
   nitpick: since we're not updating these values from multiple threads, the 
atomicLong seems like it is not required.



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java:
##########
@@ -344,4 +350,47 @@ public static JavaRDD<HoodieRecord> 
resolveDuplicates(JavaSparkContext jssc,
     return handleDuplicates(
         new HoodieSparkEngineContext(jssc), incomingHoodieRecords, 
writeConfig, failOnDuplicates);
   }
+
+  /**
+   * Callback for WriteStatus Handler.
+   * If there are error records, we print few of them and exit.
+   * If not, we proceed with the commit.
+   */
+  static class SparkDataSourceWriteStatusHandlerCallback implements 
WriteStatusHandlerCallback {
+
+    private final WriteOperationType writeOperationType;
+    private final AtomicBoolean hasErrored;
+
+    public SparkDataSourceWriteStatusHandlerCallback(WriteOperationType 
writeOperationType, AtomicBoolean hasErrored) {
+      this.writeOperationType = writeOperationType;
+      this.hasErrored = hasErrored;
+    }
+
+    @Override
+    public boolean processWriteStatuses(long totalRecords, long 
totalErroredRecords, Option<HoodieData<WriteStatus>> writeStatusesOpt) {
+      if (totalErroredRecords > 0) {
+        hasErrored.set(true);
+        ValidationUtils.checkArgument(writeStatusesOpt.isPresent(), "RDD 
<WriteStatus> expected to be present when there are errors");
+        LOG.error("%s failed with errors", writeOperationType);

Review Comment:
   `%s` should be `{}`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to