danny0405 commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2126495533


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -82,16 +86,65 @@ protected HoodieIndex createIndex(HoodieWriteConfig 
writeConfig) {
     return SparkHoodieIndexFactory.createIndex(config);
   }
 
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
+    return commit(instantTime, writeStatuses, extraMetadata, commitActionType, 
partitionToReplacedFileIds, extraPreCommitFunc, Option.empty());
+  }
+
   /**
    * Complete changes performed at the given instantTime marker with specified 
action.
    */
   @Override
   public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
-                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusValidator> writeStatusValidatorOpt) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " 
+ config.getTableName());
-    List<HoodieWriteStat> writeStats = 
writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, writeStats, extraMetadata, 
commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+    // Triggering the dag for writes.
+    // If streaming writes are enabled, writes to both data table and metadata 
table gets triggered at this juncture.
+    // If not, writes to data table gets triggered here.
+    // When streaming writes are enabled, data table's WriteStatus is expected 
to contain all stats required to generate metadata table records and so it 
could be fatter.
+    // So, here we are dropping all additional stats and only retains the 
information required to proceed from here on.
+    // And we are also dropping error records so that we don't unintentionally 
collect the error records in the driver.
+    HoodieTable table = createTable(config);
+    boolean isMetadataStreamingWritesEnabled = 
config.isMetadataStreamingWritesEnabled(table.getMetaClient().getTableConfig().getTableVersion());
+    List<Pair<Boolean, WriteStatus>> isMetadataWriteStatusPairs = writeStatuses
+        .map(writeStatus -> {
+          if (isMetadataStreamingWritesEnabled) {
+            writeStatus.removeMetadataIndexStatsAndErrorRecordsTracking();
+          } else {
+            writeStatus.dropGranularErrorRecordsTracking();
+          }
+          return Pair.of(writeStatus.isMetadataTable(), writeStatus);
+        }
+    ).collect();
+    // Compute stats for the writes and invoke callback
+    AtomicLong totalRecords = new AtomicLong(0);
+    AtomicLong totalErrorRecords = new AtomicLong(0);
+    isMetadataWriteStatusPairs.stream().filter(entry -> 
table.isMetadataTable() && entry.getKey()).forEach(pair -> {
+      totalRecords.getAndAdd(pair.getValue().getTotalRecords());
+      totalErrorRecords.getAndAdd(pair.getValue().getTotalErrorRecords());
+    });
+    // reason why we are passing RDD<WriteStatus> to the writeStatusHandler 
callback: At the beginning of this method, we drop all index stats and error 
records before collecting in the driver.
+    // Just incase if there are errors, caller might be interested to fetch 
error records in the callback. And so, we are passing the RDD<WriteStatus> as 
last argument to the write status
+    // handler callback.
+    boolean canProceed = writeStatusValidatorOpt.map(callback -> 
callback.validate(totalRecords.get(), totalErrorRecords.get(),
+            totalErrorRecords.get() > 0 ? 
Option.of(HoodieJavaRDD.of(writeStatuses.filter(status -> 
table.isMetadataTable() && 
status.isMetadataTable()).map(WriteStatus::removeMetadataStats))) : 
Option.empty()))

Review Comment:
   `table.isMetadataTable()` is deterministic (it does not vary per item), so 
   can we evaluate it once and move it out of the per-item filter?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to