This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f29811b1a4c [HUDI-7104] Fixing cleaner savepoint interplay to fix edge
case with incremental cleaning (#10651)
f29811b1a4c is described below
commit f29811b1a4ca9121a5124d63ded147dba7b90b93
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Feb 15 05:16:41 2024 -0800
[HUDI-7104] Fixing cleaner savepoint interplay to fix edge case with
incremental cleaning (#10651)
* Fixing incremental cleaning with savepoint
* Addressing feedback
---
.../table/action/clean/CleanActionExecutor.java | 3 +-
.../action/clean/CleanPlanActionExecutor.java | 12 +-
.../hudi/table/action/clean/CleanPlanner.java | 116 ++++++++--
.../apache/hudi/table/action/TestCleanPlanner.java | 247 ++++++++++++++++++++-
.../hudi/utils/TestMetadataConversionUtils.java | 4 +-
.../functional/TestExternalPathHandling.java | 5 +-
.../java/org/apache/hudi/table/TestCleaner.java | 7 +-
.../testutils/HoodieSparkClientTestHarness.java | 4 +-
hudi-common/src/main/avro/HoodieCleanMetadata.avsc | 11 +-
hudi-common/src/main/avro/HoodieCleanerPlan.avsc | 11 +-
.../clean/CleanPlanV1MigrationHandler.java | 3 +-
.../clean/CleanPlanV2MigrationHandler.java | 3 +-
.../org/apache/hudi/common/util/CleanerUtils.java | 5 +-
.../table/view/TestIncrementalFSViewSync.java | 2 +-
.../hudi/common/testutils/HoodieTestTable.java | 8 +-
.../hudi/common/util/TestClusteringUtils.java | 6 +-
16 files changed, 395 insertions(+), 52 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 40d91b63394..61c0eeeffb0 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -219,7 +219,8 @@ public class CleanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I, K,
HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
inflightInstant.getTimestamp(),
Option.of(timer.endTimer()),
- cleanStats
+ cleanStats,
+ cleanerPlan.getExtraMetadata()
);
if (!skipLocking) {
this.txnManager.beginTransaction(Option.of(inflightInstant),
Option.empty());
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index a70bfd256c0..723a95bb218 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -49,11 +49,11 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.MapUtils.nonEmpty;
+import static
org.apache.hudi.table.action.clean.CleanPlanner.SAVEPOINTED_TIMESTAMPS;
public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T,
I, K, O, Option<HoodieCleanerPlan>> {
private static final Logger LOG =
LoggerFactory.getLogger(CleanPlanActionExecutor.class);
-
private final Option<Map<String, String>> extraMetadata;
public CleanPlanActionExecutor(HoodieEngineContext context,
@@ -142,12 +142,20 @@ public class CleanPlanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I
.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(),
x.getState().name())).orElse(null),
planner.getLastCompletedCommitTimestamp(),
config.getCleanerPolicy().name(), Collections.emptyMap(),
- CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps,
partitionsToDelete);
+ CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps,
partitionsToDelete, prepareExtraMetadata(planner.getSavepointedTimestamps()));
} catch (IOException e) {
throw new HoodieIOException("Failed to schedule clean operation", e);
}
}
+ private Map<String, String> prepareExtraMetadata(List<String>
savepointedTimestamps) {
+ if (savepointedTimestamps.isEmpty()) {
+ return Collections.emptyMap();
+ } else {
+ return Collections.singletonMap(SAVEPOINTED_TIMESTAMPS,
savepointedTimestamps.stream().collect(Collectors.joining(",")));
+ }
+ }
+
/**
* Creates a Cleaner plan if there are files to be cleaned and stores them
in instant file.
* Cleaner Plan contains absolute file paths.
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 0dd516a88d1..19cbe0f91a7 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -41,6 +41,7 @@ import
org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
@@ -55,6 +56,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -78,6 +80,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable
{
public static final Integer CLEAN_PLAN_VERSION_1 =
CleanPlanV1MigrationHandler.VERSION;
public static final Integer CLEAN_PLAN_VERSION_2 =
CleanPlanV2MigrationHandler.VERSION;
public static final Integer LATEST_CLEAN_PLAN_VERSION = CLEAN_PLAN_VERSION_2;
+ public static final String SAVEPOINTED_TIMESTAMPS = "savepointed_timestamps";
private final SyncableFileSystemView fileSystemView;
private final HoodieTimeline commitTimeline;
@@ -86,6 +89,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable
{
private final HoodieTable<T, I, K, O> hoodieTable;
private final HoodieWriteConfig config;
private transient HoodieEngineContext context;
+ private List<String> savepointedTimestamps;
public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O>
hoodieTable, HoodieWriteConfig config) {
this.context = context;
@@ -109,25 +113,43 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
LOG.info("Load all partitions and files into file system view in
advance.");
fileSystemView.loadAllPartitions();
}
+ // collect savepointed timestamps to assist with incremental cleaning.
For non-partitioned and metadata tables, we may not need this.
+ this.savepointedTimestamps = hoodieTable.isMetadataTable() ?
Collections.EMPTY_LIST : (hoodieTable.isPartitioned() ?
hoodieTable.getSavepointTimestamps().stream().collect(Collectors.toList())
+ : Collections.EMPTY_LIST);
+ }
+
+ /**
+ * @return list of savepointed timestamps in active timeline as of this
clean planning.
+ */
+ List<String> getSavepointedTimestamps() {
+ return this.savepointedTimestamps;
}
/**
* Get the list of data file names savepointed.
*/
public Stream<String> getSavepointedDataFiles(String savepointTime) {
- if (!hoodieTable.getSavepointTimestamps().contains(savepointTime)) {
+ HoodieSavepointMetadata metadata = getSavepointMetadata(savepointTime);
+ return metadata.getPartitionMetadata().values().stream().flatMap(s ->
s.getSavepointDataFile().stream());
+ }
+
+ private Stream<String> getPartitionsFromSavepoint(String savepointTime) {
+ HoodieSavepointMetadata metadata = getSavepointMetadata(savepointTime);
+ return metadata.getPartitionMetadata().keySet().stream();
+ }
+
+ private HoodieSavepointMetadata getSavepointMetadata(String
savepointTimestamp) {
+ if (!hoodieTable.getSavepointTimestamps().contains(savepointTimestamp)) {
throw new HoodieSavepointException(
- "Could not get data files for savepoint " + savepointTime + ". No
such savepoint.");
+ "Could not get data files for savepoint " + savepointTimestamp + ".
No such savepoint.");
}
- HoodieInstant instant = new HoodieInstant(false,
HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
- HoodieSavepointMetadata metadata;
+ HoodieInstant instant = new HoodieInstant(false,
HoodieTimeline.SAVEPOINT_ACTION, savepointTimestamp);
try {
- metadata = TimelineMetadataUtils.deserializeHoodieSavepointMetadata(
+ return TimelineMetadataUtils.deserializeHoodieSavepointMetadata(
hoodieTable.getActiveTimeline().getInstantDetails(instant).get());
} catch (IOException e) {
- throw new HoodieSavepointException("Could not get savepointed data files
for savepoint " + savepointTime, e);
+ throw new HoodieSavepointException("Could not get savepointed data files
for savepoint " + savepointTimestamp, e);
}
- return metadata.getPartitionMetadata().values().stream().flatMap(s ->
s.getSavepointDataFile().stream());
}
/**
@@ -191,25 +213,71 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
LOG.info("Incremental Cleaning mode is enabled. Looking up partition-paths
that have since changed "
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ ". New Instant to retain : " + newInstantToRetain);
- return
hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream().filter(
+
+ List<String> incrementalPartitions =
hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream().filter(
instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.GREATER_THAN_OR_EQUALS,
cleanMetadata.getEarliestCommitToRetain()) &&
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
- HoodieTimeline.LESSER_THAN,
newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
- try {
- if
(HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
- HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
-
hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class);
- return
Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(),
replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
- } else {
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
- HoodieCommitMetadata.class);
- return
commitMetadata.getPartitionToWriteStats().keySet().stream();
- }
- } catch (IOException e) {
- throw new HoodieIOException(e.getMessage(), e);
- }
- }).distinct().collect(Collectors.toList());
+ HoodieTimeline.LESSER_THAN,
newInstantToRetain.get().getTimestamp()))
+
.flatMap(this::getPartitionsForInstants).distinct().collect(Collectors.toList());
+
+ // If any savepoint was removed between the previous clean and this clean
planning, let's include the partitions of interest.
+ // for metadata table and non partitioned table, we do not need this
additional processing.
+ if (hoodieTable.isMetadataTable() || !hoodieTable.isPartitioned()) {
+ return incrementalPartitions;
+ }
+
+ List<String> partitionsFromDeletedSavepoints =
getPartitionsFromDeletedSavepoint(cleanMetadata);
+ LOG.info("Including partitions part of savepointed commits which was
removed after last known clean " + partitionsFromDeletedSavepoints.toString());
+ List<String> partitionsOfInterest = new ArrayList<>(incrementalPartitions);
+ partitionsOfInterest.addAll(partitionsFromDeletedSavepoints);
+ return
partitionsOfInterest.stream().distinct().collect(Collectors.toList());
+ }
+
+ private List<String> getPartitionsFromDeletedSavepoint(HoodieCleanMetadata
cleanMetadata) {
+ List<String> savepointedTimestampsFromLastClean =
Arrays.stream(cleanMetadata.getExtraMetadata()
+ .getOrDefault(SAVEPOINTED_TIMESTAMPS,
StringUtils.EMPTY_STRING).split(","))
+ .filter(partition ->
!StringUtils.isNullOrEmpty(partition)).collect(Collectors.toList());
+ if (savepointedTimestampsFromLastClean.isEmpty()) {
+ return Collections.emptyList();
+ }
+ // check for any savepoints removed in the latest compared to the previously
saved list
+ List<String> removedSavepointedTimestamps = new
ArrayList<>(savepointedTimestampsFromLastClean);
+ removedSavepointedTimestamps.removeAll(savepointedTimestamps);
+ if (removedSavepointedTimestamps.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ // fetch list of partitions from the removed savepoints and add it to
return list
+ return removedSavepointedTimestamps.stream().flatMap(savepointCommit -> {
+ Option<HoodieInstant> instantOption =
hoodieTable.getCompletedCommitsTimeline().filter(instant ->
instant.getTimestamp().equals(savepointCommit)).firstInstant();
+ if (!instantOption.isPresent()) {
+ LOG.warn("Skipping to process a commit for which savepoint was removed
as the instant moved to archived timeline already");
+ }
+ HoodieInstant instant = instantOption.get();
+ return getPartitionsForInstants(instant);
+ }).collect(Collectors.toList());
+ }
+
+ /**
+ * Fetch partitions updated as part of a HoodieInstant.
+ * @param instant {@link HoodieInstant} of interest.
+ * @return partitions that were part of {@link HoodieInstant} given.
+ */
+ private Stream<String> getPartitionsForInstants(HoodieInstant instant) {
+ try {
+ if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
+ HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
+ hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class);
+ return
Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(),
replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
+ } else {
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
+ HoodieCommitMetadata.class);
+ return commitMetadata.getPartitionToWriteStats().keySet().stream();
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
}
/**
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
index e5a528b9382..4268cc36d47 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
@@ -19,6 +19,8 @@
package org.apache.hudi.table.action;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -26,16 +28,20 @@ import
org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -61,6 +67,8 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.util.CleanerUtils.CLEAN_METADATA_VERSION_2;
+import static
org.apache.hudi.table.action.clean.CleanPlanner.SAVEPOINTED_TIMESTAMPS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -72,6 +80,9 @@ public class TestCleanPlanner {
private final HoodieTable<?, ?, ?, ?> mockHoodieTable =
mock(HoodieTable.class);
private SyncableFileSystemView mockFsView;
+ private static String PARTITION1 = "partition1";
+ private static String PARTITION2 = "partition2";
+ private static String PARTITION3 = "partition3";
@BeforeEach
void setUp() {
@@ -93,7 +104,7 @@ public class TestCleanPlanner {
@ParameterizedTest
@MethodSource("testCases")
void testGetDeletePaths(HoodieWriteConfig config, String earliestInstant,
List<HoodieFileGroup> allFileGroups, List<Pair<String, Option<byte[]>>>
savepoints,
- List<HoodieFileGroup> replacedFileGroups,
Pair<Boolean, List<CleanFileInfo>> expected) {
+ List<HoodieFileGroup> replacedFileGroups,
Pair<Boolean, List<CleanFileInfo>> expected) throws IOException {
// setup savepoint mocks
Set<String> savepointTimestamps =
savepoints.stream().map(Pair::getLeft).collect(Collectors.toSet());
@@ -122,10 +133,48 @@ public class TestCleanPlanner {
assertEquals(expected, actual);
}
+ @ParameterizedTest
+ @MethodSource("incrCleaningPartitionsTestCases")
+ void testPartitionsForIncrCleaning(HoodieWriteConfig config, String
earliestInstant,
+ String lastCompletedTimeInLastClean,
String lastCleanInstant, String earliestInstantsInLastClean, List<String>
partitionsInLastClean,
+ Map<String, List<String>>
savepointsTrackedInLastClean, Map<String, List<String>>
activeInstantsPartitions,
+ Map<String, List<String>> savepoints,
List<String> expectedPartitions) throws IOException {
+ HoodieActiveTimeline activeTimeline = mock(HoodieActiveTimeline.class);
+ when(mockHoodieTable.getActiveTimeline()).thenReturn(activeTimeline);
+ // setup savepoint mocks
+ Set<String> savepointTimestamps =
savepoints.keySet().stream().collect(Collectors.toSet());
+
when(mockHoodieTable.getSavepointTimestamps()).thenReturn(savepointTimestamps);
+ if (!savepoints.isEmpty()) {
+ for (Map.Entry<String, List<String>> entry: savepoints.entrySet()) {
+ Pair<HoodieSavepointMetadata, Option<byte[]>>
savepointMetadataOptionPair = getSavepointMetadata(entry.getValue());
+ HoodieInstant instant = new HoodieInstant(false,
HoodieTimeline.SAVEPOINT_ACTION, entry.getKey());
+
when(activeTimeline.getInstantDetails(instant)).thenReturn(savepointMetadataOptionPair.getRight());
+ }
+ }
+
+ // prepare last Clean Metadata
+ Pair<HoodieCleanMetadata, Option<byte[]>> cleanMetadataOptionPair =
+ getCleanCommitMetadata(partitionsInLastClean, lastCleanInstant,
earliestInstantsInLastClean, lastCompletedTimeInLastClean,
savepointsTrackedInLastClean.keySet());
+ mockLastCleanCommit(mockHoodieTable, lastCleanInstant,
earliestInstantsInLastClean, activeTimeline, cleanMetadataOptionPair);
+ mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions,
savepointsTrackedInLastClean);
+
+ // Trigger clean and validate partitions to clean.
+ CleanPlanner<?, ?, ?, ?> cleanPlanner = new CleanPlanner<>(context,
mockHoodieTable, config);
+ HoodieInstant earliestCommitToRetain = new
HoodieInstant(HoodieInstant.State.COMPLETED, "COMMIT", earliestInstant);
+ List<String> partitionsToClean =
cleanPlanner.getPartitionPathsToClean(Option.of(earliestCommitToRetain));
+ Collections.sort(expectedPartitions);
+ Collections.sort(partitionsToClean);
+ assertEquals(expectedPartitions, partitionsToClean);
+ }
+
static Stream<Arguments> testCases() {
return Stream.concat(keepLatestByHoursOrCommitsArgs(),
keepLatestVersionsArgs());
}
+ static Stream<Arguments> incrCleaningPartitionsTestCases() {
+ return keepLatestByHoursOrCommitsArgsIncrCleanPartitions();
+ }
+
static Stream<Arguments> keepLatestVersionsArgs() {
HoodieWriteConfig keepLatestVersionsConfig =
HoodieWriteConfig.newBuilder().withPath("/tmp")
.withCleanConfig(HoodieCleanConfig.newBuilder()
@@ -278,6 +327,99 @@ public class TestCleanPlanner {
Collections.emptyList(),
Collections.singletonList(replacedFileGroup),
Pair.of(false, Collections.emptyList())));
+ return arguments.stream();
+ }
+
+ static Stream<Arguments> keepLatestByHoursOrCommitsArgsIncrCleanPartitions()
{
+ String earliestInstant = "20231204194919610";
+ String earliestInstantPlusTwoDays = "20231206194919610";
+ String lastCleanInstant = earliestInstantPlusTwoDays;
+ String earliestInstantMinusThreeDays = "20231201194919610";
+ String earliestInstantMinusFourDays = "20231130194919610";
+ String earliestInstantMinusFiveDays = "20231129194919610";
+ String earliestInstantMinusSixDays = "20231128194919610";
+ String earliestInstantInLastClean = earliestInstantMinusSixDays;
+ String lastCompletedInLastClean = earliestInstantMinusSixDays;
+ String earliestInstantMinusOneWeek = "20231127194919610";
+ String savepoint2 = earliestInstantMinusOneWeek;
+ String earliestInstantMinusOneMonth = "20231104194919610";
+ String savepoint3 = earliestInstantMinusOneMonth;
+
+ List<String> threePartitionsInActiveTimeline = Arrays.asList(PARTITION1,
PARTITION2, PARTITION3);
+ Map<String, List<String>> activeInstantsPartitionsMap3 = new HashMap<>();
+ activeInstantsPartitionsMap3.put(earliestInstantMinusThreeDays,
threePartitionsInActiveTimeline);
+ activeInstantsPartitionsMap3.put(earliestInstantMinusFourDays,
threePartitionsInActiveTimeline);
+ activeInstantsPartitionsMap3.put(earliestInstantMinusFiveDays,
threePartitionsInActiveTimeline);
+
+ List<String> twoPartitionsInActiveTimeline = Arrays.asList(PARTITION2,
PARTITION3);
+ Map<String, List<String>> activeInstantsPartitionsMap2 = new HashMap<>();
+ activeInstantsPartitionsMap2.put(earliestInstantMinusThreeDays,
twoPartitionsInActiveTimeline);
+ activeInstantsPartitionsMap2.put(earliestInstantMinusFourDays,
twoPartitionsInActiveTimeline);
+ activeInstantsPartitionsMap2.put(earliestInstantMinusFiveDays,
twoPartitionsInActiveTimeline);
+
+ List<Arguments> arguments = new ArrayList<>();
+
+ // no savepoints tracked in last clean and no additional savepoints. all
partitions in uncleaned instants should be expected
+
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+ earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
Collections.emptyMap(),
+ activeInstantsPartitionsMap3, Collections.emptyMap(),
threePartitionsInActiveTimeline));
+
+ // a new savepoint is added after last clean. but rest of uncleaned
touches all partitions, and so all partitions are expected
+
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+ earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
Collections.emptyMap(),
+ activeInstantsPartitionsMap3, Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)), threePartitionsInActiveTimeline));
+
+ // previous clean tracks a savepoint which exists in timeline still. only
2 partitions are touched by uncleaned instants. only 2 partitions are expected
+
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+ earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
+ Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)),
+ activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline));
+
+ // savepoint tracked in previous clean was removed (touching partition1).
latest uncleaned commits touched 2 other partitions. So, in total 3 partitions
are expected.
+
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+ earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
+ Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)),
+ activeInstantsPartitionsMap2, Collections.emptyMap(),
threePartitionsInActiveTimeline));
+
+ // previous savepoint still exists and touches partition1. uncleaned
touches only partition2 and partition3. expected partition2 and partition3.
+
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+ earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
+ Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)),
+ activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline));
+
+ // a new savepoint was added compared to previous clean. all 2 partitions
are expected since uncleaned commits touched just 2 partitions.
+ Map<String, List<String>> latestSavepoints = new HashMap<>();
+ latestSavepoints.put(savepoint2, Collections.singletonList(PARTITION1));
+ latestSavepoints.put(savepoint3, Collections.singletonList(PARTITION1));
+
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+ earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
+ Collections.singletonMap(savepoint2,
Collections.singletonList(PARTITION1)),
+ activeInstantsPartitionsMap2, latestSavepoints,
twoPartitionsInActiveTimeline));
+
+ // 2 savepoints were tracked in previous clean. one of them is removed in
latest. A partition which was part of the removed savepoint should be added in
final
+ // list of partitions to clean
+ Map<String, List<String>> previousSavepoints = new HashMap<>();
+ latestSavepoints.put(savepoint2, Collections.singletonList(PARTITION1));
+ latestSavepoints.put(savepoint3, Collections.singletonList(PARTITION2));
+
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+ earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
+ previousSavepoints, activeInstantsPartitionsMap2,
Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)),
twoPartitionsInActiveTimeline));
+
+ // 2 savepoints were tracked in previous clean. one of them is removed in
latest. But a partition part of removed savepoint is already touched by
uncleaned commits.
+ // so we expect all 3 partitions to be in final list.
+
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+ earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
+ previousSavepoints, activeInstantsPartitionsMap3,
Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)),
threePartitionsInActiveTimeline));
+
+ // unpartitioned test case. savepoint removed.
+ List<String> unPartitionsInActiveTimeline =
Arrays.asList(StringUtils.EMPTY_STRING);
+ Map<String, List<String>> activeInstantsUnPartitionsMap = new HashMap<>();
+ activeInstantsUnPartitionsMap.put(earliestInstantMinusThreeDays,
unPartitionsInActiveTimeline);
+
+
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+ earliestInstant, lastCompletedInLastClean, lastCleanInstant,
earliestInstantInLastClean, Collections.singletonList(StringUtils.EMPTY_STRING),
+ Collections.singletonMap(savepoint2,
Collections.singletonList(StringUtils.EMPTY_STRING)),
+ activeInstantsUnPartitionsMap, Collections.emptyMap(),
unPartitionsInActiveTimeline));
return arguments.stream();
}
@@ -307,9 +449,29 @@ public class TestCleanPlanner {
Arguments.of(getCleanByCommitsConfig(), earliestInstant,
allFileGroups, savepoints, replacedFileGroups, expected));
}
+ // helper to build common cases for the two policies
+ private static List<Arguments>
buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(String
earliestInstant,
+
String latestCompletedInLastClean,
+
String lastKnownCleanInstantTime,
+
String earliestInstantInLastClean,
+
List<String> partitionsInLastClean,
+
Map<String, List<String>> savepointsTrackedInLastClean,
+
Map<String, List<String>> activeInstantsToPartitionsMap,
+
Map<String, List<String>> savepoints,
+
List<String> expectedPartitions) {
+ return Arrays.asList(Arguments.of(getCleanByHoursConfig(),
earliestInstant, latestCompletedInLastClean, lastKnownCleanInstantTime,
+ earliestInstantInLastClean, partitionsInLastClean,
savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints,
expectedPartitions),
+ Arguments.of(getCleanByCommitsConfig(), earliestInstant,
latestCompletedInLastClean, lastKnownCleanInstantTime,
+ earliestInstantInLastClean, partitionsInLastClean,
savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints,
expectedPartitions));
+ }
+
private static HoodieFileGroup buildFileGroup(List<String>
baseFileCommitTimes) {
+ return buildFileGroup(baseFileCommitTimes, PARTITION1);
+ }
+
+ private static HoodieFileGroup buildFileGroup(List<String>
baseFileCommitTimes, String partition) {
String fileGroup = UUID.randomUUID() + "-0";
- HoodieFileGroupId fileGroupId = new HoodieFileGroupId("partition1",
UUID.randomUUID().toString());
+ HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partition,
UUID.randomUUID().toString());
HoodieTimeline timeline = mock(HoodieTimeline.class);
when(timeline.lastInstant()).thenReturn(Option.of(new
HoodieInstant(HoodieInstant.State.COMPLETED, "COMMIT",
baseFileCommitTimes.get(baseFileCommitTimes.size() - 1))));
HoodieFileGroup group = new HoodieFileGroup(fileGroupId, timeline);
@@ -333,4 +495,85 @@ public class TestCleanPlanner {
throw new UncheckedIOException(ex);
}
}
+
+ private static Pair<HoodieCleanMetadata, Option<byte[]>>
getCleanCommitMetadata(List<String> partitions, String instantTime, String
earliestCommitToRetain,
+
String lastCompletedTime, Set<String> savepointsToTrack) {
+ try {
+ Map<String, HoodieCleanPartitionMetadata> partitionMetadata = new
HashMap<>();
+ partitions.forEach(partition -> partitionMetadata.put(partition, new
HoodieCleanPartitionMetadata(partition,
HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
+ Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), false)));
+ Map<String, String> extraMetadata = new HashMap<>();
+ if (!savepointsToTrack.isEmpty()) {
+ extraMetadata.put(SAVEPOINTED_TIMESTAMPS,
savepointsToTrack.stream().collect(Collectors.joining(",")));
+ }
+ HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(instantTime,
100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata,
+ CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP, extraMetadata);
+ return Pair.of(cleanMetadata,
TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ }
+
+ private static Pair<HoodieSavepointMetadata, Option<byte[]>>
getSavepointMetadata(List<String> partitions) {
+ try {
+ Map<String, HoodieSavepointPartitionMetadata> partitionMetadata = new
HashMap<>();
+ partitions.forEach(partition -> partitionMetadata.put(partition, new
HoodieSavepointPartitionMetadata(partition, Collections.emptyList())));
+ HoodieSavepointMetadata savepointMetadata =
+ new HoodieSavepointMetadata("user", 1L, "comments",
partitionMetadata, 1);
+ return Pair.of(savepointMetadata,
TimelineMetadataUtils.serializeSavepointMetadata(savepointMetadata));
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ }
+
  /**
   * Stubs the table/timeline mocks so that the planner sees a completed clean at {@code timestamp}
   * whose serialized metadata is {@code cleanMetadata}, with {@code earliestCommitToRetain} still
   * present on the commits timeline (i.e. not archived).
   *
   * @param hoodieTable            mocked table whose clean timeline is stubbed
   * @param timestamp              instant time of the last completed clean
   * @param earliestCommitToRetain earliest commit the last clean retained
   * @param activeTimeline         mocked active timeline to wire the clean instant into
   * @param cleanMetadata          metadata pair whose serialized bytes back getInstantDetails
   */
  private static void mockLastCleanCommit(HoodieTable hoodieTable, String timestamp, String earliestCommitToRetain, HoodieActiveTimeline activeTimeline,
                                          Pair<HoodieCleanMetadata, Option<byte[]>> cleanMetadata)
      throws IOException {
    // Clean timeline with a single completed clean instant at `timestamp`.
    HoodieDefaultTimeline cleanTimeline = mock(HoodieDefaultTimeline.class);
    when(activeTimeline.getCleanerTimeline()).thenReturn(cleanTimeline);
    when(hoodieTable.getCleanTimeline()).thenReturn(cleanTimeline);
    HoodieDefaultTimeline completedCleanTimeline = mock(HoodieDefaultTimeline.class);
    when(cleanTimeline.filterCompletedInstants()).thenReturn(completedCleanTimeline);
    HoodieInstant latestCleanInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.CLEAN_ACTION, timestamp);
    when(completedCleanTimeline.lastInstant()).thenReturn(Option.of(latestCleanInstant));
    when(activeTimeline.isEmpty(latestCleanInstant)).thenReturn(false);
    // The serialized clean metadata is what the planner deserializes for the last clean.
    when(activeTimeline.getInstantDetails(latestCleanInstant)).thenReturn(cleanMetadata.getRight());

    // earliestCommitToRetain is still on the commits timeline (not before its start / not archived).
    HoodieDefaultTimeline commitsTimeline = mock(HoodieDefaultTimeline.class);
    when(activeTimeline.getCommitsTimeline()).thenReturn(commitsTimeline);
    when(commitsTimeline.isBeforeTimelineStarts(earliestCommitToRetain)).thenReturn(false);

    when(hoodieTable.isPartitioned()).thenReturn(true);
    when(hoodieTable.isMetadataTable()).thenReturn(false);
  }
+
+ private static void mockFewActiveInstants(HoodieTable hoodieTable,
Map<String, List<String>> activeInstantsToPartitions,
+ Map<String, List<String>>
savepointedCommitsToAdd)
+ throws IOException {
+ HoodieDefaultTimeline commitsTimeline = new HoodieDefaultTimeline();
+ List<HoodieInstant> instants = new ArrayList<>();
+ Map<String, List<String>> instantstoProcess = new HashMap<>();
+ instantstoProcess.putAll(activeInstantsToPartitions);
+ instantstoProcess.putAll(savepointedCommitsToAdd);
+ instantstoProcess.forEach((k,v) -> {
+ HoodieInstant hoodieInstant = new
HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, k);
+ instants.add(hoodieInstant);
+ Map<String, List<HoodieWriteStat>> partitionToWriteStats = new
HashMap<>();
+ v.forEach(partition -> partitionToWriteStats.put(partition,
Collections.emptyList()));
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ v.forEach(partition -> {
+ commitMetadata.getPartitionToWriteStats().put(partition,
Collections.emptyList());
+ });
+ try {
+
when(hoodieTable.getActiveTimeline().getInstantDetails(hoodieInstant)).thenReturn(TimelineMetadataUtils.serializeCommitMetadata(commitMetadata));
+ } catch (IOException e) {
+ throw new RuntimeException("Should not have failed", e);
+ }
+ });
+
+ commitsTimeline.setInstants(instants);
+
when(hoodieTable.getCompletedCommitsTimeline()).thenReturn(commitsTimeline);
+ when(hoodieTable.isPartitioned()).thenReturn(true);
+ when(hoodieTable.isMetadataTable()).thenReturn(false);
+ }
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
index 8d7380631d4..f050f91d169 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
@@ -392,7 +392,7 @@ public class TestMetadataConversionUtils extends
HoodieCommonTestHarness {
private void createCleanMetadata(String instantTime) throws IOException {
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new
HoodieActionInstant("", "", ""),
- "", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new
HashMap<>(), new ArrayList<>());
+ "", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new
HashMap<>(), new ArrayList<>(), Collections.EMPTY_MAP);
HoodieCleanStat cleanStats = new HoodieCleanStat(
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
HoodieTestUtils.DEFAULT_PARTITION_PATHS[new
Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
@@ -401,7 +401,7 @@ public class TestMetadataConversionUtils extends
HoodieCommonTestHarness {
Collections.emptyList(),
instantTime,
"");
- HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime,
Option.of(0L), Collections.singletonList(cleanStats));
+ HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime,
Option.of(0L), Collections.singletonList(cleanStats), Collections.EMPTY_MAP);
HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan,
cleanMetadata);
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
index 970ff9b0c64..039be9048a2 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
@@ -154,7 +154,8 @@ public class TestExternalPathHandling extends
HoodieClientTestBase {
HoodieCleanMetadata cleanMetadata = CleanerUtils.convertCleanMetadata(
cleanTime,
Option.empty(),
- cleanStats);
+ cleanStats,
+ Collections.EMPTY_MAP);
try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
(HoodieTableMetadataWriter) writeClient.initTable(WriteOperationType.UPSERT,
Option.of(cleanTime)).getMetadataWriter(cleanTime).get()) {
hoodieTableMetadataWriter.update(cleanMetadata, cleanTime);
metaClient.getActiveTimeline().transitionCleanInflightToComplete(true,
inflightClean,
@@ -292,6 +293,6 @@ public class TestExternalPathHandling extends
HoodieClientTestBase {
return new HoodieCleanerPlan(earliestInstantToRetain,
latestCommit,
writeConfig.getCleanerPolicy().name(), Collections.emptyMap(),
- CleanPlanner.LATEST_CLEAN_PLAN_VERSION,
filePathsToBeDeletedPerPartition, Collections.emptyList());
+ CleanPlanner.LATEST_CLEAN_PLAN_VERSION,
filePathsToBeDeletedPerPartition, Collections.emptyList(),
Collections.EMPTY_MAP);
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 1644103f050..001ea19d168 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -775,7 +775,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
instantTime,
Option.of(0L),
- Arrays.asList(cleanStat1, cleanStat2)
+ Arrays.asList(cleanStat1, cleanStat2),
+ Collections.EMPTY_MAP
);
metadata.setVersion(CleanerUtils.CLEAN_METADATA_VERSION_1);
@@ -1132,9 +1133,9 @@ public class TestCleaner extends HoodieCleanerTestBase {
// add clean instant
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new
HoodieActionInstant("", "", ""),
- "", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new
HashMap<>(), new ArrayList<>());
+ "", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new
HashMap<>(), new ArrayList<>(), Collections.emptyMap());
HoodieCleanMetadata cleanMeta = new HoodieCleanMetadata("", 0L, 0,
- "20", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new
HashMap<>());
+ "20", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new
HashMap<>(), Collections.emptyMap());
testTable.addClean("30", cleanerPlan, cleanMeta);
// add file in partition "part_2"
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
index e0a0ab69a90..25f1b2fa4c7 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
@@ -643,7 +643,7 @@ public abstract class HoodieSparkClientTestHarness extends
HoodieWriterClientTes
public HoodieInstant createCleanMetadata(String instantTime, boolean
inflightOnly, boolean isEmptyForAll, boolean isEmptyCompleted) throws
IOException {
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new
HoodieActionInstant("", "", ""), "", "",
- new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new
HashMap<>(), new ArrayList<>());
+ new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new
HashMap<>(), new ArrayList<>(), Collections.EMPTY_MAP);
if (inflightOnly) {
HoodieTestTable.of(metaClient).addInflightClean(instantTime,
cleanerPlan);
} else {
@@ -655,7 +655,7 @@ public abstract class HoodieSparkClientTestHarness extends
HoodieWriterClientTes
Collections.emptyList(),
instantTime,
"");
- HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime,
Option.of(0L), Collections.singletonList(cleanStats));
+ HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime,
Option.of(0L), Collections.singletonList(cleanStats), Collections.EMPTY_MAP);
HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan,
cleanMetadata, isEmptyForAll, isEmptyCompleted);
}
return new HoodieInstant(inflightOnly, "clean", instantTime);
diff --git a/hudi-common/src/main/avro/HoodieCleanMetadata.avsc
b/hudi-common/src/main/avro/HoodieCleanMetadata.avsc
index e51ecd0300c..c47690e982b 100644
--- a/hudi-common/src/main/avro/HoodieCleanMetadata.avsc
+++ b/hudi-common/src/main/avro/HoodieCleanMetadata.avsc
@@ -41,6 +41,15 @@
"default" : null
}],
"default" : null
- }
+ },
+ {
+ "name":"extraMetadata",
+ "type":["null", {
+ "type":"map",
+ "values":"string",
+ "default": null
+ }],
+ "default": null
+ }
]
}
diff --git a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc
b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc
index 42842c8be29..de0d9fccc1d 100644
--- a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc
+++ b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc
@@ -105,6 +105,15 @@
{ "type":"array", "items":"string"}
],
"default": null
- }
+ },
+ {
+ "name":"extraMetadata",
+ "type":["null", {
+ "type":"map",
+ "values":"string",
+ "default": null
+ }],
+ "default": null
+ }
]
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java
index 844376cbbfd..a4c4cefa2a2 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hadoop.fs.Path;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -63,6 +64,6 @@ public class CleanPlanV1MigrationHandler extends
AbstractMigratorBase<HoodieClea
.map(e -> Pair.of(e.getKey(), e.getValue().stream().map(v -> new
Path(v.getFilePath()).getName())
.collect(Collectors.toList()))).collect(Collectors.toMap(Pair::getKey,
Pair::getValue));
return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(),
plan.getLastCompletedCommitTimestamp(),
- plan.getPolicy(), filesPerPartition, VERSION, new HashMap<>(), new
ArrayList<>());
+ plan.getPolicy(), filesPerPartition, VERSION, new HashMap<>(), new
ArrayList<>(), Collections.EMPTY_MAP);
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java
index aacdd26aeda..573b65bfb21 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hadoop.fs.Path;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -57,7 +58,7 @@ public class CleanPlanV2MigrationHandler extends
AbstractMigratorBase<HoodieClea
new Path(FSUtils.getPartitionPath(metaClient.getBasePath(),
e.getKey()), v).toString(), false))
.collect(Collectors.toList()))).collect(Collectors.toMap(Pair::getKey,
Pair::getValue));
return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(),
plan.getLastCompletedCommitTimestamp(),
- plan.getPolicy(), new HashMap<>(), VERSION, filePathsPerPartition, new
ArrayList<>());
+ plan.getPolicy(), new HashMap<>(), VERSION, filePathsPerPartition, new
ArrayList<>(), Collections.emptyMap());
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
index 899bd673665..0fa758c21e1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
@@ -64,7 +64,8 @@ public class CleanerUtils {
public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime,
Option<Long>
durationInMs,
- List<HoodieCleanStat>
cleanStats) {
+ List<HoodieCleanStat>
cleanStats,
+ Map<String, String>
extraMetadatafromCleanPlan) {
Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = new
HashMap<>();
Map<String, HoodieCleanPartitionMetadata> partitionBootstrapMetadataMap =
new HashMap<>();
@@ -92,7 +93,7 @@ public class CleanerUtils {
}
return new HoodieCleanMetadata(startCleanTime, durationInMs.orElseGet(()
-> -1L), totalDeleted, earliestCommitToRetain,
- lastCompletedCommitTimestamp, partitionMetadataMap,
CLEAN_METADATA_VERSION_2, partitionBootstrapMetadataMap);
+ lastCompletedCommitTimestamp, partitionMetadataMap,
CLEAN_METADATA_VERSION_2, partitionBootstrapMetadataMap,
extraMetadatafromCleanPlan);
}
/**
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
index 860340d5d6c..852f916c1a4 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
@@ -631,7 +631,7 @@ public class TestIncrementalFSViewSync extends
HoodieCommonTestHarness {
HoodieInstant cleanInflightInstant = new HoodieInstant(true,
HoodieTimeline.CLEAN_ACTION, cleanInstant);
metaClient.getActiveTimeline().createNewInstant(cleanInflightInstant);
- HoodieCleanMetadata cleanMetadata =
CleanerUtils.convertCleanMetadata(cleanInstant, Option.empty(), cleanStats);
+ HoodieCleanMetadata cleanMetadata =
CleanerUtils.convertCleanMetadata(cleanInstant, Option.empty(), cleanStats,
Collections.EMPTY_MAP);
metaClient.getActiveTimeline().saveAsComplete(cleanInflightInstant,
TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 4ff3b2b7e46..ca13ff79c52 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -348,7 +348,7 @@ public class HoodieTestTable {
public HoodieTestTable addClean(String instantTime) throws IOException {
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new
HoodieActionInstant(EMPTY_STRING, EMPTY_STRING, EMPTY_STRING),
- EMPTY_STRING, EMPTY_STRING, new HashMap<>(),
CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
+ EMPTY_STRING, EMPTY_STRING, new HashMap<>(),
CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(),
Collections.EMPTY_MAP);
HoodieCleanStat cleanStats = new HoodieCleanStat(
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
HoodieTestUtils.DEFAULT_PARTITION_PATHS[RANDOM.nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
@@ -357,19 +357,19 @@ public class HoodieTestTable {
Collections.emptyList(),
instantTime,
"");
- HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime,
Option.of(0L), Collections.singletonList(cleanStats));
+ HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime,
Option.of(0L), Collections.singletonList(cleanStats), Collections.EMPTY_MAP);
return HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan,
cleanMetadata);
}
public Pair<HoodieCleanerPlan, HoodieCleanMetadata>
getHoodieCleanMetadata(String commitTime, HoodieTestTableState testTableState) {
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new
HoodieActionInstant(commitTime, CLEAN_ACTION, EMPTY_STRING),
- EMPTY_STRING, EMPTY_STRING, new HashMap<>(),
CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
+ EMPTY_STRING, EMPTY_STRING, new HashMap<>(),
CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(),
Collections.EMPTY_MAP);
List<HoodieCleanStat> cleanStats = new ArrayList<>();
for (Map.Entry<String, List<String>> entry :
testTableState.getPartitionToFileIdMapForCleaner(commitTime).entrySet()) {
cleanStats.add(new
HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
entry.getKey(), entry.getValue(), entry.getValue(),
Collections.emptyList(), commitTime, ""));
}
- return Pair.of(cleanerPlan, convertCleanMetadata(commitTime,
Option.of(0L), cleanStats));
+ return Pair.of(cleanerPlan, convertCleanMetadata(commitTime,
Option.of(0L), cleanStats, Collections.EMPTY_MAP));
}
public HoodieTestTable addRequestedRollback(String instantTime,
HoodieRollbackPlan plan) throws IOException {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index 6ee601e3deb..d3375fe5e8a 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -181,7 +181,7 @@ public class TestClusteringUtils extends
HoodieCommonTestHarness {
metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant4,
TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1));
HoodieInstant inflightInstant4 =
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4,
Option.empty());
HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(cleanTime1,
1L, 1,
- completedInstant3.getTimestamp(), "", Collections.emptyMap(), 0,
Collections.emptyMap());
+ completedInstant3.getTimestamp(), "", Collections.emptyMap(), 0,
Collections.emptyMap(), Collections.emptyMap());
metaClient.getActiveTimeline().transitionCleanInflightToComplete(true,
inflightInstant4,
TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
metaClient.reloadActiveTimeline();
@@ -205,11 +205,11 @@ public class TestClusteringUtils extends
HoodieCommonTestHarness {
HoodieInstant requestedInstant2 = new
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION,
cleanTime1);
HoodieCleanerPlan cleanerPlan1 = new HoodieCleanerPlan(null, clusterTime1,
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name(),
Collections.emptyMap(),
- CleanPlanV2MigrationHandler.VERSION, Collections.emptyMap(),
Collections.emptyList());
+ CleanPlanV2MigrationHandler.VERSION, Collections.emptyMap(),
Collections.emptyList(), Collections.EMPTY_MAP);
metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant2,
TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1));
HoodieInstant inflightInstant2 =
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant2,
Option.empty());
HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(cleanTime1,
1L, 1,
- "", "", Collections.emptyMap(), 0, Collections.emptyMap());
+ "", "", Collections.emptyMap(), 0, Collections.emptyMap(),
Collections.emptyMap());
metaClient.getActiveTimeline().transitionCleanInflightToComplete(true,
inflightInstant2,
TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
metaClient.reloadActiveTimeline();