nsivabalan commented on code in PR #13236:
URL: https://github.com/apache/hudi/pull/13236#discussion_r2071292691
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -317,30 +350,74 @@ protected HoodieWriteMetadata<O> compact(HoodieTable<?,
I, ?, T> table, String c
table.getMetaClient().reloadActiveTimeline();
}
compactionTimer = metrics.getCompactionCtx();
+ // initialize streaming writer to MDT if enabled
+ Option<HoodieTableMetadataWriter> metadataWriterOpt =
mayBeInitStreamingWriterToMetadataTable(table.getMetaClient(),
compactionInstantTime);
HoodieWriteMetadata<T> writeMetadata = table.compact(context,
compactionInstantTime);
- HoodieWriteMetadata<O> compactionMetadata =
convertToOutputMetadata(writeMetadata);
- if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
- completeCompaction(compactionMetadata.getCommitMetadata().get(), table,
compactionInstantTime);
+ HoodieWriteMetadata<T> processedWriteMetadata =
mayBeStreamWriteToMetadataTable(writeMetadata, compactionInstantTime,
metadataWriterOpt);
+ HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(processedWriteMetadata);
+ if (shouldComplete) {
+ commitCompaction(compactionInstantTime, compactionWriteMetadata,
Option.of(table), metadataWriterOpt);
}
- return compactionMetadata;
+ return compactionWriteMetadata;
}
/**
- * Commit a compaction operation. Allow passing additional meta-data to be
stored in commit instant file.
- *
- * @param compactionInstantTime Compaction Instant Time
- * @param metadata All the metadata that gets stored along with
a commit
- * @param extraMetadata Extra Metadata to be stored
+ * If streaming writes to metadata table is enabled, perform streaming
writes to metadata table and return HoodieWriteMetadata containing the {@link
WriteStatus}
+ * pertianing to partial metadata writes.
+ * @param writeMetadata {@link HoodieWriteMetadata} instance based on writes
to data table.
+ * @param instantTime instant time of interest.
+ * @param metadataWriterOpt Optional instance of {@link
HoodieTableMetadataWriter} to assist with streaming writes.
+ * @return instance of {@link HoodieWriteMetadata} contain {@link
WriteStatus} for data table and optionally for metadata table as well.
*/
- public void commitCompaction(String compactionInstantTime,
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
- extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
- completeCompaction(metadata, createTable(config,
context.getStorageConf()), compactionInstantTime);
+ protected HoodieWriteMetadata<T>
mayBeStreamWriteToMetadataTable(HoodieWriteMetadata<T> writeMetadata, String
instantTime,
+
Option<HoodieTableMetadataWriter> metadataWriterOpt) {
+ if (writeMetadata.getAllWriteStatuses() == null) {
+
writeMetadata.setAllWriteStatuses(writeMetadata.getDataTableWriteStatuses());
+ }
+ return writeMetadata;
+ }
+
+ /**
+ * Triggers the writes to data table and optionally metadata table (if
streaming writes are enabled) and return a pair of List of HoodieWriteStat for
data table
+ * and another List of HoodieWriteStat for metadata table.
+ * @param writeMetadata instance of {@link HoodieWriteMetadata} of interest
based on the table service triggered in data table.
+ * @return a Pair of List of HoodieWriteStat for data table and another List
of HoodieWriteStat for metadata table.
+ */
+ protected abstract Pair<List<HoodieWriteStat>, List<HoodieWriteStat>>
triggerWritesAndFetchWriteStats(HoodieWriteMetadata<O> writeMetadata);
+
+ public void commitCompaction(String compactionInstantTime,
HoodieWriteMetadata<O> compactionWriteMetadata, Option<HoodieTable> tableOpt,
Review Comment:
commitCompaction
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]