nsivabalan commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2119927844


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -800,92 +803,48 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
       WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch, instantTime, useRowWriter);
       Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResult.getPartitionToReplacedFileIds();
       // write to error table
-      JavaRDD<WriteStatus> dataTableWriteStatusRDD = writeClientWriteResult.getWriteStatusRDD();
-      JavaRDD<WriteStatus> writeStatusRDD = dataTableWriteStatusRDD;
+      JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResult.getWriteStatusRDD();
       String errorTableInstantTime = writeClient.createNewInstantTime();
       Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt = Option.empty();
       if (errorTableWriter.isPresent() && isErrorTableWriteUnificationEnabled) {
         errorTableWriteStatusRDDOpt = errorTableWriter.map(w -> w.upsert(errorTableInstantTime, instantTime, getLatestCommittedInstant()));
-        writeStatusRDD = errorTableWriteStatusRDDOpt.map(errorTableWriteStatus -> errorTableWriteStatus.union(dataTableWriteStatusRDD)).orElse(dataTableWriteStatusRDD);

Review Comment:
   Note to reviewer:
   For lines 809 to 882:
   Here are the changes this patch brings in.
   We are introducing a writeStatusHandler callback for writes to the data table
   in HoodieStreamer here.
   
   Before this patch, the output of the data table writeClient.upsert()
   (RDD<WriteStatus>), or of any other write operation, goes through additional
   processing before we call into writeClient.commit(RDD<WriteStatus>, ...).
   
   For example, we union the RDD with the error table DAG and trigger it once,
   and then we trigger the writes to the error table, etc.
   
   With this patch, we can't afford to batch the data table RDD and the error
   table RDD so that both are triggered by a single collect(), since with
   streaming writes, DAG dereferencing is controlled internally and the caller is
   not supposed to dereference. I have discussed this with @vinishjail97, who is
   the author of this feature (error DAG unification), and he is aware of this
   change. The idea here is that, in general, error table writes should not be
   tightly coupled with data table writes. Error table writes should be treated
   as an independent entity, and the data table write DAG should not change
   irrespective of whether someone enables the error table or not. So, with that
   philosophy, we are going ahead with this change, where the DAG unification may
   not take effect as intended with this patch.
   
   Apart from the error DAG unification, every other change is just moving code
   around. We moved most of L809 - 852 and L872 - 882 into the
   HoodieStreamerWriteStatusHandlerCallback.
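
To make the callback idea concrete, here is a minimal sketch in plain Java, with no Spark or Hudi dependencies. Every name in it (SketchWriteStatus, SketchWriteStatusHandler, SketchWriteClient, StreamerCallbackSketch) is hypothetical and is not the API added by this patch. It only illustrates the shape described above: the caller hands the write client a lazy result plus a callback, the client alone evaluates the lazy result, and the error table commit is not unioned with the data table write. The commit order and the "skip the commit if any status has errors" rule are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for Hudi's WriteStatus; it only records whether a file write failed.
final class SketchWriteStatus {
    final String fileId;
    final boolean hasErrors;

    SketchWriteStatus(String fileId, boolean hasErrors) {
        this.fileId = fileId;
        this.hasErrors = hasErrors;
    }
}

// Hypothetical callback shape: receives materialized statuses and decides whether to commit.
interface SketchWriteStatusHandler {
    boolean shouldCommit(List<SketchWriteStatus> statuses);
}

// Hypothetical write client: it alone evaluates the lazy write result (the "DAG").
final class SketchWriteClient {
    final List<String> committedInstants = new ArrayList<>();

    boolean commit(String instantTime,
                   Supplier<List<SketchWriteStatus>> lazyStatuses,
                   SketchWriteStatusHandler handler) {
        // The only place the lazy result is dereferenced.
        List<SketchWriteStatus> statuses = lazyStatuses.get();
        if (!handler.shouldCommit(statuses)) {
            return false;
        }
        committedInstants.add(instantTime);
        return true;
    }
}

final class StreamerCallbackSketch {
    // Commits the error table and the data table independently and returns
    // whether the data table commit went through.
    static boolean writeBatch(SketchWriteClient dataTableClient,
                              SketchWriteClient errorTableClient,
                              String instantTime,
                              List<SketchWriteStatus> dataStatuses,
                              List<SketchWriteStatus> errorStatuses) {
        // Assumption: the error table commit always proceeds, on its own instant.
        errorTableClient.commit(instantTime + "-err", () -> errorStatuses, s -> true);
        // The data table path is the same whether or not an error table exists.
        return dataTableClient.commit(instantTime, () -> dataStatuses,
            statuses -> statuses.stream().noneMatch(s -> s.hasErrors));
    }
}
```

In this sketch the data table commit path never sees the error table statuses, which mirrors the point that enabling the error table should not change the data table write DAG.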
   
   
   
   


