lokeshj1703 commented on code in PR #13307:
URL: https://github.com/apache/hudi/pull/13307#discussion_r2114526383
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -75,23 +84,67 @@ public SparkRDDWriteClient(HoodieEngineContext context,
HoodieWriteConfig writeC
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService,
SparkUpgradeDowngradeHelper.getInstance());
this.tableServiceClient = new SparkRDDTableServiceClient<T>(context,
writeConfig, getTimelineServer());
+ isMetadataTable =
HoodieTableMetadata.isMetadataTable(config.getBasePath());
}
@Override
protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
return SparkHoodieIndexFactory.createIndex(config);
}
+ public boolean commit(String instantTime, JavaRDD<WriteStatus>
writeStatuses, Option<Map<String, String>> extraMetadata,
+ String commitActionType, Map<String, List<String>>
partitionToReplacedFileIds,
+ Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
+ return commit(instantTime, writeStatuses, extraMetadata, commitActionType,
partitionToReplacedFileIds, extraPreCommitFunc, new
NoOpWriteStatusHandlerCallback());
+ }
+
/**
* Complete changes performed at the given instantTime marker with specified
action.
*/
@Override
public boolean commit(String instantTime, JavaRDD<WriteStatus>
writeStatuses, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>>
partitionToReplacedFileIds,
- Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
+ Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc,
+ WriteStatusHandlerCallback writeStatusHandlerCallback)
{
context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: "
+ config.getTableName());
- List<HoodieWriteStat> writeStats =
writeStatuses.map(WriteStatus::getStat).collect();
- return commitStats(instantTime, writeStats, extraMetadata,
commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+ // Triggering the dag for writes.
+ // If streaming writes are enabled, writes to both data table and metadata
table gets triggers at this juncture.
+ // If not, writes to data table gets triggered here.
+ // When streaming writes are enabled, WriteStatus is expected to contain
all stats required to generate metadata table records and so it could be fatter.
Review Comment:
Can not think of a simpler name. Maybe simpleWriteStatuses?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]