This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7a9aabd9b25 [HUDI-5434] Fix archival in metadata table to not rely on
completed rollback or clean in data table (#7580)
7a9aabd9b25 is described below
commit 7a9aabd9b2595158a2610be12d567df4d974e258
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Jan 11 10:47:59 2023 -0800
[HUDI-5434] Fix archival in metadata table to not rely on completed
rollback or clean in data table (#7580)
Before this change, the archival for the metadata table used the earliest
instant of all actions from the active timeline of the data table. In the
archival process, CLEAN and ROLLBACK instants are archived separately apart
from commits (check HoodieTimelineArchiver#getCleanInstantsToArchive). Because
of this, a very old completed CLEAN or ROLLBACK instant in the data table could
block the archival of the metadata table timeline and cause the active timeline
of the metadata table to be extremely long, leading to performance issues when
loading the timeline.
This commit changes the archival in metadata table to not rely on completed
rollback or clean in data table, by archiving the metadata table's instants
after the earliest commit (COMMIT, DELTA_COMMIT, and REPLACE_COMMIT only,
considering non-savepoint commit only if enabling archive beyond savepoint) and
the earliest inflight instant (all actions) in the data table's active timeline.
---
.../apache/hudi/client/HoodieTimelineArchiver.java | 44 ++++++------
.../apache/hudi/io/TestHoodieTimelineArchiver.java | 18 +++--
.../hudi/common/table/timeline/TimelineUtils.java | 69 +++++++++++++++---
.../hudi/common/table/TestTimelineUtils.java | 84 +++++++++++++++++++++-
4 files changed, 177 insertions(+), 38 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index f4937de943e..629b8115fcd 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -28,13 +28,13 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.model.HoodieArchivedLogFile;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -46,6 +46,7 @@ import
org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
@@ -514,24 +515,27 @@ public class HoodieTimelineArchiver<T extends
HoodieAvroPayload, I, K, O> {
.setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
.setConf(metaClient.getHadoopConf())
.build();
- Option<HoodieInstant> earliestActiveDatasetCommit =
dataMetaClient.getActiveTimeline().firstInstant();
-
- if (config.shouldArchiveBeyondSavepoint()) {
- // There are chances that there could be holes in the timeline due to
archival and savepoint interplay.
- // So, the first non-savepoint commit in the data timeline is
considered as beginning of the active timeline.
- Option<HoodieInstant> firstNonSavepointCommit =
dataMetaClient.getActiveTimeline().getFirstNonSavepointCommit();
- if (firstNonSavepointCommit.isPresent()) {
- String firstNonSavepointCommitTime =
firstNonSavepointCommit.get().getTimestamp();
- instants = instants.filter(instant ->
- compareTimestamps(instant.getTimestamp(), LESSER_THAN,
firstNonSavepointCommitTime));
- }
- } else {
- // Do not archive the commits that live in data set active timeline.
- // This is required by metadata table, see
HoodieTableMetadataUtil#processRollbackMetadata for details.
- if (earliestActiveDatasetCommit.isPresent()) {
- instants = instants.filter(instant ->
- compareTimestamps(instant.getTimestamp(),
HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get().getTimestamp()));
- }
+ Option<HoodieInstant> qualifiedEarliestInstant =
+ TimelineUtils.getEarliestInstantForMetadataArchival(
+ dataMetaClient.getActiveTimeline(),
config.shouldArchiveBeyondSavepoint());
+
+ // Do not archive the instants after the earliest commit (COMMIT,
DELTA_COMMIT, and
+ // REPLACE_COMMIT only, considering non-savepoint commit only if
enabling archive
+ // beyond savepoint) and the earliest inflight instant (all actions).
+ // This is required by metadata table, see
HoodieTableMetadataUtil#processRollbackMetadata
+ // for details.
+ // Note that we cannot blindly use the earliest instant of all actions,
because CLEAN and
+ // ROLLBACK instants are archived separately apart from commits (check
+ // HoodieTimelineArchiver#getCleanInstantsToArchive). If we do so, a
very old completed
+ // CLEAN or ROLLBACK instant can block the archive of metadata table
timeline and causes
+ // the active timeline of metadata table to be extremely long, leading
to performance issues
+ // for loading the timeline.
+ if (qualifiedEarliestInstant.isPresent()) {
+ instants = instants.filter(instant ->
+ compareTimestamps(
+ instant.getTimestamp(),
+ HoodieTimeline.LESSER_THAN,
+ qualifiedEarliestInstant.get().getTimestamp()));
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 72aba1f1638..4d7271364f4 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -1301,8 +1301,15 @@ public class TestHoodieTimelineArchiver extends
HoodieClientTestHarness {
.setLoadActiveTimelineOnLoad(true).build();
for (int i = 1; i <= 17; i++) {
- testTable.doWriteOperation("000000" + String.format("%02d", i),
WriteOperationType.UPSERT,
- i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(),
Arrays.asList("p1", "p2"), 2);
+ if (i != 2) {
+ testTable.doWriteOperation("000000" + String.format("%02d", i),
WriteOperationType.UPSERT,
+ i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(),
Arrays.asList("p1", "p2"), 2);
+ } else {
+ // For i == 2, roll back the first commit "00000001", so the active
timeline of the
+ // data table has one rollback instant
+ // The completed rollback should not block the archival in the
metadata table
+ testTable.doRollback("00000001", "00000002");
+ }
// archival
archiveAndGetCommitsList(writeConfig);
@@ -1323,10 +1330,9 @@ public class TestHoodieTimelineArchiver extends
HoodieClientTestHarness {
} else if (i == 8) {
// i == 8
// The instant "00000000000000" was archived since it's less than
- // the earliest instant on the dataset active timeline,
- // the dataset active timeline has instants of range [00000001 ~
00000008]
- // because when it does the archiving, no compaction instant on the
- // metadata active timeline exists yet.
+ // the earliest commit on the dataset active timeline,
+ // the dataset active timeline has instants:
+ // 00000002.rollback, 00000007.commit, 00000008.commit
assertEquals(9, metadataTableInstants.size());
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"00000007001")));
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 1f9d416b2b5..368047a7877 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
@@ -39,6 +40,11 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
+
/**
* TimelineUtils provides a common way to query incremental meta-data changes
for a hoodie table.
*
@@ -87,15 +93,15 @@ public class TimelineUtils {
public static List<String> getAffectedPartitions(HoodieTimeline timeline) {
return timeline.filterCompletedInstants().getInstantsAsStream().flatMap(s
-> {
switch (s.getAction()) {
- case HoodieTimeline.COMMIT_ACTION:
- case HoodieTimeline.DELTA_COMMIT_ACTION:
+ case COMMIT_ACTION:
+ case DELTA_COMMIT_ACTION:
try {
HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(s).get(),
HoodieCommitMetadata.class);
return commitMetadata.getPartitionToWriteStats().keySet().stream();
} catch (IOException e) {
throw new HoodieIOException("Failed to get partitions written at "
+ s, e);
- }
- case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ }
+ case REPLACE_COMMIT_ACTION:
try {
HoodieReplaceCommitMetadata commitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
timeline.getInstantDetails(s).get(),
HoodieReplaceCommitMetadata.class);
@@ -148,11 +154,11 @@ public class TimelineUtils {
* Get extra metadata for specified key from latest
commit/deltacommit/replacecommit(eg. insert_overwrite) instant.
*/
public static Option<String>
getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String
extraMetadataKey) {
- return
metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
+ return
metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
// exclude clustering commits for returning user stored extra metadata
.filter(instant -> !isClusteringCommit(metaClient, instant))
.findFirst().map(instant ->
- getMetadataValue(metaClient, extraMetadataKey,
instant)).orElse(Option.empty());
+ getMetadataValue(metaClient, extraMetadataKey,
instant)).orElse(Option.empty());
}
/**
@@ -170,7 +176,7 @@ public class TimelineUtils {
*/
public static Map<String, Option<String>>
getAllExtraMetadataForKey(HoodieTableMetaClient metaClient, String
extraMetadataKey) {
return
metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toMap(
- HoodieInstant::getTimestamp, instant -> getMetadataValue(metaClient,
extraMetadataKey, instant)));
+ HoodieInstant::getTimestamp, instant -> getMetadataValue(metaClient,
extraMetadataKey, instant)));
}
private static Option<String> getMetadataValue(HoodieTableMetaClient
metaClient, String extraMetadataKey, HoodieInstant instant) {
@@ -184,10 +190,10 @@ public class TimelineUtils {
throw new HoodieIOException("Unable to parse instant metadata " +
instant, e);
}
}
-
+
public static boolean isClusteringCommit(HoodieTableMetaClient metaClient,
HoodieInstant instant) {
try {
- if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
+ if (REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
// replacecommit is used for multiple operations:
insert_overwrite/cluster etc.
// Check operation type to see if this instant is related to
clustering.
HoodieReplaceCommitMetadata replaceMetadata =
HoodieReplaceCommitMetadata.fromBytes(
@@ -240,10 +246,53 @@ public class TimelineUtils {
HoodieInstant instant,
HoodieTimeline timeline) throws IOException {
byte[] data = timeline.getInstantDetails(instant).get();
- if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+ if (instant.getAction().equals(REPLACE_COMMIT_ACTION)) {
return HoodieReplaceCommitMetadata.fromBytes(data,
HoodieReplaceCommitMetadata.class);
} else {
return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
}
}
+
+ /**
+ * Gets the qualified earliest instant from the active timeline of the data
table
+ * for the archival in metadata table.
+ * <p>
+ * the qualified earliest instant is chosen as the earlier one between the
earliest
+ * commit (COMMIT, DELTA_COMMIT, and REPLACE_COMMIT only, considering
non-savepoint
+ * commit only if enabling archive beyond savepoint) and the earliest
inflight
+ * instant (all actions).
+ *
+ * @param dataTableActiveTimeline the active timeline of the data table.
+ * @param shouldArchiveBeyondSavepoint whether to archive beyond savepoint.
+ * @return the instant meeting the requirement.
+ */
+ public static Option<HoodieInstant> getEarliestInstantForMetadataArchival(
+ HoodieActiveTimeline dataTableActiveTimeline, boolean
shouldArchiveBeyondSavepoint) {
+ // This is for commits only, not including CLEAN, ROLLBACK, etc.
+ // When archive beyond savepoint is enabled, there are chances that there
could be holes
+ // in the timeline due to archival and savepoint interplay. So, the first
non-savepoint
+ // commit in the data timeline is considered as beginning of the active
timeline.
+ Option<HoodieInstant> earliestCommit = shouldArchiveBeyondSavepoint
+ ? dataTableActiveTimeline.getTimelineOfActions(
+ CollectionUtils.createSet(
+ COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION,
SAVEPOINT_ACTION))
+ .getFirstNonSavepointCommit()
+ : dataTableActiveTimeline.getCommitsTimeline().firstInstant();
+ // This is for all instants which are in-flight
+ Option<HoodieInstant> earliestInflight =
+ dataTableActiveTimeline.filterInflightsAndRequested().firstInstant();
+
+ if (earliestCommit.isPresent() && earliestInflight.isPresent()) {
+ if (earliestCommit.get().compareTo(earliestInflight.get()) < 0) {
+ return earliestCommit;
+ }
+ return earliestInflight;
+ } else if (earliestCommit.isPresent()) {
+ return earliestCommit;
+ } else if (earliestInflight.isPresent()) {
+ return earliestInflight;
+ } else {
+ return Option.empty();
+ }
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index 5e91118b269..0c09d91163e 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -56,8 +56,13 @@ import java.util.Map;
import java.util.stream.Collectors;
import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -136,7 +141,7 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
activeTimeline.createNewInstant(instant);
activeTimeline.saveAsComplete(instant,
Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));
- HoodieInstant cleanInstant = new HoodieInstant(true,
HoodieTimeline.CLEAN_ACTION, ts);
+ HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, ts);
activeTimeline.createNewInstant(cleanInstant);
activeTimeline.saveAsComplete(cleanInstant,
getCleanMetadata(olderPartition, ts));
}
@@ -175,7 +180,7 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
activeTimeline.createNewInstant(instant);
activeTimeline.saveAsComplete(instant,
Option.of(getCommitMetadata(basePath, partitionPath, ts, 2,
Collections.emptyMap())));
- HoodieInstant cleanInstant = new HoodieInstant(true,
HoodieTimeline.CLEAN_ACTION, ts);
+ HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, ts);
activeTimeline.createNewInstant(cleanInstant);
activeTimeline.saveAsComplete(cleanInstant,
getCleanMetadata(partitionPath, ts));
}
@@ -339,6 +344,81 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
);
}
+ @Test
+ public void testGetEarliestInstantForMetadataArchival() throws IOException {
+ // Empty timeline
+ assertEquals(
+ Option.empty(),
+ TimelineUtils.getEarliestInstantForMetadataArchival(
+ prepareActiveTimeline(new ArrayList<>()), false));
+
+ // Earlier request clean action before commits
+ assertEquals(
+ Option.of(new HoodieInstant(REQUESTED, CLEAN_ACTION, "003")),
+ TimelineUtils.getEarliestInstantForMetadataArchival(
+ prepareActiveTimeline(
+ Arrays.asList(
+ new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "001"),
+ new HoodieInstant(COMPLETED, CLEAN_ACTION, "002"),
+ new HoodieInstant(REQUESTED, CLEAN_ACTION, "003"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+ new HoodieInstant(COMPLETED, REPLACE_COMMIT_ACTION,
"011"))), false));
+
+ // No inflight instants
+ assertEquals(
+ Option.of(new HoodieInstant(COMPLETED, COMMIT_ACTION, "010")),
+ TimelineUtils.getEarliestInstantForMetadataArchival(
+ prepareActiveTimeline(
+ Arrays.asList(
+ new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "001"),
+ new HoodieInstant(COMPLETED, CLEAN_ACTION, "002"),
+ new HoodieInstant(COMPLETED, CLEAN_ACTION, "003"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+ new HoodieInstant(COMPLETED, REPLACE_COMMIT_ACTION,
"011"))), false));
+
+ // Rollbacks only
+ assertEquals(
+ Option.of(new HoodieInstant(INFLIGHT, ROLLBACK_ACTION, "003")),
+ TimelineUtils.getEarliestInstantForMetadataArchival(
+ prepareActiveTimeline(
+ Arrays.asList(
+ new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "001"),
+ new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "002"),
+ new HoodieInstant(INFLIGHT, ROLLBACK_ACTION, "003"))),
false));
+
+ assertEquals(
+ Option.empty(),
+ TimelineUtils.getEarliestInstantForMetadataArchival(
+ prepareActiveTimeline(
+ Arrays.asList(
+ new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "001"),
+ new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "002"),
+ new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "003"))),
false));
+
+ // With savepoints
+ HoodieActiveTimeline timeline = prepareActiveTimeline(
+ Arrays.asList(
+ new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "001"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "003"),
+ new HoodieInstant(COMPLETED, SAVEPOINT_ACTION, "003"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "011")));
+ assertEquals(
+ Option.of(new HoodieInstant(COMPLETED, COMMIT_ACTION, "003")),
+ TimelineUtils.getEarliestInstantForMetadataArchival(timeline, false));
+ assertEquals(
+ Option.of(new HoodieInstant(COMPLETED, COMMIT_ACTION, "010")),
+ TimelineUtils.getEarliestInstantForMetadataArchival(timeline, true));
+ }
+
+ private HoodieActiveTimeline prepareActiveTimeline(
+ List<HoodieInstant> activeInstants) throws IOException {
+ HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+ when(mockMetaClient.scanHoodieInstantsFromFileSystem(any(), eq(true)))
+ .thenReturn(activeInstants);
+ return new HoodieActiveTimeline(mockMetaClient);
+ }
+
private void verifyExtraMetadataLatestValue(String extraMetadataKey, String
expected, boolean includeClustering) {
final Option<String> extraLatestValue;
if (includeClustering) {