lokeshj1703 commented on code in PR #13286:
URL: https://github.com/apache/hudi/pull/13286#discussion_r2112351659
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java:
##########
@@ -36,6 +39,35 @@
*/
public interface HoodieTableMetadataWriter<I,O> extends Serializable,
AutoCloseable {
/**
 * Starts a new commit in the metadata table for the optimized (streaming) write flow.
 *
 * @param instantTime instant time of the commit to start in the metadata table.
 */
void startCommit(String instantTime);
+
/**
 * Prepares records and writes to all eligible metadata table partitions except the FILES partition.
 * This is used in optimized writes, where data table write statuses are maintained as HoodieData and,
 * based on that, records are prepared and written to the metadata table partitions (except FILES).
 * Caution should be followed to ensure the action is not triggered on the incoming
 * {@code HoodieData<WriteStatus>} nor on the writes to the metadata table. The caller is expected to
 * trigger collect just once for both sets of {@code HoodieData<WriteStatus>}.
 *
 * @param writeStatus {@link HoodieData} of {@link WriteStatus} from data table writes.
 * @param instantTime instant time of interest.
 * @return {@link HoodieData} of {@link WriteStatus} for writes to the metadata table.
 */
HoodieData<WriteStatus> streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String instantTime);
+
/**
 * Used in the streaming-writes-to-metadata flow, wherein a data table write has already been written
 * to all metadata table partitions (except FILES) using {@code #streamWriteToMetadataPartitions} and
 * the action is triggered for all writes together. Post that, marker reconciliation of the data table
 * is executed, and then this method writes to the FILES partition in the metadata table and completes
 * the commit. This will also take care of executing marker reconciliation in the metadata table for
 * all metadata table partitions.
 *
 * @param instantTime             instant time of interest.
 * @param context                 {@link HoodieEngineContext} of interest.
 * @param metadataWriteStatsSoFar {@code List<HoodieWriteStat>} for partial/streaming writes to the metadata table completed so far.
 * @param commitMetadata          {@link HoodieCommitMetadata} of interest.
 */
void wrapUpStreamingWriteToMetadataTableAndCompleteCommit(String instantTime, HoodieEngineContext context,
    List<HoodieWriteStat> metadataWriteStatsSoFar, HoodieCommitMetadata commitMetadata);
Review Comment:
Addressed
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1770,5 +1923,13 @@ public boolean isInitialized() {
return writeClient;
}
+ public BaseHoodieWriteClient<?, I, ?, O> reInitWriteClient() {
Review Comment:
Removed it. It was not really required.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1298,9 +1451,7 @@ public void update(HoodieRollbackMetadata
rollbackMetadata, String instantTime)
// The commit which is being rolled back on the dataset
final String commitToRollbackInstantTime =
rollbackMetadata.getCommitsRollback().get(0);
// The deltacommit that will be rolled back
- HoodieInstant deltaCommitInstant =
metadataMetaClient.createNewInstant(HoodieInstant.State.COMPLETED,
- HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime);
- if
(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().containsInstant(deltaCommitInstant))
{
+ if
(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().containsInstant(commitToRollbackInstantTime))
{
Review Comment:
I think this is not required, we are already handling the pending commit
rollback case in else block.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1094,6 +1116,137 @@ public void buildMetadataPartitions(HoodieEngineContext
engineContext, List<Hood
initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
}
/**
 * Starts a new delta commit in the metadata table for the optimized (streaming) write flow.
 * If the instant is already present in the MDT commits timeline (a re-attempted commit), the
 * previous attempt is rolled back first and the write client is re-initialized before starting.
 *
 * @param instantTime instant time of the commit to start in the metadata table.
 */
public void startCommit(String instantTime) {
  BaseHoodieWriteClient<?, I, ?, ?> writeClient = getWriteClient();

  if (!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) {
    // if this is a new commit being applied to metadata for the first time
    LOG.info("New commit at {} being applied to MDT.", instantTime);
  } else {
    // this code path refers to a re-attempted commit that:
    // 1. got committed to metadata table, but failed in datatable.
    // 2. failed while committing to metadata table
    // for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
    // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
    // are upserts to metadata table and so only a new delta commit will be created.
    // once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
    // already part of completed commit. So, we have to manually rollback the completed instant and proceed.
    Option<HoodieInstant> existingInstant = metadataMetaClient.getActiveTimeline().filter(entry -> entry.requestedTime().equals(instantTime))
        .lastInstant();
    LOG.info("{} completed commit at {} being applied to MDT.",
        existingInstant.isPresent() ? "Already" : "Partially", instantTime);

    // Rollback the previous commit
    if (!writeClient.rollback(instantTime)) {
      throw new HoodieMetadataException(String.format("Failed to rollback deltacommit at %s from MDT", instantTime));
    }
    metadataMetaClient.reloadActiveTimeline();
    reInitWriteClient();
  }

  // re-fetch the write client here: reInitWriteClient() above may have replaced the one
  // captured in the local variable at the top of this method
  getWriteClient().startCommitWithTime(instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
}
+
+ public void wrapUpStreamingWriteToMetadataTableAndCompleteCommit(String
instantTime, HoodieEngineContext context, List<HoodieWriteStat>
metadataWriteStatsSoFar, HoodieCommitMetadata metadata) {
+ List<HoodieWriteStat> allWriteStats = new
ArrayList<>(metadataWriteStatsSoFar);
+ allWriteStats.addAll(prepareAndWriteToFILESPartition(context, metadata,
instantTime).map(writeStatus -> writeStatus.getStat()).collectAsList());
+ // finally committing to MDT
+ getWriteClient().commitStats(instantTime, allWriteStats, Option.empty(),
HoodieTimeline.DELTA_COMMIT_ACTION,
+ Collections.emptyMap(), Option.empty());
+ }
+
+ public HoodieData<WriteStatus>
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String
instantTime) {
+ // Generate HoodieRecords for MDT partitions which can be generated just
by using one WriteStatus
+
+ List<MetadataPartitionType> mdtPartitionsToTag = new
ArrayList<>(enabledPartitionTypes);
+ mdtPartitionsToTag.remove(FILES);
+ HoodieData<Pair<String, HoodieRecord>> perWriteStatusRecords =
writeStatus.flatMap(
+ new
MetadataIndexGenerator.PerWriteStatsBasedIndexGenerator(mdtPartitionsToTag,
dataWriteConfig, storageConf, instantTime));
+
+ // Generate HoodieRecords for MDT partitions which need per hudi partition
writeStats in one spark task
+ // for eg, partition stats index
+ HoodieData<Pair<String, HoodieRecord>> perPartitionRecords =
metadataIndexGenerator.get().prepareMDTRecordsGroupedByHudiPartition(writeStatus);
+
+ HoodieData<Pair<String, HoodieRecord>> mdtRecords =
perWriteStatusRecords.union(perPartitionRecords);
+
+ // tag records
+ Pair<List<Pair<String, String>>, HoodieData<HoodieRecord>>
taggedMdtRecords = tagRecordsWithLocation(mdtRecords,
+ mdtPartitionsToTag.stream().map(mdtPartition ->
mdtPartition.getPartitionPath()).collect(
+ Collectors.toSet()));
+ // todo fix parallelism. Do we really need this. Upsert partitioner will
do this anyways.
+
+ // write partial writes to mdt table (every partition except FILES)
+ HoodieData<WriteStatus> mdtWriteStatusHoodieData =
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(taggedMdtRecords,
instantTime, true));
+ // dag not yet de-referenced. do not invoke any action on
mdtWriteStatusHoodieData yet.
+ return mdtWriteStatusHoodieData;
+ }
+
+ private HoodieData<WriteStatus>
prepareAndWriteToFILESPartition(HoodieEngineContext context,
HoodieCommitMetadata commitMetadata, String instantTime) {
+ HoodieData<HoodieRecord> mdtRecords =
metadataIndexGenerator.get().prepareFilesPartitionRecords(context,
commitMetadata, instantTime);
+ // write to mdt table
+ Pair<List<Pair<String, String>>, HoodieData<HoodieRecord>> taggedRecords =
tagRecordsWithLocation(mdtRecords.map(record ->
Pair.of(FILES.getPartitionPath(), record)),
+ Collections.singleton(FILES.getPartitionPath()));
+ HoodieData<WriteStatus> mdtWriteStatusHoodieData =
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(taggedRecords,
instantTime, false));
+ return mdtWriteStatusHoodieData;
+ }
+
/**
 * Writes the location-tagged records to the metadata table using the engine-specific write
 * path. Engine-specific subclasses must override this; the base implementation always throws.
 *
 * @param taggedMdtRecords pair of the (MDT partition, fileId) pairs involved and the location-tagged records to write.
 * @param instantTime      instant time of interest.
 * @param initialCall      true for the first streaming write covering non-FILES partitions,
 *                         false for the subsequent FILES-partition write (see call sites).
 * @return engine-specific write statuses for the metadata table write.
 */
protected O streamWriteToMetadataTable(Pair<List<Pair<String, String>>, HoodieData<HoodieRecord>> taggedMdtRecords, String instantTime, boolean initialCall) {
  throw new HoodieMetadataException("Should be implemented by engines");
}
+
+ /**
+ * Returns Pair of List of mdt fileIds involved in the
+ * @param untaggedMdtRecords
+ * @return
+ */
+ protected Pair<List<Pair<String, String>>, HoodieData<HoodieRecord>>
tagRecordsWithLocation(HoodieData<Pair<String, HoodieRecord>>
untaggedMdtRecords,
+
Set<String> enabledMetadataPartitions) {
+ // The result set
+ HoodieData<HoodieRecord> allPartitionRecords =
engineContext.emptyHoodieData();
+ List<Pair<String, String>> mdtPartitionFileIdPairs = new ArrayList<>();
+ try (HoodieTableFileSystemView fsView =
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+
+ // Fetch latest file slices for all enabled MDT partitions
+ Map<String, List<FileSlice>> mdtPartitionLatestFileSlices = new
HashMap<>();
+ enabledMetadataPartitions.forEach(partitionName -> {
+ List<FileSlice> fileSlices =
+
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
Option.ofNullable(fsView), partitionName);
+ if (fileSlices.isEmpty()) {
+ // scheduling of INDEX only initializes the file group and not add
commit
+ // so if there are no committed file slices, look for inflight slices
+ fileSlices =
getPartitionLatestFileSlicesIncludingInflight(metadataMetaClient,
Option.ofNullable(fsView), partitionName);
+ }
+ mdtPartitionLatestFileSlices.put(partitionName, fileSlices);
+ final int fileGroupCount = fileSlices.size();
+ fileSlices.stream().forEach(fileSlice -> {
+ mdtPartitionFileIdPairs.add(Pair.of(partitionName,
fileSlice.getFileId()));
+ });
+ ValidationUtils.checkArgument(fileGroupCount > 0,
String.format("FileGroup count for MDT partition %s should be > 0",
partitionName));
+ });
+
+ HoodieData<HoodieRecord> rddSinglePartitionRecords =
untaggedMdtRecords.map(mdtPartitionRecordPair -> {
+ String mdtPartition = mdtPartitionRecordPair.getKey();
+ HoodieRecord r = mdtPartitionRecordPair.getValue();
+ List<FileSlice> latestFileSlices =
mdtPartitionLatestFileSlices.get(mdtPartition);
+ FileSlice slice =
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
+ latestFileSlices.size()));
+ r.unseal();
+ r.setCurrentLocation(new
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
+ r.seal();
+ return r;
+ });
+
+ allPartitionRecords =
allPartitionRecords.union(rddSinglePartitionRecords);
+ }
+
+ return Pair.of(mdtPartitionFileIdPairs, allPartitionRecords);
+ }
+
+ protected HoodieData<HoodieRecord>
repartitionByMDTFileSlice(HoodieData<HoodieRecord> records, int numPartitions) {
Review Comment:
Removed the API, was not getting used right now
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1094,6 +1116,137 @@ public void buildMetadataPartitions(HoodieEngineContext
engineContext, List<Hood
initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
}
/**
 * Starts a new delta commit in the metadata table for the optimized (streaming) write flow.
 * If the instant is already present in the MDT commits timeline (a re-attempted commit), the
 * previous attempt is rolled back first and the write client is re-initialized before starting.
 *
 * @param instantTime instant time of the commit to start in the metadata table.
 */
public void startCommit(String instantTime) {
  BaseHoodieWriteClient<?, I, ?, ?> writeClient = getWriteClient();

  if (!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) {
    // if this is a new commit being applied to metadata for the first time
    LOG.info("New commit at {} being applied to MDT.", instantTime);
  } else {
    // this code path refers to a re-attempted commit that:
    // 1. got committed to metadata table, but failed in datatable.
    // 2. failed while committing to metadata table
    // for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
    // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
    // are upserts to metadata table and so only a new delta commit will be created.
    // once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
    // already part of completed commit. So, we have to manually rollback the completed instant and proceed.
    Option<HoodieInstant> existingInstant = metadataMetaClient.getActiveTimeline().filter(entry -> entry.requestedTime().equals(instantTime))
        .lastInstant();
    LOG.info("{} completed commit at {} being applied to MDT.",
        existingInstant.isPresent() ? "Already" : "Partially", instantTime);

    // Rollback the previous commit
    if (!writeClient.rollback(instantTime)) {
      throw new HoodieMetadataException(String.format("Failed to rollback deltacommit at %s from MDT", instantTime));
    }
    metadataMetaClient.reloadActiveTimeline();
    reInitWriteClient();
  }

  // re-fetch the write client here: reInitWriteClient() above may have replaced the one
  // captured in the local variable at the top of this method
  getWriteClient().startCommitWithTime(instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
}
+
+ public void wrapUpStreamingWriteToMetadataTableAndCompleteCommit(String
instantTime, HoodieEngineContext context, List<HoodieWriteStat>
metadataWriteStatsSoFar, HoodieCommitMetadata metadata) {
+ List<HoodieWriteStat> allWriteStats = new
ArrayList<>(metadataWriteStatsSoFar);
+ allWriteStats.addAll(prepareAndWriteToFILESPartition(context, metadata,
instantTime).map(writeStatus -> writeStatus.getStat()).collectAsList());
+ // finally committing to MDT
+ getWriteClient().commitStats(instantTime, allWriteStats, Option.empty(),
HoodieTimeline.DELTA_COMMIT_ACTION,
+ Collections.emptyMap(), Option.empty());
+ }
+
+ public HoodieData<WriteStatus>
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String
instantTime) {
+ // Generate HoodieRecords for MDT partitions which can be generated just
by using one WriteStatus
+
+ List<MetadataPartitionType> mdtPartitionsToTag = new
ArrayList<>(enabledPartitionTypes);
+ mdtPartitionsToTag.remove(FILES);
+ HoodieData<Pair<String, HoodieRecord>> perWriteStatusRecords =
writeStatus.flatMap(
+ new
MetadataIndexGenerator.PerWriteStatsBasedIndexGenerator(mdtPartitionsToTag,
dataWriteConfig, storageConf, instantTime));
+
+ // Generate HoodieRecords for MDT partitions which need per hudi partition
writeStats in one spark task
+ // for eg, partition stats index
+ HoodieData<Pair<String, HoodieRecord>> perPartitionRecords =
metadataIndexGenerator.get().prepareMDTRecordsGroupedByHudiPartition(writeStatus);
+
+ HoodieData<Pair<String, HoodieRecord>> mdtRecords =
perWriteStatusRecords.union(perPartitionRecords);
+
+ // tag records
+ Pair<List<Pair<String, String>>, HoodieData<HoodieRecord>>
taggedMdtRecords = tagRecordsWithLocation(mdtRecords,
+ mdtPartitionsToTag.stream().map(mdtPartition ->
mdtPartition.getPartitionPath()).collect(
+ Collectors.toSet()));
+ // todo fix parallelism. Do we really need this. Upsert partitioner will
do this anyways.
+
+ // write partial writes to mdt table (every partition except FILES)
+ HoodieData<WriteStatus> mdtWriteStatusHoodieData =
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(taggedMdtRecords,
instantTime, true));
+ // dag not yet de-referenced. do not invoke any action on
mdtWriteStatusHoodieData yet.
+ return mdtWriteStatusHoodieData;
+ }
+
+ private HoodieData<WriteStatus>
prepareAndWriteToFILESPartition(HoodieEngineContext context,
HoodieCommitMetadata commitMetadata, String instantTime) {
+ HoodieData<HoodieRecord> mdtRecords =
metadataIndexGenerator.get().prepareFilesPartitionRecords(context,
commitMetadata, instantTime);
+ // write to mdt table
+ Pair<List<Pair<String, String>>, HoodieData<HoodieRecord>> taggedRecords =
tagRecordsWithLocation(mdtRecords.map(record ->
Pair.of(FILES.getPartitionPath(), record)),
+ Collections.singleton(FILES.getPartitionPath()));
+ HoodieData<WriteStatus> mdtWriteStatusHoodieData =
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(taggedRecords,
instantTime, false));
+ return mdtWriteStatusHoodieData;
+ }
+
+ protected O streamWriteToMetadataTable(Pair<List<Pair<String, String>>,
HoodieData<HoodieRecord>> taggedMdtRecords, String instantTime, boolean
initialCall) {
+ throw new HoodieMetadataException("Should be implemented by engines");
Review Comment:
Addressed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]