lokeshj1703 commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2114526383


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -75,23 +84,67 @@ public SparkRDDWriteClient(HoodieEngineContext context, 
HoodieWriteConfig writeC
                              Option<EmbeddedTimelineService> timelineService) {
     super(context, writeConfig, timelineService, 
SparkUpgradeDowngradeHelper.getInstance());
     this.tableServiceClient = new SparkRDDTableServiceClient<T>(context, 
writeConfig, getTimelineServer());
+    isMetadataTable = 
HoodieTableMetadata.isMetadataTable(config.getBasePath());
   }
 
   @Override
   protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
     return SparkHoodieIndexFactory.createIndex(config);
   }
 
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
+    return commit(instantTime, writeStatuses, extraMetadata, commitActionType, 
partitionToReplacedFileIds, extraPreCommitFunc, new 
NoOpWriteStatusHandlerCallback());
+  }
+
   /**
    * Complete changes performed at the given instantTime marker with specified 
action.
    */
   @Override
   public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
-                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc,
+                        WriteStatusHandlerCallback writeStatusHandlerCallback) 
{
     context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " 
+ config.getTableName());
-    List<HoodieWriteStat> writeStats = 
writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, writeStats, extraMetadata, 
commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+    // Triggering the dag for writes.
+    // If streaming writes are enabled, writes to both data table and metadata 
table gets triggers at this juncture.
+    // If not, writes to data table gets triggered here.
+    // When streaming writes are enabled, WriteStatus is expected to contain 
all stats required to generate metadata table records and so it could be fatter.

Review Comment:
   Can not think of a simpler name. Maybe simpleWriteStatuses?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to