This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 271a71b648e [HUDI-6560] Avoid reading instant details twice for
archiving (#9227)
271a71b648e is described below
commit 271a71b648e06c603e83cb454c943907129696a9
Author: Danny Chan <[email protected]>
AuthorDate: Wed Jul 19 19:05:33 2023 +0800
[HUDI-6560] Avoid reading instant details twice for archiving (#9227)
---
.../apache/hudi/client/HoodieTimelineArchiver.java | 12 +----
.../hudi/client/utils/MetadataConversionUtils.java | 51 ++++++++++++----------
.../org/apache/hudi/common/util/CleanerUtils.java | 26 +++++++++++
.../apache/hudi/common/util/CompactionUtils.java | 10 ++++-
4 files changed, 62 insertions(+), 37 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index b1a82c1ed03..3d41e3011bd 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -686,13 +686,7 @@ public class HoodieTimelineArchiver<T extends
HoodieAvroPayload, I, K, O> {
for (HoodieInstant hoodieInstant : instants) {
try {
deleteAnyLeftOverMarkers(context, hoodieInstant);
- // in local FS and HDFS, there could be empty completed instants due
to crash.
- if (table.getActiveTimeline().isEmpty(hoodieInstant) &&
hoodieInstant.isCompleted()) {
- // lets add an entry to the archival, even if not for the plan.
- records.add(createAvroRecordFromEmptyInstant(hoodieInstant));
- } else {
- records.add(convertToAvroRecord(hoodieInstant));
- }
+ records.add(convertToAvroRecord(hoodieInstant));
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records);
}
@@ -732,8 +726,4 @@ public class HoodieTimelineArchiver<T extends
HoodieAvroPayload, I, K, O> {
throws IOException {
return MetadataConversionUtils.createMetaWrapper(hoodieInstant,
metaClient);
}
-
- private IndexedRecord createAvroRecordFromEmptyInstant(HoodieInstant
hoodieInstant) throws IOException {
- return
MetadataConversionUtils.createMetaWrapperForEmptyInstant(hoodieInstant);
- }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
index e068d4b0432..cfd47ab2b37 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
@@ -47,6 +47,12 @@ import org.apache.hudi.common.util.Option;
public class MetadataConversionUtils {
public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant
hoodieInstant, HoodieTableMetaClient metaClient) throws IOException {
+ Option<byte[]> instantDetails =
metaClient.getActiveTimeline().getInstantDetails(hoodieInstant);
+ if (hoodieInstant.isCompleted() && instantDetails.get().length == 0) {
+ // in local FS and HDFS, there could be empty completed instants due to
crash.
+ // let's add an entry to the archival, even if not for the plan.
+ return createMetaWrapperForEmptyInstant(hoodieInstant);
+ }
HoodieArchivedMetaEntry archivedMetaWrapper = new
HoodieArchivedMetaEntry();
archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
@@ -54,44 +60,41 @@ public class MetadataConversionUtils {
switch (hoodieInstant.getAction()) {
case HoodieTimeline.CLEAN_ACTION: {
if (hoodieInstant.isCompleted()) {
-
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
hoodieInstant));
+
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
instantDetails.get()));
} else {
-
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
hoodieInstant));
+
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
instantDetails.get()));
}
archivedMetaWrapper.setActionType(ActionType.clean.name());
break;
}
case HoodieTimeline.COMMIT_ACTION: {
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
HoodieCommitMetadata.class);
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(instantDetails.get(),
HoodieCommitMetadata.class);
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
archivedMetaWrapper.setActionType(ActionType.commit.name());
break;
}
case HoodieTimeline.DELTA_COMMIT_ACTION: {
- HoodieCommitMetadata deltaCommitMetadata = HoodieCommitMetadata
-
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
HoodieCommitMetadata.class);
+ HoodieCommitMetadata deltaCommitMetadata =
HoodieCommitMetadata.fromBytes(instantDetails.get(),
HoodieCommitMetadata.class);
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata));
archivedMetaWrapper.setActionType(ActionType.deltacommit.name());
break;
}
case HoodieTimeline.REPLACE_COMMIT_ACTION: {
if (hoodieInstant.isCompleted()) {
- HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata
-
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
HoodieReplaceCommitMetadata.class);
+ HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(instantDetails.get(),
HoodieReplaceCommitMetadata.class);
archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
} else if (hoodieInstant.isInflight()) {
- // inflight replacecommit files have the same meta data body as
HoodieCommitMetadata
+ // inflight replacecommit files have the same metadata body as
HoodieCommitMetadata
// so we could re-use it without further creating an inflight
extension.
// Or inflight replacecommit files are empty under clustering
circumstance
- Option<HoodieCommitMetadata> inflightCommitMetadata =
getInflightReplaceMetadata(metaClient, hoodieInstant);
+ Option<HoodieCommitMetadata> inflightCommitMetadata =
getInflightCommitMetadata(instantDetails);
if (inflightCommitMetadata.isPresent()) {
archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get()));
}
} else {
// we may have cases with empty HoodieRequestedReplaceMetadata e.g.
insert_overwrite_table or insert_overwrite
// without clustering. However, we should revisit the requested
commit file standardization
- Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata =
getRequestedReplaceMetadata(metaClient, hoodieInstant);
+ Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata =
getRequestedReplaceMetadata(instantDetails);
if (requestedReplaceMetadata.isPresent()) {
archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata.get());
}
@@ -101,27 +104,29 @@ public class MetadataConversionUtils {
}
case HoodieTimeline.ROLLBACK_ACTION: {
if (hoodieInstant.isCompleted()) {
-
archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(
-
metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
HoodieRollbackMetadata.class));
+
archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(),
HoodieRollbackMetadata.class));
}
archivedMetaWrapper.setActionType(ActionType.rollback.name());
break;
}
case HoodieTimeline.SAVEPOINT_ACTION: {
-
archivedMetaWrapper.setHoodieSavePointMetadata(TimelineMetadataUtils.deserializeAvroMetadata(
-
metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
HoodieSavepointMetadata.class));
+
archivedMetaWrapper.setHoodieSavePointMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(),
HoodieSavepointMetadata.class));
archivedMetaWrapper.setActionType(ActionType.savepoint.name());
break;
}
case HoodieTimeline.COMPACTION_ACTION: {
- HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, hoodieInstant.getTimestamp());
- archivedMetaWrapper.setHoodieCompactionPlan(plan);
+ if (hoodieInstant.isRequested()) {
+ HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, instantDetails);
+ archivedMetaWrapper.setHoodieCompactionPlan(plan);
+ }
archivedMetaWrapper.setActionType(ActionType.compaction.name());
break;
}
case HoodieTimeline.LOG_COMPACTION_ACTION: {
- HoodieCompactionPlan plan =
CompactionUtils.getLogCompactionPlan(metaClient, hoodieInstant.getTimestamp());
- archivedMetaWrapper.setHoodieCompactionPlan(plan);
+ if (hoodieInstant.isRequested()) {
+ HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, instantDetails);
+ archivedMetaWrapper.setHoodieCompactionPlan(plan);
+ }
archivedMetaWrapper.setActionType(ActionType.logcompaction.name());
break;
}
@@ -132,7 +137,7 @@ public class MetadataConversionUtils {
return archivedMetaWrapper;
}
- public static HoodieArchivedMetaEntry
createMetaWrapperForEmptyInstant(HoodieInstant hoodieInstant) throws
IOException {
+ public static HoodieArchivedMetaEntry
createMetaWrapperForEmptyInstant(HoodieInstant hoodieInstant) {
HoodieArchivedMetaEntry archivedMetaWrapper = new
HoodieArchivedMetaEntry();
archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
@@ -173,8 +178,7 @@ public class MetadataConversionUtils {
return archivedMetaWrapper;
}
- public static Option<HoodieCommitMetadata>
getInflightReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant
instant) throws IOException {
- Option<byte[]> inflightContent =
metaClient.getActiveTimeline().getInstantDetails(instant);
+ private static Option<HoodieCommitMetadata>
getInflightCommitMetadata(Option<byte[]> inflightContent) throws IOException {
if (!inflightContent.isPresent() || inflightContent.get().length == 0) {
// inflight files can be empty in some certain cases, e.g. when users
opt in clustering
return Option.empty();
@@ -182,8 +186,7 @@ public class MetadataConversionUtils {
return Option.of(HoodieCommitMetadata.fromBytes(inflightContent.get(),
HoodieCommitMetadata.class));
}
- private static Option<HoodieRequestedReplaceMetadata>
getRequestedReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant
instant) throws IOException {
- Option<byte[]> requestedContent =
metaClient.getActiveTimeline().getInstantDetails(instant);
+ private static Option<HoodieRequestedReplaceMetadata>
getRequestedReplaceMetadata(Option<byte[]> requestedContent) throws IOException
{
if (!requestedContent.isPresent() || requestedContent.get().length == 0) {
// requested commit files can be empty in some certain cases, e.g.
insert_overwrite or insert_overwrite_table.
// However, it appears requested files are supposed to contain meta data
and we should revisit the standardization
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
index 2f57f5455c3..899bd673665 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
@@ -110,6 +110,18 @@ public class CleanerUtils {
return metadataMigrator.upgradeToLatest(cleanMetadata,
cleanMetadata.getVersion());
}
+ /**
+ * Get Latest Version of Hoodie Cleaner Metadata - Output of cleaner
operation.
+ * @return Latest version of Clean metadata corresponding to clean instant
+ * @throws IOException
+ */
+ public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient
metaClient, byte[] details)
+ throws IOException {
+ CleanMetadataMigrator metadataMigrator = new
CleanMetadataMigrator(metaClient);
+ HoodieCleanMetadata cleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(details);
+ return metadataMigrator.upgradeToLatest(cleanMetadata,
cleanMetadata.getVersion());
+ }
+
public static Option<HoodieInstant> getEarliestCommitToRetain(
HoodieTimeline commitsTimeline, HoodieCleaningPolicy cleaningPolicy, int
commitsRetained,
Instant latestInstant, int hoursRetained, HoodieTimelineTimeZone
timeZone) {
@@ -159,6 +171,20 @@ public class CleanerUtils {
return cleanPlanMigrator.upgradeToLatest(cleanerPlan,
cleanerPlan.getVersion());
}
+ /**
+ * Get Latest version of cleaner plan corresponding to a clean instant.
+ *
+ * @param metaClient Hoodie Table Meta Client
+ * @return Cleaner plan corresponding to clean instant
+ * @throws IOException
+ */
+ public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient
metaClient, byte[] details)
+ throws IOException {
+ CleanPlanMigrator cleanPlanMigrator = new CleanPlanMigrator(metaClient);
+ HoodieCleanerPlan cleanerPlan =
TimelineMetadataUtils.deserializeAvroMetadata(details, HoodieCleanerPlan.class);
+ return cleanPlanMigrator.upgradeToLatest(cleanerPlan,
cleanerPlan.getVersion());
+ }
+
/**
* Convert list of cleanFileInfo instances to list of avro-generated
HoodieCleanFileInfo instances.
* @param cleanFileInfoList
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
index 03fd0688665..0f41f1314e1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
@@ -186,10 +186,16 @@ public class CompactionUtils {
* Util method to fetch both compaction and log compaction plan from
requestedInstant.
*/
private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient
metaClient, HoodieInstant requestedInstant) {
+ return getCompactionPlan(metaClient,
metaClient.getActiveTimeline().readCompactionPlanAsBytes(requestedInstant));
+ }
+
+ /**
+ * Util method to fetch both compaction and log compaction plan from
requestedInstant.
+ */
+ public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient
metaClient, Option<byte[]> planContent) {
CompactionPlanMigrator migrator = new CompactionPlanMigrator(metaClient);
try {
- HoodieCompactionPlan compactionPlan =
TimelineMetadataUtils.deserializeCompactionPlan(
-
metaClient.getActiveTimeline().readCompactionPlanAsBytes(requestedInstant).get());
+ HoodieCompactionPlan compactionPlan =
TimelineMetadataUtils.deserializeCompactionPlan(planContent.get());
return migrator.upgradeToLatest(compactionPlan,
compactionPlan.getVersion());
} catch (IOException e) {
throw new HoodieException(e);