This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bf5ec3503a6 [HUDI-7779] Guard archival on savepoint removal until
cleaner is able to clean it up (#11440)
bf5ec3503a6 is described below
commit bf5ec3503a6bfb0cc9b8352d290cee60fb3a84ac
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Jul 19 16:09:25 2024 +0530
[HUDI-7779] Guard archival on savepoint removal until cleaner is able to
clean it up (#11440)
---
.../client/timeline/HoodieTimelineArchiver.java | 3 +-
.../action/clean/CleanPlanActionExecutor.java | 6 +-
.../hudi/table/action/clean/CleanPlanner.java | 31 ++-
.../apache/hudi/table/action/TestCleanPlanner.java | 169 ++++++++++----
.../apache/hudi/io/TestHoodieTimelineArchiver.java | 249 +++++++++++++++++----
.../table/timeline/HoodieDefaultTimeline.java | 6 +
.../hudi/common/table/timeline/HoodieTimeline.java | 31 ++-
.../org/apache/hudi/common/util/CleanerUtils.java | 2 +-
.../apache/hudi/common/util/ClusteringUtils.java | 67 ++++--
.../table/timeline/TestHoodieActiveTimeline.java | 11 +
.../hudi/common/testutils/HoodieTestTable.java | 11 +-
.../hudi/common/util/TestClusteringUtils.java | 6 +-
12 files changed, 462 insertions(+), 130 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
index 817c3f650d9..dd091386d26 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
@@ -208,7 +208,8 @@ public class HoodieTimelineArchiver<T extends
HoodieAvroPayload, I, K, O> {
// Meanwhile, when inline or async clustering is enabled, we need to
ensure that there is a commit in the active timeline
// to check whether the file slice generated in pending clustering after
archive isn't committed.
Option<HoodieInstant> earliestInstantToRetainForClustering =
-
ClusteringUtils.getEarliestInstantToRetainForClustering(table.getActiveTimeline(),
table.getMetaClient());
+
ClusteringUtils.getEarliestInstantToRetainForClustering(table.getActiveTimeline(),
table.getMetaClient(),
+ config.getCleanerPolicy());
earliestInstantToRetainCandidates.add(earliestInstantToRetainForClustering);
// 4. If metadata table is enabled, do not archive instants which are more
recent than the last compaction on the
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 0329fc8ddc6..613ed979367 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -50,7 +50,7 @@ import java.util.stream.Collectors;
import static
org.apache.hudi.client.utils.MetadataTableUtils.shouldUseBatchLookup;
import static org.apache.hudi.common.util.MapUtils.nonEmpty;
-import static
org.apache.hudi.table.action.clean.CleanPlanner.SAVEPOINTED_TIMESTAMPS;
+import static org.apache.hudi.common.util.CleanerUtils.SAVEPOINTED_TIMESTAMPS;
public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T,
I, K, O, Option<HoodieCleanerPlan>> {
@@ -159,7 +159,9 @@ public class CleanPlanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I
if (savepointedTimestamps.isEmpty()) {
return Collections.emptyMap();
} else {
- return Collections.singletonMap(SAVEPOINTED_TIMESTAMPS,
savepointedTimestamps.stream().collect(Collectors.joining(",")));
+ Map<String, String> extraMetadata = new HashMap<>();
+ extraMetadata.put(SAVEPOINTED_TIMESTAMPS,
savepointedTimestamps.stream().collect(Collectors.joining(",")));
+ return extraMetadata;
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index fff86ab8659..57546252cad 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -78,7 +78,6 @@ public class CleanPlanner<T, I, K, O> implements Serializable
{
public static final Integer CLEAN_PLAN_VERSION_1 =
CleanPlanV1MigrationHandler.VERSION;
public static final Integer CLEAN_PLAN_VERSION_2 =
CleanPlanV2MigrationHandler.VERSION;
public static final Integer LATEST_CLEAN_PLAN_VERSION = CLEAN_PLAN_VERSION_2;
- public static final String SAVEPOINTED_TIMESTAMPS = "savepointed_timestamps";
private final SyncableFileSystemView fileSystemView;
private final HoodieTimeline commitTimeline;
@@ -88,6 +87,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable
{
private final HoodieWriteConfig config;
private transient HoodieEngineContext context;
private List<String> savepointedTimestamps;
+ private Option<HoodieInstant> earliestCommitToRetain = Option.empty();
public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O>
hoodieTable, HoodieWriteConfig config) {
this.context = context;
@@ -126,11 +126,6 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
return metadata.getPartitionMetadata().values().stream().flatMap(s ->
s.getSavepointDataFile().stream());
}
- private Stream<String> getPartitionsFromSavepoint(String savepointTime) {
- HoodieSavepointMetadata metadata = getSavepointMetadata(savepointTime);
- return metadata.getPartitionMetadata().keySet().stream();
- }
-
private HoodieSavepointMetadata getSavepointMetadata(String
savepointTimestamp) {
if (!hoodieTable.getSavepointTimestamps().contains(savepointTimestamp)) {
throw new HoodieSavepointException(
@@ -203,8 +198,9 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
*/
private List<String>
getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
Option<HoodieInstant> newInstantToRetain) {
- boolean isSavepointDeleted = isAnySavepointDeleted(cleanMetadata);
- if (isSavepointDeleted) {
+
+ boolean isAnySavepointDeleted = isAnySavepointDeleted(cleanMetadata);
+ if (isAnySavepointDeleted) {
LOG.info("Since savepoints have been removed compared to previous clean,
triggering clean planning for all partitions");
return getPartitionPathsForFullCleaning();
} else {
@@ -222,7 +218,7 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
private boolean isAnySavepointDeleted(HoodieCleanMetadata cleanMetadata) {
List<String> savepointedTimestampsFromLastClean =
cleanMetadata.getExtraMetadata() == null ? Collections.emptyList()
- :
Arrays.stream(cleanMetadata.getExtraMetadata().getOrDefault(SAVEPOINTED_TIMESTAMPS,
StringUtils.EMPTY_STRING).split(","))
+ :
Arrays.stream(cleanMetadata.getExtraMetadata().getOrDefault(CleanerUtils.SAVEPOINTED_TIMESTAMPS,
StringUtils.EMPTY_STRING).split(","))
.filter(partition ->
!StringUtils.isNullOrEmpty(partition)).collect(Collectors.toList());
if (savepointedTimestampsFromLastClean.isEmpty()) {
return false;
@@ -553,13 +549,16 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
* Returns the earliest commit to retain based on cleaning policy.
*/
public Option<HoodieInstant> getEarliestCommitToRetain() {
- return CleanerUtils.getEarliestCommitToRetain(
-
hoodieTable.getMetaClient().getActiveTimeline().getCommitsAndCompactionTimeline(),
- config.getCleanerPolicy(),
- config.getCleanerCommitsRetained(),
- Instant.now(),
- config.getCleanerHoursRetained(),
- hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone());
+ if (!earliestCommitToRetain.isPresent()) {
+ earliestCommitToRetain = CleanerUtils.getEarliestCommitToRetain(
+
hoodieTable.getMetaClient().getActiveTimeline().getCommitsAndCompactionTimeline(),
+ config.getCleanerPolicy(),
+ config.getCleanerCommitsRetained(),
+ Instant.now(),
+ config.getCleanerHoursRetained(),
+ hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone());
+ }
+ return earliestCommitToRetain;
}
/**
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
index 179454e2063..c4ffa8c1bac 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
@@ -19,8 +19,10 @@
package org.apache.hudi.table.action;
+import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -40,6 +42,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -70,10 +73,12 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
import static
org.apache.hudi.common.util.CleanerUtils.CLEAN_METADATA_VERSION_2;
-import static
org.apache.hudi.table.action.clean.CleanPlanner.SAVEPOINTED_TIMESTAMPS;
+import static org.apache.hudi.common.util.CleanerUtils.SAVEPOINTED_TIMESTAMPS;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -132,17 +137,23 @@ public class TestCleanPlanner {
when(mockFsView.getAllFileGroupsStateless(partitionPath)).thenReturn(allFileGroups.stream());
CleanPlanner<?, ?, ?, ?> cleanPlanner = new CleanPlanner<>(context,
mockHoodieTable, config);
- HoodieInstant earliestCommitToRetain = new
HoodieInstant(HoodieInstant.State.COMPLETED, "COMMIT", earliestInstant);
+ HoodieInstant earliestCommitToRetain = new HoodieInstant(COMPLETED,
"COMMIT", earliestInstant);
Pair<Boolean, List<CleanFileInfo>> actual =
cleanPlanner.getDeletePaths(partitionPath, Option.of(earliestCommitToRetain));
assertEquals(expected, actual);
}
+ /**
+ * The test asserts clean planner results for APIs
org.apache.hudi.table.action.clean.CleanPlanner#getEarliestSavepoint()
+ * and
org.apache.hudi.table.action.clean.CleanPlanner#getPartitionPathsToClean(org.apache.hudi.common.util.Option)
+ * given the other input arguments for clean metadata. The idea is the two
API results should match the expected results.
+ */
@ParameterizedTest
@MethodSource("incrCleaningPartitionsTestCases")
void testPartitionsForIncrCleaning(boolean isPartitioned, HoodieWriteConfig
config, String earliestInstant,
String lastCompletedTimeInLastClean,
String lastCleanInstant, String earliestInstantsInLastClean, List<String>
partitionsInLastClean,
- Map<String, List<String>>
savepointsTrackedInLastClean, Map<String, List<String>>
activeInstantsPartitions,
- Map<String, List<String>> savepoints,
List<String> expectedPartitions, boolean areCommitsForSavepointsRemoved) throws
IOException {
+ Map<String, List<String>>
savepointsTrackedInLastClean, Option<String>
expectedEarliestSavepointInLastClean,
+ Map<String, List<String>>
activeInstantsPartitions, List<String> replaceCommits, List<String>
expectedPartitions, boolean areCommitsForSavepointsRemoved,
+ Map<String, List<String>> savepoints)
throws IOException, IllegalAccessException {
HoodieActiveTimeline activeTimeline = mock(HoodieActiveTimeline.class);
when(mockHoodieTable.getActiveTimeline()).thenReturn(activeTimeline);
// setup savepoint mocks
@@ -158,9 +169,10 @@ public class TestCleanPlanner {
// prepare last Clean Metadata
Pair<HoodieCleanMetadata, Option<byte[]>> cleanMetadataOptionPair =
- getCleanCommitMetadata(partitionsInLastClean, lastCleanInstant,
earliestInstantsInLastClean, lastCompletedTimeInLastClean,
savepointsTrackedInLastClean.keySet());
- mockLastCleanCommit(mockHoodieTable, lastCleanInstant,
earliestInstantsInLastClean, activeTimeline, cleanMetadataOptionPair);
- mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions,
savepointsTrackedInLastClean, areCommitsForSavepointsRemoved);
+ getCleanCommitMetadata(partitionsInLastClean, lastCleanInstant,
earliestInstantsInLastClean, lastCompletedTimeInLastClean,
+ savepointsTrackedInLastClean.keySet(),
expectedEarliestSavepointInLastClean);
+ HoodieCleanerPlan cleanerPlan = mockLastCleanCommit(mockHoodieTable,
lastCleanInstant, earliestInstantsInLastClean, activeTimeline,
cleanMetadataOptionPair, savepointsTrackedInLastClean.keySet());
+ mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions,
savepointsTrackedInLastClean, areCommitsForSavepointsRemoved, replaceCommits);
// mock getAllPartitions
HoodieStorage storage = mock(HoodieStorage.class);
@@ -169,10 +181,14 @@ public class TestCleanPlanner {
when(mockHoodieTable.getMetadataTable()).thenReturn(hoodieTableMetadata);
when(hoodieTableMetadata.getAllPartitionPaths()).thenReturn(isPartitioned
? Arrays.asList(PARTITION1, PARTITION2, PARTITION3) :
Collections.singletonList(StringUtils.EMPTY_STRING));
- // Trigger clean and validate partitions to clean.
+ // Trigger clean and validate partitions to clean and earliest savepoint
CleanPlanner<?, ?, ?, ?> cleanPlanner = new CleanPlanner<>(context,
mockHoodieTable, config);
- HoodieInstant earliestCommitToRetain = new
HoodieInstant(HoodieInstant.State.COMPLETED, "COMMIT", earliestInstant);
+ HoodieInstant earliestCommitToRetain = new HoodieInstant(COMPLETED,
"COMMIT", earliestInstant);
List<String> partitionsToClean =
cleanPlanner.getPartitionPathsToClean(Option.of(earliestCommitToRetain));
+ HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+ when(metaClient.getActiveTimeline()).thenReturn(activeTimeline);
+ assertEquals(expectedEarliestSavepointInLastClean,
ClusteringUtils.getEarliestReplacedSavepointInClean(activeTimeline,
config.getCleanerPolicy(), cleanerPlan));
+
Collections.sort(expectedPartitions);
Collections.sort(partitionsToClean);
assertEquals(expectedPartitions, partitionsToClean);
@@ -370,74 +386,113 @@ public class TestCleanPlanner {
List<Arguments> arguments = new ArrayList<>();
+ // Code snippet below generates different test cases which will assert the
clean planner output
+ // for these cases. There is no sequential dependency between these cases
and they can be considered
+ // independent.
+ // In the test cases below earliestInstant signifies the new value of
earliest instant to retain as computed
+ // by CleanPlanner. The test case provides the current state of the table and
compares the expected values for
+ // earliestSavepointInLastClean and expectedPartitions against computed
values for the same by clean planner
+
// no savepoints tracked in last clean and no additional savepoints. all
partitions in uncleaned instants should be expected
+ // earliest savepoint in last clean is empty since there is no savepoint
in the timeline
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
- earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
Collections.emptyMap(),
- activeInstantsPartitionsMap3, Collections.emptyMap(),
threePartitionsInActiveTimeline, false));
+ earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
Collections.emptyMap(), Option.empty(),
+ activeInstantsPartitionsMap3, Collections.emptyList(),
threePartitionsInActiveTimeline, false, Collections.emptyMap()));
+
+ // a new savepoint is added after last clean. but rest of uncleaned
touches all partitions, and so all partitions are expected
+ // earliest savepoint in last clean is empty even though savepoint2 was
added to the timeline
+ // since there is no replace commit after the savepoint and before
earliestInstantInLastClean (earliestInstantToRetain)
+
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
+ earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
+ Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)), Option.empty(),
activeInstantsPartitionsMap3,
+ Collections.emptyList(), threePartitionsInActiveTimeline, false,
Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1))));
// a new savepoint is added after last clean. but rest of uncleaned
touches all partitions, and so all partitions are expected
+ // earliest savepoint in last clean is empty even though savepoint2 was
added to the timeline. This is because
+ // there are no replace commits between the savepoint and earliestInstant
which is the earliestInstantToRetain
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
- earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
Collections.emptyMap(),
- activeInstantsPartitionsMap3, Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)), threePartitionsInActiveTimeline,
false));
+ earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
Collections.emptyMap(), Option.empty(),
+ activeInstantsPartitionsMap3, Collections.emptyList(),
threePartitionsInActiveTimeline, false,
+ Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1))));
// previous clean tracks a savepoint which exists in timeline still. only
2 partitions are touched by uncleaned instants. only 2 partitions are expected
+ // earliest savepoint in last clean is empty even though savepoint2 was
added to the timeline
+ // since there is no replace commit after the savepoint and before
earliestInstantInLastClean (earliestInstantToRetain)
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
- Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)),
- activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline, false));
+ Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)), Option.empty(),
+ activeInstantsPartitionsMap2, Collections.emptyList(),
twoPartitionsInActiveTimeline, false,
+ Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1))));
// savepoint tracked in previous clean was removed(touching partition1).
latest uncleaned touched 2 other partitions. But when savepoint is removed,
entire list of partitions are expected.
+ // earliest savepoint in last clean is empty since there is no savepoint
in the timeline
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
- Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)),
- activeInstantsPartitionsMap2, Collections.emptyMap(),
threePartitionsInActiveTimeline, false));
+ Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)), Option.empty(),
+ activeInstantsPartitionsMap2, Collections.emptyList(),
threePartitionsInActiveTimeline, false, Collections.emptyMap()));
// previous savepoint still exists and touches partition1. uncleaned
touches only partition2 and partition3. expected partition2 and partition3.
+ // earliest savepoint in last clean is empty even though savepoint2 was
added to the timeline
+ // since there is no replace commit after the savepoint and before
earliestInstantInLastClean (earliestInstantToRetain)
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
- Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)),
- activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline, false));
+ Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)), Option.empty(),
+ activeInstantsPartitionsMap2,
Collections.singletonList(earliestInstantMinusThreeDays),
twoPartitionsInActiveTimeline, false,
+ Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1))));
// a new savepoint was added compared to previous clean. all 2 partitions
are expected since uncleaned commits touched just 2 partitions.
+ // earliest savepoint in last clean is same as savepoint3 since savepoint3
is the oldest savepoint timestamp in the timeline
+ // and there is a replace commit earliestInstantMinusOneWeek after the
savepoint but before earliestInstantInLastClean (earliestInstantToRetain)
Map<String, List<String>> latestSavepoints = new HashMap<>();
- latestSavepoints.put(savepoint2, Collections.singletonList(PARTITION1));
latestSavepoints.put(savepoint3, Collections.singletonList(PARTITION1));
+ latestSavepoints.put(savepoint2, Collections.singletonList(PARTITION1));
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
- Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)),
- activeInstantsPartitionsMap2, latestSavepoints,
twoPartitionsInActiveTimeline, false));
+ Collections.singletonMap(savepoint3,
Collections.singletonList(PARTITION1)), Option.of(savepoint3),
+ activeInstantsPartitionsMap2,
Collections.singletonList(earliestInstantMinusOneWeek),
twoPartitionsInActiveTimeline, false, latestSavepoints));
// 2 savepoints were tracked in previous clean. one of them is removed in
latest. When savepoint is removed, entire list of partitions are expected.
+ // earliest savepoint in last clean is same as savepoint3 since savepoint3
is part of the timeline
+ // and there is a replace commit earliestInstantMinusOneWeek after the
savepoint but before earliestInstantInLastClean (earliestInstantToRetain)
Map<String, List<String>> previousSavepoints = new HashMap<>();
previousSavepoints.put(savepoint2, Collections.singletonList(PARTITION1));
previousSavepoints.put(savepoint3, Collections.singletonList(PARTITION2));
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
- previousSavepoints, activeInstantsPartitionsMap2,
Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)),
threePartitionsInActiveTimeline, false));
+ previousSavepoints, Option.of(savepoint3),
activeInstantsPartitionsMap2,
Collections.singletonList(earliestInstantMinusOneWeek),
threePartitionsInActiveTimeline, false,
+ Collections.singletonMap(savepoint3,
Collections.singletonList(PARTITION2))
+ ));
// 2 savepoints were tracked in previous clean. one of them is removed in
latest. But a partition part of removed savepoint is already touched by
uncleaned commits.
// Anyways, when savepoint is removed, entire list of partitions are
expected.
+ // earliest savepoint in last clean is empty even though savepoint3 is
part of the timeline
+ // since there is no replace commit between the savepoint3 and
earliestInstantInLastClean (earliestInstantToRetain)
+ // The replace commit earliestInstantMinusThreeDays is after the
earliestInstantInLastClean
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
- previousSavepoints, activeInstantsPartitionsMap3,
Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)),
threePartitionsInActiveTimeline, false));
+ previousSavepoints, Option.empty(), activeInstantsPartitionsMap3,
Collections.singletonList(earliestInstantMinusThreeDays),
threePartitionsInActiveTimeline, false,
+ Collections.singletonMap(savepoint3,
Collections.singletonList(PARTITION2))
+ ));
// unpartitioned test case. savepoint removed.
List<String> unPartitionsInActiveTimeline =
Arrays.asList(StringUtils.EMPTY_STRING);
Map<String, List<String>> activeInstantsUnPartitionsMap = new HashMap<>();
activeInstantsUnPartitionsMap.put(earliestInstantMinusThreeDays,
unPartitionsInActiveTimeline);
+ // earliest savepoint in last clean is empty since there is no savepoint
in the timeline
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(false,
earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(StringUtils.EMPTY_STRING),
- Collections.singletonMap(savepoint2,
Collections.singletonList(StringUtils.EMPTY_STRING)),
- activeInstantsUnPartitionsMap, Collections.emptyMap(),
unPartitionsInActiveTimeline, false));
+ Collections.singletonMap(savepoint2,
Collections.singletonList(StringUtils.EMPTY_STRING)), Option.empty(),
+ activeInstantsUnPartitionsMap, Collections.emptyList(),
unPartitionsInActiveTimeline, false, Collections.emptyMap()));
// savepoint tracked in previous clean was removed(touching partition1).
active instants does not have the instant corresponding to the savepoint.
// latest uncleaned touched 2 other partitions. But when savepoint is
removed, entire list of partitions are expected.
+ // earliest savepoint in last clean is empty since there is no savepoint
in the timeline
activeInstantsPartitionsMap2.remove(earliestInstantMinusOneWeek);
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(true,
earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
- Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)),
- activeInstantsPartitionsMap2, Collections.emptyMap(),
threePartitionsInActiveTimeline, true));
+ Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)), Option.empty(),
+ activeInstantsPartitionsMap2, Collections.emptyList(),
threePartitionsInActiveTimeline, true, Collections.emptyMap()));
return arguments.stream();
}
@@ -467,21 +522,29 @@ public class TestCleanPlanner {
Arguments.of(getCleanByCommitsConfig(), earliestInstant,
allFileGroups, savepoints, replacedFileGroups, expected));
}
- // helper to build common cases for the two policies
+ /**
+ * The function generates test arguments for two cases. One where cleaning
policy
+ * is set to KEEP_LATEST_BY_HOURS and the other where cleaning policy is set
to
+ * KEEP_LATEST_COMMITS. Rest of the arguments are same.
+ */
private static List<Arguments>
buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(boolean
isPartitioned, String earliestInstant,
String latestCompletedInLastClean,
String lastKnownCleanInstantTime,
String earliestInstantInLastClean,
List<String> partitionsInLastClean,
Map<String, List<String>> savepointsTrackedInLastClean,
+
Option<String> expectedEarliestSavepointInLastClean,
Map<String, List<String>> activeInstantsToPartitionsMap,
-
Map<String, List<String>> savepoints,
+
List<String> replaceCommits,
List<String> expectedPartitions,
-
boolean areCommitsForSavepointsRemoved) {
+
boolean areCommitsForSavepointsRemoved,
+
Map<String, List<String>> savepoints) {
return Arrays.asList(Arguments.of(isPartitioned, getCleanByHoursConfig(),
earliestInstant, latestCompletedInLastClean, lastKnownCleanInstantTime,
- earliestInstantInLastClean, partitionsInLastClean,
savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints,
expectedPartitions, areCommitsForSavepointsRemoved),
+ earliestInstantInLastClean, partitionsInLastClean,
savepointsTrackedInLastClean, expectedEarliestSavepointInLastClean,
+ activeInstantsToPartitionsMap, replaceCommits, expectedPartitions,
areCommitsForSavepointsRemoved, savepoints),
Arguments.of(isPartitioned, getCleanByCommitsConfig(),
earliestInstant, latestCompletedInLastClean, lastKnownCleanInstantTime,
- earliestInstantInLastClean, partitionsInLastClean,
savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints,
expectedPartitions, areCommitsForSavepointsRemoved));
+ earliestInstantInLastClean, partitionsInLastClean,
savepointsTrackedInLastClean, expectedEarliestSavepointInLastClean,
+ activeInstantsToPartitionsMap, replaceCommits, expectedPartitions,
areCommitsForSavepointsRemoved, savepoints));
}
private static HoodieFileGroup buildFileGroup(List<String>
baseFileCommitTimes) {
@@ -492,7 +555,7 @@ public class TestCleanPlanner {
String fileGroup = UUID.randomUUID() + "-0";
HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partition,
UUID.randomUUID().toString());
HoodieTimeline timeline = mock(HoodieTimeline.class);
- when(timeline.lastInstant()).thenReturn(Option.of(new
HoodieInstant(HoodieInstant.State.COMPLETED, "COMMIT",
baseFileCommitTimes.get(baseFileCommitTimes.size() - 1))));
+ when(timeline.lastInstant()).thenReturn(Option.of(new
HoodieInstant(COMPLETED, "COMMIT",
baseFileCommitTimes.get(baseFileCommitTimes.size() - 1))));
HoodieFileGroup group = new HoodieFileGroup(fileGroupId, timeline);
for (String baseFileCommitTime : baseFileCommitTimes) {
when(timeline.containsOrBeforeTimelineStarts(baseFileCommitTime)).thenReturn(true);
@@ -516,15 +579,13 @@ public class TestCleanPlanner {
}
private static Pair<HoodieCleanMetadata, Option<byte[]>>
getCleanCommitMetadata(List<String> partitions, String instantTime, String
earliestCommitToRetain,
-
String lastCompletedTime, Set<String> savepointsToTrack) {
+
String lastCompletedTime, Set<String> savepointsToTrack, Option<String>
earliestCommitToNotArchive) {
try {
Map<String, HoodieCleanPartitionMetadata> partitionMetadata = new
HashMap<>();
partitions.forEach(partition -> partitionMetadata.put(partition, new
HoodieCleanPartitionMetadata(partition,
HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), false)));
Map<String, String> extraMetadata = new HashMap<>();
- if (!savepointsToTrack.isEmpty()) {
- extraMetadata.put(SAVEPOINTED_TIMESTAMPS,
savepointsToTrack.stream().collect(Collectors.joining(",")));
- }
+ extraMetadata.put(SAVEPOINTED_TIMESTAMPS,
savepointsToTrack.stream().collect(Collectors.joining(",")));
HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(instantTime,
100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata,
CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP,
extraMetadata.isEmpty() ? null : extraMetadata);
return Pair.of(cleanMetadata,
TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
@@ -545,18 +606,23 @@ public class TestCleanPlanner {
}
}
- private static void mockLastCleanCommit(HoodieTable hoodieTable, String
timestamp, String earliestCommitToRetain, HoodieActiveTimeline activeTimeline,
- Pair<HoodieCleanMetadata,
Option<byte[]>> cleanMetadata)
+ private static HoodieCleanerPlan mockLastCleanCommit(HoodieTable
hoodieTable, String timestamp, String earliestCommitToRetain,
HoodieActiveTimeline activeTimeline,
+
Pair<HoodieCleanMetadata, Option<byte[]>> cleanMetadata, Set<String>
savepointsTrackedInLastClean)
throws IOException {
HoodieDefaultTimeline cleanTimeline = mock(HoodieDefaultTimeline.class);
when(activeTimeline.getCleanerTimeline()).thenReturn(cleanTimeline);
when(hoodieTable.getCleanTimeline()).thenReturn(cleanTimeline);
HoodieDefaultTimeline completedCleanTimeline =
mock(HoodieDefaultTimeline.class);
when(cleanTimeline.filterCompletedInstants()).thenReturn(completedCleanTimeline);
- HoodieInstant latestCleanInstant = new
HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.CLEAN_ACTION,
timestamp);
+ HoodieInstant latestCleanInstant = new HoodieInstant(COMPLETED,
HoodieTimeline.CLEAN_ACTION, timestamp);
when(completedCleanTimeline.lastInstant()).thenReturn(Option.of(latestCleanInstant));
when(activeTimeline.isEmpty(latestCleanInstant)).thenReturn(false);
when(activeTimeline.getInstantDetails(latestCleanInstant)).thenReturn(cleanMetadata.getRight());
+ HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new
HoodieActionInstant(earliestCommitToRetain, HoodieTimeline.COMMIT_ACTION,
COMPLETED.name()),
+ cleanMetadata.getLeft().getLastCompletedCommitTimestamp(),
+ HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
Collections.emptyMap(),
+ CleanPlanner.LATEST_CLEAN_PLAN_VERSION, null, null,
cleanMetadata.getLeft().getExtraMetadata());
+
when(activeTimeline.readCleanerInfoAsBytes(any())).thenReturn(Option.of(TimelineMetadataUtils.serializeAvroMetadata(cleanerPlan,
HoodieCleanerPlan.class).get()));
HoodieDefaultTimeline commitsTimeline = mock(HoodieDefaultTimeline.class);
when(activeTimeline.getCommitsTimeline()).thenReturn(commitsTimeline);
@@ -564,10 +630,11 @@ public class TestCleanPlanner {
when(hoodieTable.isPartitioned()).thenReturn(true);
when(hoodieTable.isMetadataTable()).thenReturn(false);
+ return cleanerPlan;
}
private static void mockFewActiveInstants(HoodieTable hoodieTable,
Map<String, List<String>> activeInstantsToPartitions,
- Map<String, List<String>>
savepointedCommitsToAdd, boolean areCommitsForSavepointsRemoved)
+ Map<String, List<String>>
savepointedCommitsToAdd, boolean areCommitsForSavepointsRemoved, List<String>
replaceCommits)
throws IOException {
HoodieDefaultTimeline commitsTimeline = new HoodieDefaultTimeline();
List<HoodieInstant> instants = new ArrayList<>();
@@ -577,7 +644,7 @@ public class TestCleanPlanner {
instantstoProcess.putAll(savepointedCommitsToAdd);
}
instantstoProcess.forEach((k, v) -> {
- HoodieInstant hoodieInstant = new
HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, k);
+ HoodieInstant hoodieInstant = new HoodieInstant(COMPLETED,
HoodieTimeline.COMMIT_ACTION, k);
instants.add(hoodieInstant);
Map<String, List<HoodieWriteStat>> partitionToWriteStats = new
HashMap<>();
v.forEach(partition -> partitionToWriteStats.put(partition,
Collections.emptyList()));
@@ -593,7 +660,23 @@ public class TestCleanPlanner {
});
commitsTimeline.setInstants(instants);
+ Collections.sort(instants);
+
when(hoodieTable.getActiveTimeline().getInstantsAsStream()).thenReturn(instants.stream());
when(hoodieTable.getCompletedCommitsTimeline()).thenReturn(commitsTimeline);
+
+ HoodieDefaultTimeline savepointTimeline = new HoodieDefaultTimeline();
+ List<HoodieInstant> savepointInstants =
savepointedCommitsToAdd.keySet().stream().map(sp -> new
HoodieInstant(COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, sp))
+ .collect(Collectors.toList());
+ savepointTimeline.setInstants(savepointInstants);
+
+ HoodieDefaultTimeline completedReplaceTimeline = new
HoodieDefaultTimeline();
+ List<HoodieInstant> completedReplaceInstants =
replaceCommits.stream().map(rc -> new HoodieInstant(COMPLETED,
HoodieTimeline.REPLACE_COMMIT_ACTION, rc))
+ .collect(Collectors.toList());
+ completedReplaceTimeline.setInstants(completedReplaceInstants);
+
+
when(hoodieTable.getActiveTimeline().findInstantsAfterOrEquals(any())).thenCallRealMethod();
+
when(hoodieTable.getActiveTimeline().getCompletedReplaceTimeline()).thenReturn(completedReplaceTimeline);
+
when(hoodieTable.getActiveTimeline().getSavePointTimeline()).thenReturn(savepointTimeline);
when(hoodieTable.isPartitioned()).thenReturn(true);
when(hoodieTable.isMetadataTable()).thenReturn(false);
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index cb4a871cab1..435b5543f1c 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -48,6 +48,7 @@ import
org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
@@ -287,7 +288,7 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
verifyArchival(
getAllArchivedCommitInstants(Arrays.asList("00000001",
"00000002")),
getActiveCommitInstants(Arrays.asList("00000003", "00000004",
"00000005", "00000006")),
- commitsAfterArchival);
+ commitsAfterArchival, false);
} else if (i < 8) {
assertEquals(originalCommits, commitsAfterArchival);
} else if (i == 8) {
@@ -295,7 +296,7 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
verifyArchival(
getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002",
"00000003", "00000004")),
getActiveCommitInstants(Arrays.asList("00000005", "00000006",
"00000007", "00000008")),
- commitsAfterArchival);
+ commitsAfterArchival, false);
} else {
assertEquals(originalCommits, commitsAfterArchival);
}
@@ -481,13 +482,13 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
// retains only 2 commits. C3 and C8. and savepointed commit for C3.
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001",
"00000002", "00000004", "00000005")),
Stream.concat(getActiveCommitInstants(Arrays.asList("00000003",
"00000006")).stream(),
getActiveSavepointedCommitInstants(Arrays.asList("00000003")).stream())
- .collect(Collectors.toList()), commitsAfterArchival);
+ .collect(Collectors.toList()), commitsAfterArchival, true);
} else {
// archives only C1 and C2. stops at first savepointed commit C3.
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001",
"00000002")),
Stream.concat(getActiveCommitInstants(Arrays.asList("00000003",
"00000004", "00000005", "00000006")).stream(),
getActiveSavepointedCommitInstants(Arrays.asList("00000003")).stream())
- .collect(Collectors.toList()), commitsAfterArchival);
+ .collect(Collectors.toList()), commitsAfterArchival, false);
}
for (int i = 7; i < 10; i++) {
@@ -502,7 +503,112 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
metaClient.reloadActiveTimeline();
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001",
"00000002", "00000003", "00000004", "00000005", "00000006", "00000007")),
- getActiveCommitInstants(Arrays.asList("00000008", "00000009")),
commitsAfterArchival);
+ getActiveCommitInstants(Arrays.asList("00000008", "00000009")),
commitsAfterArchival, false);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testClusteringCommitAndSavepointWithArchival(boolean
archiveBeyondSavepoint) throws Exception {
+ boolean enableMetadata = false;
+ HoodieWriteConfig writeConfig =
initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 5, 2,
HoodieTableType.COPY_ON_WRITE,
+ 10, HoodieFailedWritesCleaningPolicy.EAGER,
WriteConcurrencyMode.SINGLE_WRITER, archiveBeyondSavepoint);
+
+ // min archival commits is 2 and max archival commits is 4. and so, after
5th commit, 3 commits will be archived.
+ for (int i = 1; i < 8; i++) {
+ if (i < 3 || i == 5) {
+ testTable.doWriteOperation(String.format("%08d", i),
WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") :
Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+ } else {
+ testTable.doCluster(String.format("%08d", i),
Collections.singletonMap("p1", Collections.singletonList("f1")),
Collections.singletonList("p2"), 2);
+ }
+ }
+
+ // savepoint 3rd commit
+ String commitToSavepoint = String.format("%08d", 3);
+ HoodieSavepointMetadata savepointMetadata =
testTable.doSavepoint(commitToSavepoint);
+ testTable.addSavepoint(commitToSavepoint, savepointMetadata);
+
+ // trigger archival
+ Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList =
archiveAndGetCommitsList(writeConfig);
+ List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+ // Only commits 1, 2 should be archived since savepoint exists at c3. In
the case when
+ // archiveBeyondSavepoint is false, archival is blocked since replace
commits have not yet been cleaned
+ List<HoodieInstant> expectedActiveInstants =
getActiveCommitInstants(Arrays.asList("00000005"));
+
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000003",
"00000004", "00000006", "00000007"), HoodieTimeline.REPLACE_COMMIT_ACTION));
+
expectedActiveInstants.addAll(getActiveSavepointedCommitInstants(Arrays.asList("00000003")));
+ verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001",
"00000002")),
+ expectedActiveInstants, commitsAfterArchival, false);
+
+ // add a clean commit with earliest instant to retain as c8 and earliest
savepoint as c3.
+ // After this commit, the clean (along with the savepoint it tracks) blocks archival even though
earliest instant to retain is c8
+ Map<String, Integer> cleanStats = new HashMap<>();
+ cleanStats.put("p1", 1);
+ cleanStats.put("p2", 2);
+ testTable.doClean(String.format("%08d", 8), cleanStats,
+ Collections.singletonMap(CleanerUtils.SAVEPOINTED_TIMESTAMPS,
"00000003"));
+
+ // trigger archival
+ commitsList = archiveAndGetCommitsList(writeConfig);
+ commitsAfterArchival = commitsList.getValue();
+ // archives only C1 and C2. stops at first replace commit C3 after
savepoint based on cleaner
+ expectedActiveInstants =
getActiveCommitInstants(Arrays.asList("00000005"));
+
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000003",
"00000004", "00000006", "00000007"), HoodieTimeline.REPLACE_COMMIT_ACTION));
+
expectedActiveInstants.addAll(getActiveSavepointedCommitInstants(Arrays.asList("00000003")));
+
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008"),
HoodieTimeline.CLEAN_ACTION));
+ verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001",
"00000002")),
+ expectedActiveInstants, commitsAfterArchival, false);
+
+ // add a clean commit with earliest savepoint set as c7
+ testTable.doClean(String.format("%08d", 9), cleanStats,
Collections.singletonMap(CleanerUtils.SAVEPOINTED_TIMESTAMPS, "00000007"));
+
+ // trigger archival
+ commitsList = archiveAndGetCommitsList(writeConfig);
+ commitsAfterArchival = commitsList.getValue();
+
+ if (archiveBeyondSavepoint) {
+ // retains the 2 commits - C3 and C7. Since minInstantsToKeep is 2, c3
is retained. Archival is now blocked at
+ // c7 since that is the replace commit after earliest savepoint c7 in
cleaner
+ expectedActiveInstants =
getActiveCommitInstants(Arrays.asList("00000003", "00000007"),
HoodieTimeline.REPLACE_COMMIT_ACTION);
+
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008",
"00000009"), HoodieTimeline.CLEAN_ACTION));
+
expectedActiveInstants.addAll(getActiveSavepointedCommitInstants(Arrays.asList("00000003")));
+ List<HoodieInstant> archivedCommitInstants =
getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000005"));
+
archivedCommitInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000004",
"00000006"), HoodieTimeline.REPLACE_COMMIT_ACTION));
+ verifyArchival(archivedCommitInstants, expectedActiveInstants,
commitsAfterArchival, true);
+ } else {
+ // archives only C1 and C2. stops at c3 since clean earliest savepoint
is c3.
+ expectedActiveInstants =
getActiveCommitInstants(Arrays.asList("00000005"));
+
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000003",
"00000004", "00000006", "00000007"), HoodieTimeline.REPLACE_COMMIT_ACTION));
+
expectedActiveInstants.addAll(getActiveSavepointedCommitInstants(Arrays.asList("00000003")));
+
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008",
"00000009"), HoodieTimeline.CLEAN_ACTION));
+ verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001",
"00000002")),
+ expectedActiveInstants, commitsAfterArchival, false);
+ }
+
+ // savepoint is removed
+ testTable.deleteSavepoint(commitToSavepoint);
+
+ // trigger archival
+ commitsList = archiveAndGetCommitsList(writeConfig);
+ commitsAfterArchival = commitsList.getValue();
+
+ if (archiveBeyondSavepoint) {
+ // change from last state - Removal of savepoint instant from the active
timeline since it is deleted
+ expectedActiveInstants =
getActiveCommitInstants(Arrays.asList("00000003", "00000007"),
HoodieTimeline.REPLACE_COMMIT_ACTION);
+
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008",
"00000009"), HoodieTimeline.CLEAN_ACTION));
+ List<HoodieInstant> archivedCommitInstants =
getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000005"));
+
archivedCommitInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000004",
"00000006"), HoodieTimeline.REPLACE_COMMIT_ACTION));
+ verifyArchival(archivedCommitInstants, expectedActiveInstants,
commitsAfterArchival, true);
+ } else {
+ // change from last state - Removal of savepoint instant from the active
timeline since it is deleted
+ // since savepoint is now deleted, it does not block archival.
+ // archival is triggered since clean also does not block it
+ // c6 and c7 are retained since min instants to keep is 2
+ expectedActiveInstants =
getActiveCommitInstants(Arrays.asList("00000006", "00000007"),
HoodieTimeline.REPLACE_COMMIT_ACTION);
+
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008",
"00000009"), HoodieTimeline.CLEAN_ACTION));
+ List<HoodieInstant> archivedCommitInstants =
getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000005"));
+
archivedCommitInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000003",
"00000004"), HoodieTimeline.REPLACE_COMMIT_ACTION));
+ verifyArchival(archivedCommitInstants, expectedActiveInstants,
commitsAfterArchival, false);
+ }
}
@Test
@@ -869,7 +975,7 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
.map(p -> new HoodieInstant(State.COMPLETED, p.getValue(),
p.getKey())).collect(Collectors.toList());
List<HoodieInstant> expectedActiveInstants =
instants.subList(numArchivedInstants, instants.size()).stream()
.map(p -> new HoodieInstant(State.COMPLETED, p.getValue(),
p.getKey())).collect(Collectors.toList());
- verifyArchival(expectedArchivedInstants, expectedActiveInstants,
commitsAfterArchival);
+ verifyArchival(expectedArchivedInstants, expectedActiveInstants,
commitsAfterArchival, false);
}
}
assertTrue(hasArchivedInstants, "Some instants should be archived");
@@ -939,7 +1045,7 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
archivedInstants.add(new HoodieInstant(State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "00000002"));
verifyArchival(archivedInstants,
getActiveCommitInstants(Arrays.asList("00000005", "00000006",
"00000007", "00000008"), HoodieTimeline.DELTA_COMMIT_ACTION),
- commitsAfterArchival);
+ commitsAfterArchival, false);
}
@ParameterizedTest
@@ -1032,7 +1138,7 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
HoodieWriteConfig writeConfig =
initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 8);
// Min archival commits is 2 and max archival commits is 4.
// When metadata table is not enabled, after 5th write instant, archive
will be triggered.
- // When metadata table is enabled, after 8th instant (6 write instants + 2
clean instants) >= maxDeltaCommitsMetadataTable,
+ // When metadata table is enabled, after 8th instant (5 write instants + 3
clean instants) >= maxDeltaCommitsMetadataTable,
// archival kicks in when compaction in metadata table triggered.
Map<String, Integer> cleanStats = new HashMap<>();
cleanStats.put("p1", 1);
@@ -1040,7 +1146,7 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
for (int i = 1; i <= 8; i++) {
if (i == 1) {
testTable.doWriteOperation(String.format("%08d", i),
WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1",
"p2"), 20);
- } else if (i <= 3) {
+ } else if (i <= 3 || i == 5) {
testTable.doClean(String.format("%08d", i), cleanStats);
} else {
testTable.doWriteOperation(String.format("%08d", i),
WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"),
2);
@@ -1049,39 +1155,83 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList =
archiveAndGetCommitsList(writeConfig);
List<HoodieInstant> originalCommits = commitsList.getKey();
List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
- if (i < 7) {
+ if (i <= 7) {
assertEquals(originalCommits, commitsAfterArchival);
- } else if (i == 7) {
- if (!enableMetadata) {
- // do archive:
- // clean: 2,3: after archival -> null
- // write: 1,4,5,6,7: after archival -> 6, 7
- List<HoodieInstant> expectedActiveInstants = new
ArrayList<>(getActiveCommitInstants(Arrays.asList("00000006", "00000007")));
-
- List<HoodieInstant> expectedArchiveInstants = new ArrayList<>();
-
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001",
"00000004", "00000005")));
-
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
"00000003"), HoodieTimeline.CLEAN_ACTION));
-
- verifyArchival(expectedArchiveInstants, expectedActiveInstants,
commitsAfterArchival);
- } else {
- // with metadata enabled, archival in data table is fenced based on
compaction in metadata table.
- assertEquals(originalCommits, commitsAfterArchival);
- }
} else {
- if (!enableMetadata) {
- assertEquals(originalCommits, commitsAfterArchival);
- } else {
- // when i == 8 compaction in metadata table will be triggered, and
then allow archive:
- // clean: 2,3: after archival -> null
- // write: 1,4,5,6,7,8: after archival -> 7, 8
- List<HoodieInstant> expectedActiveInstants = new
ArrayList<>(getActiveCommitInstants(Arrays.asList("00000007", "00000008")));
-
- List<HoodieInstant> expectedArchiveInstants = new ArrayList<>();
-
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001",
"00000004", "00000005", "00000006")));
-
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
"00000003"), HoodieTimeline.CLEAN_ACTION));
+ // when i == 8 compaction in metadata table will be triggered, and
then allow archive:
+ // clean: 2,3,5: all will be archived
+ // write: 1,4,6,7,8: after archival -> 7,8
+ List<HoodieInstant> expectedActiveInstants = new
ArrayList<>(getActiveCommitInstants(Arrays.asList("00000007", "00000008")));
+ List<HoodieInstant> expectedArchiveInstants = new ArrayList<>();
+
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001",
"00000004", "00000006")));
+
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
"00000003", "00000005"), HoodieTimeline.CLEAN_ACTION));
+
+ verifyArchival(expectedArchiveInstants, expectedActiveInstants,
commitsAfterArchival, false);
+ }
+ }
+ }
- verifyArchival(expectedArchiveInstants, expectedActiveInstants,
commitsAfterArchival);
+ @Test
+ public void testArchiveTableWithCleanerEarliestSavepoint() throws Exception {
+ HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 4,
5, 2);
+
+ // min archival commits is 4 and max archival commits is 5
+ // (commits have to be greater than 5)
+ // and so, after 6th instant, 2 instants will be archived.
+ // Instants 1 -> 12 are commits, except 6 which is a clean instant and 7 which is a replace commit.
+ // Clean instants have SAVEPOINTED_TIMESTAMPS set as 3 till clean instant
15.
+ // At 16th clean instant, SAVEPOINTED_TIMESTAMPS is set as 10.
+ Map<String, Integer> cleanStats = new HashMap<>();
+ cleanStats.put("p1", 1);
+ cleanStats.put("p2", 2);
+ for (int i = 1; i <= 16; i++) {
+ if (i == 7) {
+ testTable.doCluster(String.format("%08d", i), Collections.emptyMap(),
Arrays.asList("p1", "p2"), 20);
+ } else if (i != 6 && i <= 12) {
+ testTable.doWriteOperation(String.format("%08d", i),
WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") :
Collections.emptyList(), Arrays.asList("p1", "p2"), 20);
+ } else {
+ String savepoint = "00000003";
+ if (i == 16) {
+ // only 16th clean commit has set savepoint as 10. Before 16th clean
commit only commits
+ // 1 and 2 should be archived.
+ savepoint = "00000010";
}
+ testTable.doClean(String.format("%08d", i), cleanStats,
Collections.singletonMap(CleanerUtils.SAVEPOINTED_TIMESTAMPS, savepoint));
+ }
+ // trigger archival
+ Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList =
archiveAndGetCommitsList(writeConfig);
+ List<HoodieInstant> originalCommits = commitsList.getKey();
+ List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+ if (i <= 6) {
+ assertEquals(originalCommits, commitsAfterArchival);
+ } else if (i <= 10) {
+ // clean instants have not been archived yet, SAVEPOINTED_TIMESTAMPS
is set as 3 but since there are no replace commits
+ // between earliest savepoint 3 and earliest instant to retain which
is 6, archival is not blocked at 3 but at commit 6
+ // Archival gets triggered at commit 7 and 9, these would remove
commits (1,2) and (3,4) respectively
+ // Other commits do not trigger archival since maxInstantsToKeep is 5
and after archival instants are 4
+ assertEquals(5 + (i % 2 == 0 ? 1 : 0), commitsAfterArchival.size(),
commitsAfterArchival.toString());
+ } else if (i < 16) {
+ // Archival gets triggered at 11 and it archives commits before 6
since archival is blocked at commit 6
+ // due to cleaner earliest instant to retain set at 6
+ // clean instants have not been archived yet, SAVEPOINTED_TIMESTAMPS
is set as 3 but since there are no replace commits
+ // between earliest savepoint 3 and earliest instant to retain which
is 6, archival is not blocked at 3 but at commit 6
+ assertEquals(i - 5, commitsAfterArchival.size(),
commitsAfterArchival.toString());
+ } else {
+ // At 16th clean instant, SAVEPOINTED_TIMESTAMPS is set as 10
+ // There are no replace commits between earliest savepoint (10) and
cleaner earliest instant to retain (16)
+ // Therefore archival will not be blocked by savepoint
+ // active commits were 3,4,5,7,8,9,10,11,12 => After archival only 4
instants would remain 9,10,11,12 based on archival config
+ // clean instants were 6,13,14,15,16 => clean instant 6 would be
archived since commits instants are archived till commit 8
+ List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
+
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000009",
"00000010", "00000011", "00000012")));
+ expectedActiveInstants.addAll(
+ getActiveCommitInstants(Arrays.asList("00000013", "00000014",
"00000015", "00000016"), HoodieTimeline.CLEAN_ACTION));
+ List<HoodieInstant> expectedArchivedInstants = new ArrayList<>();
+ expectedArchivedInstants.addAll(getAllArchivedCommitInstants(
+ Arrays.asList("00000001", "00000002", "00000003", "00000004",
"00000005", "00000008"), HoodieTimeline.COMMIT_ACTION));
+
expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000006"),
HoodieTimeline.CLEAN_ACTION));
+
expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000007"),
HoodieTimeline.REPLACE_COMMIT_ACTION));
+ verifyArchival(expectedArchivedInstants, expectedActiveInstants,
commitsAfterArchival, false);
}
}
}
@@ -1137,7 +1287,7 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
List<HoodieInstant> expectedActiveInstants = instants.subList(archived,
instants.size()).stream()
.map(p -> new HoodieInstant(State.COMPLETED, p.getValue(),
p.getKey())).collect(Collectors.toList());
- verifyArchival(expectedArchiveInstants, expectedActiveInstants,
commitsAfterArchival);
+ verifyArchival(expectedArchiveInstants, expectedActiveInstants,
commitsAfterArchival, false);
}
@ParameterizedTest
@@ -1168,6 +1318,9 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED,
HoodieTimeline.ROLLBACK_ACTION, String.format("%02d", startInstant)));
}
+ // Clean and rollback instants are archived only till the last clean
instant in the timeline
+ createCleanMetadata(String.format("%02d", startInstant), false, false,
isEmpty);
+
if (enableMetadataTable) {
// Simulate a compaction commit in metadata table timeline
// so the archival in data table can happen
@@ -1231,7 +1384,7 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
// after archival 4,5,6,7
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 3);
verifyArchival(getAllArchivedCommitInstants(instants.subList(0, 3)),
- getActiveCommitInstants(instants.subList(3, 7)), commitsAfterArchival);
+ getActiveCommitInstants(instants.subList(3, 7)), commitsAfterArchival,
false);
// 3 more commits, 4 to 6 will be archived. but will not move after 6
since compaction has to kick in metadata table.
for (int i = 0; i < 3; i++) {
@@ -1244,7 +1397,7 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
originalCommits = commitsList.getKey();
commitsAfterArchival = commitsList.getValue();
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 3);
- verifyArchival(getAllArchivedCommitInstants(instants.subList(0, 6)),
getActiveCommitInstants(instants.subList(6, 10)), commitsAfterArchival);
+ verifyArchival(getAllArchivedCommitInstants(instants.subList(0, 6)),
getActiveCommitInstants(instants.subList(6, 10)), commitsAfterArchival, false);
// No archival should kick in since compaction has not kicked in metadata
table
for (int i = 0; i < 2; i++) {
@@ -1256,7 +1409,7 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
originalCommits = commitsList.getKey();
commitsAfterArchival = commitsList.getValue();
assertEquals(originalCommits, commitsAfterArchival);
- verifyArchival(getAllArchivedCommitInstants(instants.subList(0, 6)),
getActiveCommitInstants(instants.subList(6, 12)), commitsAfterArchival);
+ verifyArchival(getAllArchivedCommitInstants(instants.subList(0, 6)),
getActiveCommitInstants(instants.subList(6, 12)), commitsAfterArchival, false);
String instant13 = metaClient.createNewInstantTime();
instants.add(instant13);
@@ -1278,7 +1431,7 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
// before archival 7,8,9,10,11,12,13,14
// after archival 11,12,13,14
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 4);
- verifyArchival(getAllArchivedCommitInstants(instants.subList(0, 10)),
getActiveCommitInstants(instants.subList(10, 14)), commitsAfterArchival);
+ verifyArchival(getAllArchivedCommitInstants(instants.subList(0, 10)),
getActiveCommitInstants(instants.subList(10, 14)), commitsAfterArchival, false);
}
@ParameterizedTest
@@ -1675,7 +1828,7 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
return Pair.of(originalCommits, commitsAfterArchival);
}
- private void verifyArchival(List<HoodieInstant> expectedArchivedInstants,
List<HoodieInstant> expectedActiveInstants, List<HoodieInstant>
commitsAfterArchival) {
+ private void verifyArchival(List<HoodieInstant> expectedArchivedInstants,
List<HoodieInstant> expectedActiveInstants, List<HoodieInstant>
commitsAfterArchival, boolean isArchivalBeyondSavepoint) {
expectedActiveInstants.sort(Comparator.comparing(HoodieInstant::getTimestamp));
commitsAfterArchival.sort(Comparator.comparing(HoodieInstant::getTimestamp));
assertEquals(expectedActiveInstants, commitsAfterArchival);
@@ -1689,7 +1842,13 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
HoodieTimeline timeline = metaClient.getActiveTimeline();
expectedArchivedInstants.forEach(entry -> {
// check safety
- if (!entry.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
+ // when isArchivalBeyondSavepoint is set to true, there is a case
possible where archival instant
+ // is after the first completed instant in the timeline. Lets say
savepoint was taken at commit c1
+ // and commit c1 is a deltacommit which is not archived but c2 got
archived. If savepoint is deleted
+ // then containsOrBeforeTimelineStarts check would fail for c2
archived commit as the function compares
+ // against first non savepoint commit in the timeline which is now
deleted. With c1 it would succeed but
+ // once c1 is removed, it would start failing.
+ if (!entry.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) &&
!isArchivalBeyondSavepoint) {
assertTrue(timeline.containsOrBeforeTimelineStarts(entry.getTimestamp()),
"Archived commits should always be safe");
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index bc6dd6094c5..8261aaf6538 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -290,6 +290,12 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
.limit(numCommits), details);
}
+ @Override
+ public HoodieTimeline findInstantsAfterOrEquals(String commitTime) {
+ return new HoodieDefaultTimeline(getInstantsAsStream()
+ .filter(s -> compareTimestamps(s.getTimestamp(),
GREATER_THAN_OR_EQUALS, commitTime)), details);
+ }
+
@Override
public HoodieDefaultTimeline findInstantsBefore(String instantTime) {
return new HoodieDefaultTimeline(getInstantsAsStream()
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 4ec700fe577..80ba8565507 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.StringUtils;
import java.io.Serializable;
import java.util.List;
+import java.util.Objects;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import java.util.stream.Stream;
@@ -250,10 +251,15 @@ public interface HoodieTimeline extends Serializable {
HoodieTimeline filterRequestedRollbackTimeline();
/**
- * Create a new Timeline with all the instants after startTs.
+ * Create a new Timeline with numCommits after startTs.
*/
HoodieTimeline findInstantsAfterOrEquals(String commitTime, int numCommits);
+ /**
+ * Create a new Timeline with all the instants after startTs.
+ */
+ HoodieTimeline findInstantsAfterOrEquals(String commitTime);
+
/**
* Create a new Timeline with instants after startTs and before or on endTs.
*/
@@ -455,6 +461,29 @@ public interface HoodieTimeline extends Serializable {
return predicateToApply.test(commit1, commit2);
}
+ /**
+ * Returns the smaller of the two given timestamps. Returns the non-null
argument if one of the arguments is null.
+ */
+ static String minTimestamp(String commit1, String commit2) {
+ if (StringUtils.isNullOrEmpty(commit1)) {
+ return commit2;
+ } else if (StringUtils.isNullOrEmpty(commit2)) {
+ return commit1;
+ }
+ return minInstant(commit1, commit2);
+ }
+
+ /**
+ * Returns the smaller of the two given instants compared by their respective
timestamps.
+ * Returns the non-null argument if one of the arguments is null.
+ */
+ static HoodieInstant minTimestampInstant(HoodieInstant instant1,
HoodieInstant instant2) {
+ String commit1 = instant1 != null ? instant1.getTimestamp() : null;
+ String commit2 = instant2 != null ? instant2.getTimestamp() : null;
+ String minTimestamp = minTimestamp(commit1, commit2);
+ return Objects.equals(minTimestamp, commit1) ? instant1 : instant2;
+ }
+
/**
* Returns the smaller of the given two instants.
*/
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
index 0fa758c21e1..f1488f31318 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
@@ -57,7 +57,7 @@ import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION
public class CleanerUtils {
private static final Logger LOG =
LoggerFactory.getLogger(CleanerUtils.class);
-
+ public static final String SAVEPOINTED_TIMESTAMPS = "savepointed_timestamps";
public static final Integer CLEAN_METADATA_VERSION_1 =
CleanMetadataV1MigrationHandler.VERSION;
public static final Integer CLEAN_METADATA_VERSION_2 =
CleanMetadataV2MigrationHandler.VERSION;
public static final Integer LATEST_CLEAN_METADATA_VERSION =
CLEAN_METADATA_VERSION_2;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 4b4dddccd9b..691695dff45 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.util;
import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieClusteringStrategy;
@@ -27,6 +28,7 @@ import org.apache.hudi.avro.model.HoodieSliceInfo;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.WriteOperationType;
@@ -333,12 +335,13 @@ public class ClusteringUtils {
* Returns the earliest instant to retain.
* Make sure the clustering instant won't be archived before cleaned, and
the earliest inflight clustering instant has a previous commit.
*
- * @param activeTimeline The active timeline
- * @param metaClient The meta client
+ * @param activeTimeline The active timeline
+ * @param metaClient The meta client
+ * @param cleanerPolicy The hoodie cleaning policy
* @return the earliest instant to retain for clustering
*/
public static Option<HoodieInstant> getEarliestInstantToRetainForClustering(
- HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient)
throws IOException {
+ HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient,
HoodieCleaningPolicy cleanerPolicy) throws IOException {
Option<HoodieInstant> oldestInstantToRetain = Option.empty();
HoodieTimeline replaceOrClusterTimeline =
activeTimeline.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.REPLACE_COMMIT_ACTION,
HoodieTimeline.CLUSTERING_ACTION));
if (!replaceOrClusterTimeline.empty()) {
@@ -348,13 +351,14 @@ public class ClusteringUtils {
// The first clustering instant of which timestamp is greater than or
equal to the earliest commit to retain of
// the clean metadata.
HoodieInstant cleanInstant = cleanInstantOpt.get();
- HoodieActionInstant earliestInstantToRetain =
CleanerUtils.getCleanerPlan(metaClient, cleanInstant.isRequested()
- ? cleanInstant
- :
HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp()))
- .getEarliestInstantToRetain();
+ HoodieCleanerPlan cleanerPlan =
CleanerUtils.getCleanerPlan(metaClient, cleanInstant.isRequested() ?
cleanInstant :
HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp()));
+ Option<String> earliestInstantToRetain =
Option.ofNullable(cleanerPlan.getEarliestInstantToRetain()).map(HoodieActionInstant::getTimestamp);
String retainLowerBound;
- if (earliestInstantToRetain != null &&
!StringUtils.isNullOrEmpty(earliestInstantToRetain.getTimestamp())) {
- retainLowerBound = earliestInstantToRetain.getTimestamp();
+ Option<String> earliestReplacedSavepointInClean =
getEarliestReplacedSavepointInClean(activeTimeline, cleanerPolicy, cleanerPlan);
+ if (earliestReplacedSavepointInClean.isPresent()) {
+ retainLowerBound = earliestReplacedSavepointInClean.get();
+ } else if (earliestInstantToRetain.isPresent() &&
!StringUtils.isNullOrEmpty(earliestInstantToRetain.get())) {
+ retainLowerBound = earliestInstantToRetain.get();
} else {
// no earliestInstantToRetain, indicate KEEP_LATEST_FILE_VERSIONS
clean policy,
// retain first instant after clean instant.
@@ -365,13 +369,7 @@ public class ClusteringUtils {
// TODO: This case has to be handled. HUDI-6352
retainLowerBound = cleanInstant.getTimestamp();
}
-
- oldestInstantToRetain = replaceOrClusterTimeline.filter(instant ->
- HoodieTimeline.compareTimestamps(
- instant.getTimestamp(),
- HoodieTimeline.GREATER_THAN_OR_EQUALS,
- retainLowerBound))
- .firstInstant();
+ oldestInstantToRetain =
replaceOrClusterTimeline.findInstantsAfterOrEquals(retainLowerBound).firstInstant();
} else {
oldestInstantToRetain = replaceOrClusterTimeline.firstInstant();
}
@@ -379,6 +377,43 @@ public class ClusteringUtils {
return oldestInstantToRetain;
}
+ public static Option<String>
getEarliestReplacedSavepointInClean(HoodieActiveTimeline activeTimeline,
HoodieCleaningPolicy cleanerPolicy,
+
HoodieCleanerPlan cleanerPlan) {
+ // EarliestSavepoint in clean is required to block archival when savepoint
is deleted.
+ // This ensures that archival is blocked until clean has cleaned up files
retained due to savepoint.
+ // If this guard is not present, the archiving of commits can lead to
duplicates. Here is a scenario
+ // illustrating the same. This scenario considers a case where
EarliestSavepoint guard is not present
+ // c1.dc - f1 (c1 deltacommit creates file with id f1)
+ // c2.dc - f2 (c2 deltacommit creates file with id f2)
+ // c2.sp - Savepoint at c2
+ // c3.rc (replacing f2 -> f3) (Replace commit replacing file id f2 with f3)
+ // c4.dc
+ //
+ // Let's say the incremental cleaner moved past c3.rc without cleaning f2
since a savepoint exists at c2.
+ // Archival is blocked at c2 since there is a savepoint at c2.
+ // Let's say the savepoint at c2 is now deleted; archival would then archive
c3.rc since it is unblocked.
+ // Since c3 is archived and f2 has not been cleaned, the table view would
be considering f2 as a valid
+ // file id. This causes duplicates.
+
+ String earliestInstantToRetain =
Option.ofNullable(cleanerPlan.getEarliestInstantToRetain()).map(HoodieActionInstant::getTimestamp).orElse(null);
+ Option<String[]> savepoints =
Option.ofNullable(cleanerPlan.getExtraMetadata()).map(metadata ->
metadata.getOrDefault(CleanerUtils.SAVEPOINTED_TIMESTAMPS,
StringUtils.EMPTY_STRING).split(","));
+ String earliestSavepoint = savepoints.flatMap(arr ->
Option.fromJavaOptional(Arrays.stream(arr).sorted().findFirst())).orElse(null);
+ if (cleanerPolicy != HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
+ if (!StringUtils.isNullOrEmpty(earliestInstantToRetain) &&
!StringUtils.isNullOrEmpty(earliestSavepoint)) {
+ // When earliestToRetainTs is greater than first savepoint timestamp
and there are no
+ // replace commits between the first savepoint and the
earliestToRetainTs, we can set the
+ // earliestSavepointOpt to empty as there was no cleaning blocked due
to savepoint
+ if (HoodieTimeline.compareTimestamps(earliestInstantToRetain,
HoodieTimeline.GREATER_THAN, earliestSavepoint)) {
+ HoodieTimeline replaceTimeline =
activeTimeline.getCompletedReplaceTimeline().findInstantsInClosedRange(earliestSavepoint,
earliestInstantToRetain);
+ if (!replaceTimeline.empty()) {
+ return Option.of(earliestSavepoint);
+ }
+ }
+ }
+ }
+ return Option.empty();
+ }
+
/**
* @param instant Hudi instant to check.
* @param timeline Hudi timeline.
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index da0f8c021ee..def0268e5c0 100755
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -624,6 +624,17 @@ public class TestHoodieActiveTimeline extends
HoodieCommonTestHarness {
}
}
+ @Test
+ public void testMinTimestamp() {
+ String timestamp1 = "20240601040632402";
+ String timestamp2 = "20250601040632402";
+ assertEquals(timestamp1, HoodieTimeline.minTimestamp(null, timestamp1));
+ assertEquals(timestamp1, HoodieTimeline.minTimestamp("", timestamp1));
+ assertEquals(timestamp1, HoodieTimeline.minTimestamp(timestamp1, null));
+ assertEquals(timestamp1, HoodieTimeline.minTimestamp(timestamp1, ""));
+ assertEquals(timestamp1, HoodieTimeline.minTimestamp(timestamp1,
timestamp2));
+ }
+
@Test
public void testParseDateFromInstantTime() throws ParseException {
// default second granularity instant ID
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 14c00111d21..4a0ef616b3e 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -1056,6 +1056,10 @@ public class HoodieTestTable implements AutoCloseable {
}
public HoodieCleanMetadata doClean(String commitTime, Map<String, Integer>
partitionFileCountsToDelete) throws IOException {
+ return doClean(commitTime, partitionFileCountsToDelete,
Collections.emptyMap());
+ }
+
+ public HoodieCleanMetadata doClean(String commitTime, Map<String, Integer>
partitionFileCountsToDelete, Map<String, String> extraMetadata) throws
IOException {
Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
for (Map.Entry<String, Integer> entry :
partitionFileCountsToDelete.entrySet()) {
partitionFilesToDelete.put(entry.getKey(),
getEarliestFilesInPartition(entry.getKey(), entry.getValue()));
@@ -1066,8 +1070,11 @@ public class HoodieTestTable implements AutoCloseable {
deleteFilesInPartition(entry.getKey(), entry.getValue());
}
Pair<HoodieCleanerPlan, HoodieCleanMetadata> cleanerMeta =
getHoodieCleanMetadata(commitTime, testTableState);
- addClean(commitTime, cleanerMeta.getKey(), cleanerMeta.getValue());
- return cleanerMeta.getValue();
+ HoodieCleanMetadata cleanMetadata = cleanerMeta.getValue();
+ cleanerMeta.getKey().setExtraMetadata(extraMetadata);
+ cleanMetadata.setExtraMetadata(extraMetadata);
+ addClean(commitTime, cleanerMeta.getKey(), cleanMetadata);
+ return cleanMetadata;
}
/**
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index 7ac82c8e7ae..492240138b5 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -165,7 +165,7 @@ public class TestClusteringUtils extends
HoodieCommonTestHarness {
HoodieInstant inflightInstant3 =
metaClient.getActiveTimeline().transitionClusterRequestedToInflight(requestedInstant3,
Option.empty());
HoodieInstant completedInstant3 =
metaClient.getActiveTimeline().transitionClusterInflightToComplete(true,
inflightInstant3, Option.empty());
metaClient.reloadActiveTimeline();
- Option<HoodieInstant> actual =
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
metaClient);
+ Option<HoodieInstant> actual =
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
metaClient, null);
assertTrue(actual.isPresent());
assertEquals(clusterTime1, actual.get().getTimestamp(), "no clean in
timeline, retain first replace commit");
@@ -187,7 +187,7 @@ public class TestClusteringUtils extends
HoodieCommonTestHarness {
metaClient.getActiveTimeline().transitionCleanInflightToComplete(true,
inflightInstant4,
TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
metaClient.reloadActiveTimeline();
- actual =
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
metaClient);
+ actual =
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
metaClient, null);
assertEquals(clusterTime3, actual.get().getTimestamp(),
"retain the first replace commit after the earliestInstantToRetain ");
}
@@ -225,7 +225,7 @@ public class TestClusteringUtils extends
HoodieCommonTestHarness {
metaClient.getActiveTimeline().transitionClusterInflightToComplete(true,
inflightInstant3, Option.empty());
metaClient.reloadActiveTimeline();
- Option<HoodieInstant> actual =
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
metaClient);
+ Option<HoodieInstant> actual =
ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(),
metaClient, HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
assertEquals(clusterTime2, actual.get().getTimestamp(),
"retain the first replace commit after the last complete clean ");
}