This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 215deedab36 [HUDI-8266] Make sure CleanPlanner is serializable (#12015)
215deedab36 is described below
commit 215deedab36bf24bba11472e8253e17aa4d93776
Author: Tim Brown <[email protected]>
AuthorDate: Fri Sep 27 05:08:20 2024 -0500
[HUDI-8266] Make sure CleanPlanner is serializable (#12015)
---
.../hudi/table/action/clean/CleanPlanner.java | 32 +++++++++++-----------
.../apache/hudi/table/action/TestCleanPlanner.java | 13 +++++----
2 files changed, 23 insertions(+), 22 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 851110ff461..c7c7bed2064 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -79,21 +79,18 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
public static final Integer CLEAN_PLAN_VERSION_2 =
CleanPlanV2MigrationHandler.VERSION;
public static final Integer LATEST_CLEAN_PLAN_VERSION = CLEAN_PLAN_VERSION_2;
- private final SyncableFileSystemView fileSystemView;
- private final HoodieTimeline commitTimeline;
+ private transient HoodieTimeline commitTimeline;
private final Map<HoodieFileGroupId, CompactionOperation>
fgIdToPendingCompactionOperations;
private final Map<HoodieFileGroupId, CompactionOperation>
fgIdToPendingLogCompactionOperations;
private final HoodieTable<T, I, K, O> hoodieTable;
private final HoodieWriteConfig config;
private transient HoodieEngineContext context;
- private List<String> savepointedTimestamps;
+ private final List<String> savepointedTimestamps;
private Option<HoodieInstant> earliestCommitToRetain = Option.empty();
public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O>
hoodieTable, HoodieWriteConfig config) {
this.context = context;
this.hoodieTable = hoodieTable;
- this.fileSystemView = hoodieTable.getHoodieView();
- this.commitTimeline = hoodieTable.getCompletedCommitsTimeline();
this.config = config;
SyncableFileSystemView fileSystemView = (SyncableFileSystemView)
hoodieTable.getSliceView();
this.fgIdToPendingCompactionOperations = fileSystemView
@@ -111,6 +108,13 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
: Collections.emptyList());
}
+ private HoodieTimeline getCommitTimeline() {
+ if (commitTimeline == null) {
+ commitTimeline = hoodieTable.getCompletedCommitsTimeline();
+ }
+ return commitTimeline;
+ }
+
/**
* @return list of savepointed timestamps in active timeline as of this
clean planning.
*/
@@ -305,7 +309,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
// In other words, the file versions only apply to the active file groups.
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles,
partitionPath, Option.empty()));
boolean toDeletePartition = false;
- List<HoodieFileGroup> fileGroups =
fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
+ List<HoodieFileGroup> fileGroups =
hoodieTable.getHoodieView().getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
int keepVersions = config.getCleanerFileVersionsRetained();
// do not cleanup slice required for pending compaction
@@ -378,12 +382,12 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
// determine if we have enough commits, to start cleaning.
boolean toDeletePartition = false;
- if (commitTimeline.countInstants() > commitsRetained) {
+ if (getCommitTimeline().countInstants() > commitsRetained) {
HoodieInstant earliestInstant = earliestCommitToRetain.get();
// all replaced file groups before earliestCommitToRetain are eligible
to clean
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles,
partitionPath, earliestCommitToRetain));
// add active files
- List<HoodieFileGroup> fileGroups =
fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
+ List<HoodieFileGroup> fileGroups =
hoodieTable.getHoodieView().getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
List<FileSlice> fileSliceList =
fileGroup.getAllFileSlices().collect(Collectors.toList());
@@ -483,9 +487,9 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String>
savepointedFiles, String partitionPath, Option<HoodieInstant>
earliestCommitToRetain) {
final Stream<HoodieFileGroup> replacedGroups;
if (earliestCommitToRetain.isPresent()) {
- replacedGroups =
fileSystemView.getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(),
partitionPath);
+ replacedGroups =
hoodieTable.getHoodieView().getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(),
partitionPath);
} else {
- replacedGroups = fileSystemView.getAllReplacedFileGroups(partitionPath);
+ replacedGroups =
hoodieTable.getHoodieView().getAllReplacedFileGroups(partitionPath);
}
return replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices)
// do not delete savepointed files (archival will make sure
corresponding replacecommit file is not deleted)
@@ -570,11 +574,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
* Returns the last completed commit timestamp before clean.
*/
public String getLastCompletedCommitTimestamp() {
- if (commitTimeline.lastInstant().isPresent()) {
- return commitTimeline.lastInstant().get().getTimestamp();
- } else {
- return "";
- }
+ return
getCommitTimeline().lastInstant().map(HoodieInstant::getTimestamp).orElse("");
}
/*
@@ -624,6 +624,6 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
}
private boolean noSubsequentReplaceCommit(String earliestCommitToRetain,
String partitionPath) {
- return
!fileSystemView.getReplacedFileGroupsAfterOrOn(earliestCommitToRetain,
partitionPath).findAny().isPresent();
+ return
!hoodieTable.getHoodieView().getReplacedFileGroupsAfterOrOn(earliestCommitToRetain,
partitionPath).findAny().isPresent();
}
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
index acbff660b83..2d8b7e8354c 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
@@ -87,15 +87,12 @@ public class TestCleanPlanner {
private final HoodieTable<?, ?, ?, ?> mockHoodieTable =
mock(HoodieTable.class);
- private SyncableFileSystemView mockFsView;
- private static String PARTITION1 = "partition1";
- private static String PARTITION2 = "partition2";
- private static String PARTITION3 = "partition3";
+ private static final String PARTITION1 = "partition1";
+ private static final String PARTITION2 = "partition2";
+ private static final String PARTITION3 = "partition3";
@BeforeEach
void setUp() {
- mockFsView = mock(SyncableFileSystemView.class);
- when(mockHoodieTable.getHoodieView()).thenReturn(mockFsView);
SyncableFileSystemView sliceView = mock(SyncableFileSystemView.class);
when(mockHoodieTable.getSliceView()).thenReturn(sliceView);
when(sliceView.getPendingCompactionOperations()).thenReturn(Stream.empty());
@@ -114,6 +111,9 @@ public class TestCleanPlanner {
void testGetDeletePaths(HoodieWriteConfig config, String earliestInstant,
List<HoodieFileGroup> allFileGroups, List<Pair<String, Option<byte[]>>>
savepoints,
List<HoodieFileGroup> replacedFileGroups,
Pair<Boolean, List<CleanFileInfo>> expected) throws IOException {
+ SyncableFileSystemView mockFsView = mock(SyncableFileSystemView.class);
+ when(mockHoodieTable.getHoodieView()).thenReturn(mockFsView);
+
// setup savepoint mocks
Set<String> savepointTimestamps =
savepoints.stream().map(Pair::getLeft).collect(Collectors.toSet());
when(mockHoodieTable.getSavepointTimestamps()).thenReturn(savepointTimestamps);
@@ -128,6 +128,7 @@ public class TestCleanPlanner {
String partitionPath = "partition1";
// setup replaced file groups mocks
if (config.getCleanerPolicy() ==
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
+ when(mockHoodieTable.getHoodieView()).thenReturn(mockFsView); //
requires extra reference when looking up latest versions
when(mockFsView.getAllReplacedFileGroups(partitionPath)).thenReturn(replacedFileGroups.stream());
} else {
when(mockFsView.getReplacedFileGroupsBefore(earliestInstant,
partitionPath)).thenReturn(replacedFileGroups.stream());