nsivabalan commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2119927844
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -800,92 +803,48 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch, instantTime, useRowWriter);
Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResult.getPartitionToReplacedFileIds();
// write to error table
- JavaRDD<WriteStatus> dataTableWriteStatusRDD = writeClientWriteResult.getWriteStatusRDD();
- JavaRDD<WriteStatus> writeStatusRDD = dataTableWriteStatusRDD;
+ JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResult.getWriteStatusRDD();
String errorTableInstantTime = writeClient.createNewInstantTime();
Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt = Option.empty();
if (errorTableWriter.isPresent() && isErrorTableWriteUnificationEnabled) {
errorTableWriteStatusRDDOpt = errorTableWriter.map(w -> w.upsert(errorTableInstantTime, instantTime, getLatestCommittedInstant()));
- writeStatusRDD = errorTableWriteStatusRDDOpt.map(errorTableWriteStatus -> errorTableWriteStatus.union(dataTableWriteStatusRDD)).orElse(dataTableWriteStatusRDD);
Review Comment:
Note to reviewer:
For lines 809 to 882:
Here are the changes this patch brings in.
We are introducing a writeStatusHandler callback for data table writes in
HoodieStreamer here.
Before this patch, the output of the data table writeClient.upsert()
(RDD<WriteStatus>), or of any other write operation, went through additional
processing before we called writeClient.commit(RDD<WriteStatus>, ...).
For example, we unioned the data table RDD with the error table DAG and
triggered both at once, and then we triggered the writes to the error table, etc.
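To make the pre-patch behavior concrete, here is a minimal analogy in plain Java. It assumes that java.util.stream can stand in for Spark RDDs because both are lazy: no "write" runs until a terminal operation does. `UnifiedDagSketch`, `SimpleWriteStatus`, and `lazyWrites` are hypothetical names, not Hudi or Spark APIs. Concatenating the two pipelines and calling `collect` once triggers the error table and data table "writes" in a single pass, which is roughly what `errorTableWriteStatus.union(dataTableWriteStatusRDD)` followed by one action achieved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Analogy only (hypothetical names): java.util.stream stands in for Spark RDDs.
 * Like an RDD, a Stream is lazy, so nothing executes until a terminal operation runs.
 */
public class UnifiedDagSketch {

    /** Hypothetical, simplified stand-in for Hudi's WriteStatus. */
    record SimpleWriteStatus(String table, String recordKey) {}

    /** Records which "writes" actually executed, in execution order. */
    static final List<String> executedWrites = new ArrayList<>();

    /** Builds a lazy pipeline; the map side effect plays the role of the actual write. */
    static Stream<SimpleWriteStatus> lazyWrites(String table, List<String> keys) {
        return keys.stream().map(key -> {
            executedWrites.add(table + ":" + key); // runs only when a terminal op pulls this element
            return new SimpleWriteStatus(table, key);
        });
    }

    /** Pre-patch style: union both lazy pipelines and trigger them with ONE collect. */
    static List<SimpleWriteStatus> writeUnified(List<String> dataKeys, List<String> errorKeys) {
        Stream<SimpleWriteStatus> dataTable = lazyWrites("data", dataKeys);
        Stream<SimpleWriteStatus> errorTable = lazyWrites("error", errorKeys);
        // Same shape as errorTableWriteStatus.union(dataTableWriteStatusRDD): error side first.
        return Stream.concat(errorTable, dataTable).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<SimpleWriteStatus> all = writeUnified(List.of("k1", "k2"), List.of("bad1"));
        System.out.println(all.size() + " statuses; writes executed: " + executedWrites);
    }
}
```

Running `main` prints `3 statuses; writes executed: [error:bad1, data:k1, data:k2]`: both "tables" were written by the single `collect`, so whoever calls `collect` controls when the data table DAG is dereferenced.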
With this patch, we can no longer batch the data table RDD and the error table
RDD so that both are triggered by a single collect(): with streaming writes, DAG
dereferencing is controlled internally, and the caller is not supposed to
dereference the DAG. I have discussed this with @vinishjail97, the author of the
error DAG unification feature, and he is aware of this change. The idea is
that, in general, error table writes should not be tightly coupled with data
table writes. Error table writes should be treated as an independent entity,
and the data table write DAG should stay the same irrespective of whether the
error table is enabled. Following that philosophy, we are going ahead with this
change, even though the error DAG unification may no longer take effect as
intended with this patch.
Apart from the error DAG unification, every other change just moves code
around: we moved most of L809-852 and L872-882 into
HoodieStreamerWriteStatusHandlerCallback.
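A minimal sketch of the callback shape described in this note, under a simplified model: the write client (not the caller) materializes the data table write statuses and hands them to a callback that decides whether to commit, while the error table is written as a separate, independent step. `WriteStatusHandlerCallback`, `writeDataTable`, `writeErrorTable`, `streamerCallback`, and the `commitOnErrors` flag are all hypothetical and are not the actual signature of HoodieStreamerWriteStatusHandlerCallback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class WriteStatusCallbackSketch {

    /** Hypothetical, simplified stand-in for Hudi's WriteStatus. */
    record SimpleWriteStatus(String recordKey, boolean hasError) {}

    /** Hypothetical callback: receives materialized statuses and returns whether to commit. */
    interface WriteStatusHandlerCallback {
        boolean processWriteStatuses(long totalRecords, long totalErrorRecords, List<SimpleWriteStatus> statuses);
    }

    /** Log of what happened, in order, so the flow can be inspected. */
    static final List<String> events = new ArrayList<>();

    /** Hypothetical write client path: it, not the caller, triggers the lazy data table write. */
    static boolean writeDataTable(Supplier<List<SimpleWriteStatus>> lazyWrite, WriteStatusHandlerCallback callback) {
        List<SimpleWriteStatus> statuses = lazyWrite.get(); // dereferenced internally, exactly once
        events.add("data-written");
        long errors = statuses.stream().filter(SimpleWriteStatus::hasError).count();
        boolean committed = callback.processWriteStatuses(statuses.size(), errors, statuses);
        events.add(committed ? "data-committed" : "data-not-committed");
        return committed;
    }

    /** Hypothetical error table write, fully independent of the data table path. */
    static void writeErrorTable(List<String> failedKeys) {
        events.add("error-table-written:" + failedKeys.size());
    }

    /** Hypothetical streamer-side callback: commit if there are no errors or errors are tolerated. */
    static WriteStatusHandlerCallback streamerCallback(boolean commitOnErrors) {
        return (totalRecords, totalErrorRecords, statuses) -> totalErrorRecords == 0 || commitOnErrors;
    }

    public static void main(String[] args) {
        Supplier<List<SimpleWriteStatus>> write = () -> List.of(
                new SimpleWriteStatus("k1", false), new SimpleWriteStatus("k2", true));
        boolean committed = writeDataTable(write, streamerCallback(false));
        writeErrorTable(List.of("bad1")); // separate step; the data table path is unchanged without it
        System.out.println("committed=" + committed + ", events=" + events);
    }
}
```

Running `main` prints `committed=false, events=[data-written, data-not-committed, error-table-written:1]`. The design choice the sketch illustrates is that the data table path looks identical whether or not an error table is configured; the error table write is simply another call after it.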