nsivabalan commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2112186058
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -1339,4 +1298,112 @@ public JavaRDD<WriteStatus> getWriteStatusRDD() {
return writeStatusRDD;
}
}
+
+ static class HoodieStreamerWriteStatusHandlerCallback implements
WriteStatusHandlerCallback {
+
+ private final boolean commitOnErrors;
+ private final String instantTime;
+ private final HoodieStreamer.Config cfg;
+ private final Option<BaseErrorTableWriter> errorTableWriter;
+ private final Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt;
+ private final HoodieErrorTableConfig.ErrorWriteFailureStrategy
errorWriteFailureStrategy;
+ private final boolean isErrorTableWriteUnificationEnabled;
+ private final String errorTableInstantTime;
+ private final SparkRDDWriteClient writeClient;
+ private final Option<String> latestCommittedInstant;
+ private final AtomicLong totalSuccessfulRecords;
+
+ HoodieStreamerWriteStatusHandlerCallback(boolean commitOnErrors,
+ String instantTime,
+ HoodieStreamer.Config cfg,
+ Option<BaseErrorTableWriter>
errorTableWriter,
+ Option<JavaRDD<WriteStatus>>
errorTableWriteStatusRDDOpt,
+
HoodieErrorTableConfig.ErrorWriteFailureStrategy errorWriteFailureStrategy,
+ boolean
isErrorTableWriteUnificationEnabled,
+ String errorTableInstantTime,
+ SparkRDDWriteClient writeClient,
+ Option<String>
latestCommittedInstant,
+ AtomicLong
totalSuccessfulRecords) {
+ this.commitOnErrors = commitOnErrors;
+ this.instantTime = instantTime;
+ this.cfg = cfg;
+ this.errorTableWriter = errorTableWriter;
+ this.errorTableWriteStatusRDDOpt = errorTableWriteStatusRDDOpt;
+ this.errorWriteFailureStrategy = errorWriteFailureStrategy;
+ this.isErrorTableWriteUnificationEnabled =
isErrorTableWriteUnificationEnabled;
+ this.errorTableInstantTime = errorTableInstantTime;
+ this.writeClient = writeClient;
+ this.latestCommittedInstant = latestCommittedInstant;
+ this.totalSuccessfulRecords = totalSuccessfulRecords;
+ }
+
+ @Override
+ public boolean processWriteStatuses(long tableTotalRecords, long
tableTotalErroredRecords, HoodieData<WriteStatus> leanWriteStatuses) {
+
+ long totalRecords = tableTotalRecords;
+ long totalErroredRecords = tableTotalErroredRecords;
+ // TODO: Remove flag isErrorTableWriteUnificationEnabled, should not be
required anymore
Review Comment:
   Can we remove the TODOs here?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -75,23 +84,67 @@ public SparkRDDWriteClient(HoodieEngineContext context,
HoodieWriteConfig writeC
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService,
SparkUpgradeDowngradeHelper.getInstance());
this.tableServiceClient = new SparkRDDTableServiceClient<T>(context,
writeConfig, getTimelineServer());
+ isMetadataTable =
HoodieTableMetadata.isMetadataTable(config.getBasePath());
}
@Override
protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
return SparkHoodieIndexFactory.createIndex(config);
}
+ public boolean commit(String instantTime, JavaRDD<WriteStatus>
writeStatuses, Option<Map<String, String>> extraMetadata,
+ String commitActionType, Map<String, List<String>>
partitionToReplacedFileIds,
+ Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
+ return commit(instantTime, writeStatuses, extraMetadata, commitActionType,
partitionToReplacedFileIds, extraPreCommitFunc, new
NoOpWriteStatusHandlerCallback());
+ }
+
/**
* Complete changes performed at the given instantTime marker with specified
action.
*/
@Override
public boolean commit(String instantTime, JavaRDD<WriteStatus>
writeStatuses, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>>
partitionToReplacedFileIds,
- Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
+ Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc,
+ WriteStatusHandlerCallback writeStatusHandlerCallback)
{
context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: "
+ config.getTableName());
- List<HoodieWriteStat> writeStats =
writeStatuses.map(WriteStatus::getStat).collect();
- return commitStats(instantTime, writeStats, extraMetadata,
commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+ // Triggering the dag for writes.
+ // If streaming writes are enabled, writes to both data table and metadata
table gets triggers at this juncture.
+ // If not, writes to data table gets triggered here.
+ // When streaming writes are enabled, WriteStatus is expected to contain
all stats required to generate metadata table records and so it could be fatter.
Review Comment:
   Fix the documentation — we don't have any class named `LeanWriteStatus`.
   Also, consider renaming the variable on L115.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -75,23 +84,67 @@ public SparkRDDWriteClient(HoodieEngineContext context,
HoodieWriteConfig writeC
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService,
SparkUpgradeDowngradeHelper.getInstance());
this.tableServiceClient = new SparkRDDTableServiceClient<T>(context,
writeConfig, getTimelineServer());
+ isMetadataTable =
HoodieTableMetadata.isMetadataTable(config.getBasePath());
}
@Override
protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
return SparkHoodieIndexFactory.createIndex(config);
}
+ public boolean commit(String instantTime, JavaRDD<WriteStatus>
writeStatuses, Option<Map<String, String>> extraMetadata,
+ String commitActionType, Map<String, List<String>>
partitionToReplacedFileIds,
+ Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
+ return commit(instantTime, writeStatuses, extraMetadata, commitActionType,
partitionToReplacedFileIds, extraPreCommitFunc, new
NoOpWriteStatusHandlerCallback());
+ }
+
/**
* Complete changes performed at the given instantTime marker with specified
action.
*/
@Override
public boolean commit(String instantTime, JavaRDD<WriteStatus>
writeStatuses, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>>
partitionToReplacedFileIds,
- Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
+ Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc,
+ WriteStatusHandlerCallback writeStatusHandlerCallback)
{
context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: "
+ config.getTableName());
- List<HoodieWriteStat> writeStats =
writeStatuses.map(WriteStatus::getStat).collect();
- return commitStats(instantTime, writeStats, extraMetadata,
commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+ // Triggering the dag for writes.
+ // If streaming writes are enabled, writes to both data table and metadata
table gets triggers at this juncture.
+ // If not, writes to data table gets triggered here.
+ // When streaming writes are enabled, WriteStatus is expected to contain
all stats required to generate metadata table records and so it could be fatter.
Review Comment:
   Also, can you add a Javadoc describing what's expected when streaming writes are not
   enabled?
   Is there simpler processing we can do when streaming writes are disabled?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]