This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 215deedab36 [HUDI-8266] Make sure CleanPlanner is serializable (#12015)
215deedab36 is described below

commit 215deedab36bf24bba11472e8253e17aa4d93776
Author: Tim Brown <[email protected]>
AuthorDate: Fri Sep 27 05:08:20 2024 -0500

    [HUDI-8266] Make sure CleanPlanner is serializable (#12015)
---
 .../hudi/table/action/clean/CleanPlanner.java      | 32 +++++++++++-----------
 .../apache/hudi/table/action/TestCleanPlanner.java | 13 +++++----
 2 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 851110ff461..c7c7bed2064 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -79,21 +79,18 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
   public static final Integer CLEAN_PLAN_VERSION_2 = 
CleanPlanV2MigrationHandler.VERSION;
   public static final Integer LATEST_CLEAN_PLAN_VERSION = CLEAN_PLAN_VERSION_2;
 
-  private final SyncableFileSystemView fileSystemView;
-  private final HoodieTimeline commitTimeline;
+  private transient HoodieTimeline commitTimeline;
   private final Map<HoodieFileGroupId, CompactionOperation> 
fgIdToPendingCompactionOperations;
   private final Map<HoodieFileGroupId, CompactionOperation> 
fgIdToPendingLogCompactionOperations;
   private final HoodieTable<T, I, K, O> hoodieTable;
   private final HoodieWriteConfig config;
   private transient HoodieEngineContext context;
-  private List<String> savepointedTimestamps;
+  private final List<String> savepointedTimestamps;
   private Option<HoodieInstant> earliestCommitToRetain = Option.empty();
 
   public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> 
hoodieTable, HoodieWriteConfig config) {
     this.context = context;
     this.hoodieTable = hoodieTable;
-    this.fileSystemView = hoodieTable.getHoodieView();
-    this.commitTimeline = hoodieTable.getCompletedCommitsTimeline();
     this.config = config;
     SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
hoodieTable.getSliceView();
     this.fgIdToPendingCompactionOperations = fileSystemView
@@ -111,6 +108,13 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
         : Collections.emptyList());
   }
 
+  private HoodieTimeline getCommitTimeline() {
+    if (commitTimeline == null) {
+      commitTimeline = hoodieTable.getCompletedCommitsTimeline();
+    }
+    return commitTimeline;
+  }
+
   /**
    * @return list of savepointed timestamps in active timeline as of this 
clean planning.
    */
@@ -305,7 +309,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
     // In other words, the file versions only apply to the active file groups.
     deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, 
partitionPath, Option.empty()));
     boolean toDeletePartition = false;
-    List<HoodieFileGroup> fileGroups = 
fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
+    List<HoodieFileGroup> fileGroups = 
hoodieTable.getHoodieView().getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
     for (HoodieFileGroup fileGroup : fileGroups) {
       int keepVersions = config.getCleanerFileVersionsRetained();
       // do not cleanup slice required for pending compaction
@@ -378,12 +382,12 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
 
     // determine if we have enough commits, to start cleaning.
     boolean toDeletePartition = false;
-    if (commitTimeline.countInstants() > commitsRetained) {
+    if (getCommitTimeline().countInstants() > commitsRetained) {
       HoodieInstant earliestInstant = earliestCommitToRetain.get();
       // all replaced file groups before earliestCommitToRetain are eligible 
to clean
       deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, 
partitionPath, earliestCommitToRetain));
       // add active files
-      List<HoodieFileGroup> fileGroups = 
fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
+      List<HoodieFileGroup> fileGroups = 
hoodieTable.getHoodieView().getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
       for (HoodieFileGroup fileGroup : fileGroups) {
         List<FileSlice> fileSliceList = 
fileGroup.getAllFileSlices().collect(Collectors.toList());
 
@@ -483,9 +487,9 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
   private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> 
savepointedFiles, String partitionPath, Option<HoodieInstant> 
earliestCommitToRetain) {
     final Stream<HoodieFileGroup> replacedGroups;
     if (earliestCommitToRetain.isPresent()) {
-      replacedGroups = 
fileSystemView.getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(),
 partitionPath);
+      replacedGroups = 
hoodieTable.getHoodieView().getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(),
 partitionPath);
     } else {
-      replacedGroups = fileSystemView.getAllReplacedFileGroups(partitionPath);
+      replacedGroups = 
hoodieTable.getHoodieView().getAllReplacedFileGroups(partitionPath);
     }
     return replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices)
         // do not delete savepointed files  (archival will make sure 
corresponding replacecommit file is not deleted)
@@ -570,11 +574,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
    * Returns the last completed commit timestamp before clean.
    */
   public String getLastCompletedCommitTimestamp() {
-    if (commitTimeline.lastInstant().isPresent()) {
-      return commitTimeline.lastInstant().get().getTimestamp();
-    } else {
-      return "";
-    }
+    return 
getCommitTimeline().lastInstant().map(HoodieInstant::getTimestamp).orElse("");
   }
 
   /*
@@ -624,6 +624,6 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
   }
 
   private boolean noSubsequentReplaceCommit(String earliestCommitToRetain, 
String partitionPath) {
-    return 
!fileSystemView.getReplacedFileGroupsAfterOrOn(earliestCommitToRetain, 
partitionPath).findAny().isPresent();
+    return 
!hoodieTable.getHoodieView().getReplacedFileGroupsAfterOrOn(earliestCommitToRetain,
 partitionPath).findAny().isPresent();
   }
 }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
index acbff660b83..2d8b7e8354c 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
@@ -87,15 +87,12 @@ public class TestCleanPlanner {
 
   private final HoodieTable<?, ?, ?, ?> mockHoodieTable = 
mock(HoodieTable.class);
 
-  private SyncableFileSystemView mockFsView;
-  private static String PARTITION1 = "partition1";
-  private static String PARTITION2 = "partition2";
-  private static String PARTITION3 = "partition3";
+  private static final String PARTITION1 = "partition1";
+  private static final String PARTITION2 = "partition2";
+  private static final String PARTITION3 = "partition3";
 
   @BeforeEach
   void setUp() {
-    mockFsView = mock(SyncableFileSystemView.class);
-    when(mockHoodieTable.getHoodieView()).thenReturn(mockFsView);
     SyncableFileSystemView sliceView = mock(SyncableFileSystemView.class);
     when(mockHoodieTable.getSliceView()).thenReturn(sliceView);
     
when(sliceView.getPendingCompactionOperations()).thenReturn(Stream.empty());
@@ -114,6 +111,9 @@ public class TestCleanPlanner {
   void testGetDeletePaths(HoodieWriteConfig config, String earliestInstant, 
List<HoodieFileGroup> allFileGroups, List<Pair<String, Option<byte[]>>> 
savepoints,
                           List<HoodieFileGroup> replacedFileGroups, 
Pair<Boolean, List<CleanFileInfo>> expected) throws IOException {
 
+    SyncableFileSystemView mockFsView = mock(SyncableFileSystemView.class);
+    when(mockHoodieTable.getHoodieView()).thenReturn(mockFsView);
+
     // setup savepoint mocks
     Set<String> savepointTimestamps = 
savepoints.stream().map(Pair::getLeft).collect(Collectors.toSet());
     
when(mockHoodieTable.getSavepointTimestamps()).thenReturn(savepointTimestamps);
@@ -128,6 +128,7 @@ public class TestCleanPlanner {
     String partitionPath = "partition1";
     // setup replaced file groups mocks
     if (config.getCleanerPolicy() == 
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
+      when(mockHoodieTable.getHoodieView()).thenReturn(mockFsView); // 
requires extra reference when looking up latest versions
       
when(mockFsView.getAllReplacedFileGroups(partitionPath)).thenReturn(replacedFileGroups.stream());
     } else {
       when(mockFsView.getReplacedFileGroupsBefore(earliestInstant, 
partitionPath)).thenReturn(replacedFileGroups.stream());

Reply via email to