lokeshj1703 commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2114539119
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -1339,4 +1298,112 @@ public JavaRDD<WriteStatus> getWriteStatusRDD() {
return writeStatusRDD;
}
}
+
+ static class HoodieStreamerWriteStatusHandlerCallback implements
WriteStatusHandlerCallback {
+
+ private final boolean commitOnErrors;
+ private final String instantTime;
+ private final HoodieStreamer.Config cfg;
+ private final Option<BaseErrorTableWriter> errorTableWriter;
+ private final Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt;
+ private final HoodieErrorTableConfig.ErrorWriteFailureStrategy
errorWriteFailureStrategy;
+ private final boolean isErrorTableWriteUnificationEnabled;
+ private final String errorTableInstantTime;
+ private final SparkRDDWriteClient writeClient;
+ private final Option<String> latestCommittedInstant;
+ private final AtomicLong totalSuccessfulRecords;
+
+ HoodieStreamerWriteStatusHandlerCallback(boolean commitOnErrors,
+ String instantTime,
+ HoodieStreamer.Config cfg,
+ Option<BaseErrorTableWriter>
errorTableWriter,
+ Option<JavaRDD<WriteStatus>>
errorTableWriteStatusRDDOpt,
+
HoodieErrorTableConfig.ErrorWriteFailureStrategy errorWriteFailureStrategy,
+ boolean
isErrorTableWriteUnificationEnabled,
+ String errorTableInstantTime,
+ SparkRDDWriteClient writeClient,
+ Option<String>
latestCommittedInstant,
+ AtomicLong
totalSuccessfulRecords) {
+ this.commitOnErrors = commitOnErrors;
+ this.instantTime = instantTime;
+ this.cfg = cfg;
+ this.errorTableWriter = errorTableWriter;
+ this.errorTableWriteStatusRDDOpt = errorTableWriteStatusRDDOpt;
+ this.errorWriteFailureStrategy = errorWriteFailureStrategy;
+ this.isErrorTableWriteUnificationEnabled =
isErrorTableWriteUnificationEnabled;
+ this.errorTableInstantTime = errorTableInstantTime;
+ this.writeClient = writeClient;
+ this.latestCommittedInstant = latestCommittedInstant;
+ this.totalSuccessfulRecords = totalSuccessfulRecords;
+ }
+
+ @Override
+ public boolean processWriteStatuses(long tableTotalRecords, long
tableTotalErroredRecords, HoodieData<WriteStatus> leanWriteStatuses) {
+
+ long totalRecords = tableTotalRecords;
+ long totalErroredRecords = tableTotalErroredRecords;
+ // TODO: Remove flag isErrorTableWriteUnificationEnabled, should not be
required anymore
Review Comment:
Addressed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]