nsivabalan commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2119932644
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -1339,4 +1298,111 @@ public JavaRDD<WriteStatus> getWriteStatusRDD() {
return writeStatusRDD;
}
}
+
+ static class HoodieStreamerWriteStatusHandlerCallback implements
WriteStatusHandlerCallback {
+
+ private final boolean commitOnErrors;
+ private final String instantTime;
+ private final HoodieStreamer.Config cfg;
+ private final Option<BaseErrorTableWriter> errorTableWriter;
+ private final Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt;
+ private final HoodieErrorTableConfig.ErrorWriteFailureStrategy
errorWriteFailureStrategy;
+ private final boolean isErrorTableWriteUnificationEnabled;
+ private final String errorTableInstantTime;
+ private final SparkRDDWriteClient writeClient;
+ private final Option<String> latestCommittedInstant;
+ private final AtomicLong totalSuccessfulRecords;
+
+ HoodieStreamerWriteStatusHandlerCallback(boolean commitOnErrors,
+ String instantTime,
+ HoodieStreamer.Config cfg,
+ Option<BaseErrorTableWriter>
errorTableWriter,
+ Option<JavaRDD<WriteStatus>>
errorTableWriteStatusRDDOpt,
+
HoodieErrorTableConfig.ErrorWriteFailureStrategy errorWriteFailureStrategy,
+ boolean
isErrorTableWriteUnificationEnabled,
+ String errorTableInstantTime,
+ SparkRDDWriteClient writeClient,
+ Option<String>
latestCommittedInstant,
+ AtomicLong
totalSuccessfulRecords) {
+ this.commitOnErrors = commitOnErrors;
+ this.instantTime = instantTime;
+ this.cfg = cfg;
+ this.errorTableWriter = errorTableWriter;
+ this.errorTableWriteStatusRDDOpt = errorTableWriteStatusRDDOpt;
+ this.errorWriteFailureStrategy = errorWriteFailureStrategy;
+ this.isErrorTableWriteUnificationEnabled =
isErrorTableWriteUnificationEnabled;
+ this.errorTableInstantTime = errorTableInstantTime;
+ this.writeClient = writeClient;
+ this.latestCommittedInstant = latestCommittedInstant;
+ this.totalSuccessfulRecords = totalSuccessfulRecords;
+ }
+
+ @Override
+ public boolean processWriteStatuses(long tableTotalRecords, long
tableTotalErroredRecords, HoodieData<WriteStatus> leanWriteStatuses) {
+
+ long totalRecords = tableTotalRecords;
+ long totalErroredRecords = tableTotalErroredRecords;
+ if (isErrorTableWriteUnificationEnabled) {
+ totalRecords += errorTableWriteStatusRDDOpt.map(status ->
status.mapToDouble(WriteStatus::getTotalRecords).sum().longValue()).orElse(0L);
+ totalErroredRecords += errorTableWriteStatusRDDOpt.map(status ->
status.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue()).orElse(0L);
+ }
+ long totalSuccessfulRecords = totalRecords - totalErroredRecords;
+ this.totalSuccessfulRecords.set(totalSuccessfulRecords);
+ LOG.info("instantTime={}, totalRecords={}, totalErrorRecords={},
totalSuccessfulRecords={}",
+ instantTime, totalRecords, totalErroredRecords,
totalSuccessfulRecords);
+ if (totalRecords == 0) {
+ LOG.info("No new data, perform empty commit.");
+ }
+ boolean hasErrorRecords = totalErroredRecords > 0;
+ if (!hasErrorRecords || commitOnErrors) {
+ if (hasErrorRecords) {
+ LOG.warn("Some records failed to be merged but forcing commit since
commitOnErrors set. Errors/Total="
+ + totalErroredRecords + "/" + totalRecords);
+ }
+ }
+
+ if (errorTableWriter.isPresent()) {
+ boolean errorTableSuccess = true;
+ // Commit the error events triggered so far to the error table
+ if (isErrorTableWriteUnificationEnabled &&
errorTableWriteStatusRDDOpt.isPresent()) {
+ errorTableSuccess =
errorTableWriter.get().commit(errorTableInstantTime,
errorTableWriteStatusRDDOpt.get());
+ } else if (!isErrorTableWriteUnificationEnabled) {
+ errorTableSuccess =
errorTableWriter.get().upsertAndCommit(instantTime, latestCommittedInstant);
+ }
+ if (!errorTableSuccess) {
+ switch (errorWriteFailureStrategy) {
+ case ROLLBACK_COMMIT:
+ LOG.info("Commit " + instantTime + " failed!");
+ writeClient.rollback(instantTime);
+ throw new HoodieStreamerWriteException("Error table commit
failed");
+ case LOG_ERROR:
+ LOG.error("Error Table write failed for instant " + instantTime);
+ break;
+ default:
+ throw new HoodieStreamerWriteException("Write failure strategy
not implemented for " + errorWriteFailureStrategy);
+ }
+ }
+ }
+ boolean canProceed = !hasErrorRecords || commitOnErrors;
+ if (canProceed) {
+ return canProceed;
+ } else {
+ LOG.error("Delta Sync found errors when writing. Errors/Total=" +
totalErroredRecords + "/" + totalRecords);
+ LOG.error("Printing out the top 100 errors");
+
+ List<WriteStatus> erroredWriteStatueses =
leanWriteStatuses.filter(WriteStatus::hasErrors).collectAsList();
Review Comment:
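We should not collect all errored write statuses to the driver with `collectAsList()`. Their number is unbounded, so a large batch with many failures can run the driver out of memory (OOM). Since only the top 100 errors are printed, fetch just a bounded sample (e.g. the first 100 errored statuses) instead.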
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]