the-other-tim-brown commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2125364788
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java:
##########
@@ -63,6 +75,41 @@ public String createNewInstantTime() {
return TimelineUtils.generateInstantTime(false, timeGenerator);
}
+  @Override
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusHandlerCallback> writeStatusHandlerCallbackOpt) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
+    // Triggering the dag for writes to the metadata table.
+    // When streaming writes are enabled, writes to metadata may not call this method, as the caller tightly controls the dag de-referencing.
+    // Even then, writes to the metadata table will invoke this commit method to initialize a new partition in the metadata table
+    // and for non-incremental operations like insert_overwrite, etc.
+    List<WriteStatus> writeStatusesList = writeStatuses.map(writeStatus -> writeStatus.removeMetadataIndexStatsAndErrorRecordsTracking()).collect();
+    // Compute stats for the writes and invoke the callback
+    AtomicLong totalRecords = new AtomicLong(0);
+    AtomicLong totalErrorRecords = new AtomicLong(0);
+    writeStatusesList.forEach(entry -> {
+      totalRecords.getAndAdd(entry.getTotalRecords());
+      totalErrorRecords.getAndAdd(entry.getTotalErrorRecords());
+    });
+
+    // Reason for passing RDD<WriteStatus> to the write status handler callback: we cannot afford to collect all write statuses
+    // to the driver when there are errors, since each write status holds every error record. If there are errors, the caller may
+    // want to fetch the error records, so we pass the RDD<WriteStatus> as the last argument to the callback.
+    boolean canProceed = writeStatusHandlerCallbackOpt.isEmpty() ||
+        writeStatusHandlerCallbackOpt.get().processWriteStatuses(totalRecords.get(), totalErrorRecords.get(),
Review Comment:
nitpick: use `map(..).orElse(true)`? (The default must be `true` so that an absent callback still lets the commit proceed.)
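To make the suggested rewrite concrete, here is a minimal sketch. It uses `java.util.Optional` as a stand-in for Hudi's own `Option` (which exposes a similar `map`/`orElse` API), and `Callback` below is a hypothetical stand-in for `WriteStatusHandlerCallback`:

```java
import java.util.Optional;

public class OrElseDemo {
  // Hypothetical stand-in for WriteStatusHandlerCallback.
  interface Callback {
    boolean processWriteStatuses(long totalRecords, long totalErrorRecords);
  }

  // Equivalent to `callbackOpt.isEmpty() || callbackOpt.get().processWriteStatuses(...)`:
  // an absent callback means "proceed", so the orElse default must be true.
  static boolean canProceed(Optional<Callback> callbackOpt, long totalRecords, long totalErrorRecords) {
    return callbackOpt
        .map(cb -> cb.processWriteStatuses(totalRecords, totalErrorRecords))
        .orElse(true);
  }

  public static void main(String[] args) {
    Callback rejectOnErrors = (total, errors) -> errors == 0;
    System.out.println(canProceed(Optional.empty(), 10, 5));           // no callback: proceed
    System.out.println(canProceed(Optional.of(rejectOnErrors), 10, 0)); // callback approves
    System.out.println(canProceed(Optional.of(rejectOnErrors), 10, 5)); // callback rejects
  }
}
```

Note that `orElse(false)` would silently block every commit made without a callback, which is why the default matters here.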
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -114,6 +119,10 @@ public class HoodieWriteStat extends HoodieReadStats {
@Nullable
private RuntimeStats runtimeStats;
+ @JsonIgnore
Review Comment:
The annotation was not added to [HoodieDeltaWriteStat.java](https://github.com/apache/hudi/pull/13307/files#diff-d4fa29e0b797c44385dccd4fe35b8f36f120a786c522837a4a3305a8c5001774). Will this have side-effects?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java:
##########
@@ -63,6 +75,41 @@ public String createNewInstantTime() {
return TimelineUtils.generateInstantTime(false, timeGenerator);
}
+  @Override
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusHandlerCallback> writeStatusHandlerCallbackOpt) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
+    // Triggering the dag for writes to the metadata table.
+    // When streaming writes are enabled, writes to metadata may not call this method, as the caller tightly controls the dag de-referencing.
+    // Even then, writes to the metadata table will invoke this commit method to initialize a new partition in the metadata table
+    // and for non-incremental operations like insert_overwrite, etc.
+    List<WriteStatus> writeStatusesList = writeStatuses.map(writeStatus -> writeStatus.removeMetadataIndexStatsAndErrorRecordsTracking()).collect();
+    // Compute stats for the writes and invoke the callback
+    AtomicLong totalRecords = new AtomicLong(0);
+    AtomicLong totalErrorRecords = new AtomicLong(0);
Review Comment:
nitpick: since we're not updating these values from multiple threads, the `AtomicLong` seems unnecessary.
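A sketch of the plain-`long` alternative. `Status` below is a hypothetical stand-in for `WriteStatus`, modeling only the two counters being aggregated; since `collect()` already materializes the list on the driver, the aggregation is single-threaded:

```java
import java.util.List;

public class TotalsDemo {
  // Hypothetical stand-in for WriteStatus (only the aggregated counters are modeled).
  record Status(long totalRecords, long totalErrorRecords) {}

  // A for-loop over the collected list lets us use plain long accumulators; the
  // forEach + lambda form needs AtomicLong only because lambdas cannot mutate
  // captured locals (the effectively-final rule), not for thread safety.
  static long[] totals(List<Status> statuses) {
    long totalRecords = 0;
    long totalErrorRecords = 0;
    for (Status s : statuses) {
      totalRecords += s.totalRecords();
      totalErrorRecords += s.totalErrorRecords();
    }
    return new long[] {totalRecords, totalErrorRecords};
  }

  public static void main(String[] args) {
    long[] t = totals(List.of(new Status(100, 2), new Status(50, 0)));
    System.out.println(t[0] + " records, " + t[1] + " errors"); // 150 records, 2 errors
  }
}
```

So the `AtomicLong` in the original is an artifact of using `forEach` with a lambda rather than a concurrency requirement.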
##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java:
##########
@@ -344,4 +350,47 @@ public static JavaRDD<HoodieRecord> resolveDuplicates(JavaSparkContext jssc,
     return handleDuplicates(
         new HoodieSparkEngineContext(jssc), incomingHoodieRecords, writeConfig, failOnDuplicates);
   }
+
+  /**
+   * Callback for the WriteStatus handler.
+   * If there are error records, we print a few of them and exit.
+   * If not, we proceed with the commit.
+   */
+  static class SparkDataSourceWriteStatusHandlerCallback implements WriteStatusHandlerCallback {
+
+    private final WriteOperationType writeOperationType;
+    private final AtomicBoolean hasErrored;
+
+    public SparkDataSourceWriteStatusHandlerCallback(WriteOperationType writeOperationType, AtomicBoolean hasErrored) {
+      this.writeOperationType = writeOperationType;
+      this.hasErrored = hasErrored;
+    }
+
+    @Override
+    public boolean processWriteStatuses(long totalRecords, long totalErroredRecords, Option<HoodieData<WriteStatus>> writeStatusesOpt) {
+      if (totalErroredRecords > 0) {
+        hasErrored.set(true);
+        ValidationUtils.checkArgument(writeStatusesOpt.isPresent(), "RDD<WriteStatus> expected to be present when there are errors");
+        LOG.error("%s failed with errors", writeOperationType);
Review Comment:
`%s` should be `{}`
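A quick sketch of why this matters, assuming `LOG` is an SLF4J logger: SLF4J substitutes `{}` placeholders and leaves printf-style `%s` untouched, so the original line would log the literal string `%s failed with errors`. The helper below is a minimal stand-in for SLF4J's single-argument substitution (the real helper is `org.slf4j.helpers.MessageFormatter`):

```java
public class PlaceholderDemo {
  // Minimal stand-in for SLF4J's "{}" substitution with one argument.
  static String slf4jFormat(String pattern, Object arg) {
    int i = pattern.indexOf("{}");
    return i < 0 ? pattern : pattern.substring(0, i) + arg + pattern.substring(i + 2);
  }

  public static void main(String[] args) {
    // printf-style "%s" is never expanded by SLF4J:
    System.out.println(slf4jFormat("%s failed with errors", "UPSERT")); // %s failed with errors
    // "{}" is substituted as intended:
    System.out.println(slf4jFormat("{} failed with errors", "UPSERT")); // UPSERT failed with errors
  }
}
```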
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]