danny0405 commented on code in PR #10002:
URL: https://github.com/apache/hudi/pull/10002#discussion_r1398637356


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -118,17 +120,25 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
 
       context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
 
-      Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
-          .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, earliestInstant)), cleanerParallelism)
-          .stream()
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+      Map<String, List<HoodieCleanFileInfo>> cleanOps = new HashMap<>();
+      List<String> partitionsToDelete = new ArrayList<>();
+      for (int i = 0; i < partitionsToClean.size(); i += cleanerParallelism) {
+        List<String> subPartitionsToClean = partitionsToClean.subList(i, Math.min(i + cleanerParallelism, partitionsToClean.size()));

Review Comment:
   This only works when `cleanerParallelism` is smaller than the size of 
`partitionsToClean`.
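
A standalone sketch isolates the batching arithmetic from the hunk above so it can be checked for different values of `cleanerParallelism`. `BatchSketch` and `batches` are hypothetical names, not Hudi APIs. When the batch size is at least `partitionsToClean.size()`, the loop produces a single batch holding every partition, so nothing is actually split; a value of zero would never advance `i` and loop forever, so the sketch rejects non-positive values up front.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the subList arithmetic in the diff; not a Hudi API.
class BatchSketch {

  // Splits `items` into consecutive batches of at most `batchSize` elements.
  static <T> List<List<T>> batches(List<T> items, int batchSize) {
    // A zero step never advances `i` (infinite loop) and a negative one makes subList fail.
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
    }
    List<List<T>> result = new ArrayList<>();
    for (int i = 0; i < items.size(); i += batchSize) {
      // subList returns a view; copy it so each batch is independent of `items`.
      result.add(new ArrayList<>(items.subList(i, Math.min(i + batchSize, items.size()))));
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> partitions = List.of("p1", "p2", "p3", "p4", "p5");
    System.out.println(batches(partitions, 2));  // [[p1, p2], [p3, p4], [p5]]
    System.out.println(batches(partitions, 10)); // [[p1, p2, p3, p4, p5]]
  }
}
```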



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -164,27 +164,32 @@ public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
     List<HoodieFileGroup> fileGroups = buildFileGroups(statuses, visibleCommitsAndCompactionTimeline, true);
     long fgBuildTimeTakenMs = timer.endTimer();
     timer.startTimer();
-    // Group by partition for efficient updates for both InMemory and DiskBased structures.
-    fileGroups.stream().collect(Collectors.groupingBy(HoodieFileGroup::getPartitionPath)).forEach((partition, value) -> {
-      if (!isPartitionAvailableInStore(partition)) {
-        if (bootstrapIndex.useIndex()) {
-          try (BootstrapIndex.IndexReader reader = bootstrapIndex.createReader()) {
-            LOG.info("Bootstrap Index available for partition " + partition);
-            List<BootstrapFileMapping> sourceFileMappings =
-                reader.getSourceFileMappingForPartition(partition);
-            addBootstrapBaseFileMapping(sourceFileMappings.stream()
-                .map(s -> new BootstrapBaseFileMapping(new HoodieFileGroupId(s.getPartitionPath(),
-                    s.getFileId()), s.getBootstrapFileStatus())));
+    writeLock.lock();
+    try {

Review Comment:
   Why do we still need these locks?
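
The pattern the new code introduces can be sketched in isolation, assuming `writeLock` is the write side of a `java.util.concurrent.locks.ReentrantReadWriteLock`: the write lock guards the check-then-insert on a shared per-partition map and is released in `finally`. `LockedViewSketch` is hypothetical and far simpler than `AbstractTableFileSystemView`; in particular it does not model whether callers already hold a lock, which is what decides whether the extra locking is needed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

// Hypothetical, much-simplified view: partition path -> file IDs.
// Not Hudi's AbstractTableFileSystemView.
class LockedViewSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
  private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
  private final Map<String, List<String>> partitionToFileIds = new HashMap<>();

  // Each entry is (partitionPath, fileId).
  void addFiles(List<Map.Entry<String, String>> files) {
    // Grouping only touches the local argument, so it can run outside the lock.
    Map<String, List<String>> byPartition = files.stream()
        .collect(Collectors.groupingBy(e -> e.getKey(),
            Collectors.mapping(e -> e.getValue(), Collectors.toList())));
    writeLock.lock();
    try {
      // Check and insert must be atomic with respect to other writers, like
      // `if (!isPartitionAvailableInStore(partition))` followed by an add.
      byPartition.forEach((partition, fileIds) -> {
        if (!partitionToFileIds.containsKey(partition)) {
          partitionToFileIds.put(partition, new ArrayList<>(fileIds));
        }
      });
    } finally {
      writeLock.unlock(); // always released, even if the update throws
    }
  }

  int partitionCount() {
    readLock.lock();
    try {
      return partitionToFileIds.size();
    } finally {
      readLock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    LockedViewSketch view = new LockedViewSketch();
    Thread t1 = new Thread(() -> view.addFiles(List.of(Map.entry("p1", "f1"), Map.entry("p2", "f2"))));
    Thread t2 = new Thread(() -> view.addFiles(List.of(Map.entry("p1", "f3"), Map.entry("p3", "f4"))));
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println(view.partitionCount()); // 3
  }
}
```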



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java:
##########
@@ -222,6 +222,16 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
     return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
   }
 
+  @Override
+  public boolean isPartitionAvailableInStoreForTest(String partitionPath) {
+    return execute(partitionPath, preferredView::isPartitionAvailableInStoreForTest, secondaryView::isPartitionAvailableInStoreForTest);
+  }

Review Comment:
   We do not add APIs just for testing purposes.
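
The new override delegates through `execute`, like the existing `getAllFileGroups`. A minimal sketch of such a preferred/secondary delegation helper follows, assuming it tries the preferred view first and permanently switches to the secondary view after a `RuntimeException`. `FallbackSketch` is hypothetical and single-threaded; it is not Hudi's `PriorityBasedFileSystemView`.

```java
import java.util.function.Function;

// Hypothetical preferred/secondary delegation helper; single-threaded sketch,
// not Hudi's PriorityBasedFileSystemView.
class FallbackSketch {
  // Once the preferred side fails, later calls go straight to the secondary side.
  private boolean errorOnPreferred = false;

  <T, R> R execute(T arg, Function<T, R> preferredFn, Function<T, R> secondaryFn) {
    if (errorOnPreferred) {
      return secondaryFn.apply(arg);
    }
    try {
      return preferredFn.apply(arg);
    } catch (RuntimeException e) {
      errorOnPreferred = true;
      return secondaryFn.apply(arg);
    }
  }

  boolean usingSecondary() {
    return errorOnPreferred;
  }

  public static void main(String[] args) {
    FallbackSketch view = new FallbackSketch();
    Function<String, Integer> remote = p -> { throw new IllegalStateException("remote view unavailable"); };
    Function<String, Integer> local = String::length;
    System.out.println(view.execute("2023/11/20", remote, local)); // 10
    System.out.println(view.usingSecondary());                     // true
  }
}
```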



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java:
##########
@@ -305,8 +305,13 @@ void removeFileGroupsInPendingClustering(Stream<Pair<HoodieFileGroupId, HoodieIn
    */
   @Override
   Stream<HoodieFileGroup> fetchAllStoredFileGroups(String partition) {
-    final List<HoodieFileGroup> fileGroups = new ArrayList<>(partitionToFileGroupsMap.get(partition));
-    return fileGroups.stream();
+    readLock.lock();
+    try {
+      final List<HoodieFileGroup> fileGroups = new ArrayList<>(partitionToFileGroupsMap.get(partition));
+      return fileGroups.stream();
+    } finally {
+      readLock.unlock();
+    }

Review Comment:
   Confused by all these lock changes.
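
What the read lock in this hunk does and does not cover can be shown with a small sketch, assuming a `ReentrantReadWriteLock`: the list is copied while the lock is held, and the returned stream runs over that copy after the lock is released, so later writes do not affect it. `SnapshotReadSketch` is hypothetical; it uses `getOrDefault` so an unknown partition yields an empty stream, whereas `new ArrayList<>(null)` throws a `NullPointerException`. The sketch does not establish whether Hudi's map is already safe without the lock.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Stream;

// Hypothetical sketch: partition path -> file IDs, read through a snapshot copy.
class SnapshotReadSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Map<String, List<String>> partitionToFileIds = new HashMap<>();

  void put(String partition, List<String> fileIds) {
    lock.writeLock().lock();
    try {
      partitionToFileIds.put(partition, new ArrayList<>(fileIds));
    } finally {
      lock.writeLock().unlock();
    }
  }

  Stream<String> fetchAll(String partition) {
    lock.readLock().lock();
    try {
      // The copy is made while the lock is held. The returned stream is lazy and
      // is consumed after unlock, so it must run over the copy, not the shared list.
      List<String> copy = new ArrayList<>(
          partitionToFileIds.getOrDefault(partition, Collections.emptyList()));
      return copy.stream();
    } finally {
      lock.readLock().unlock();
    }
  }

  public static void main(String[] args) {
    SnapshotReadSketch view = new SnapshotReadSketch();
    view.put("p1", List.of("f1", "f2"));
    Stream<String> snapshot = view.fetchAll("p1");
    view.put("p1", List.of("f3")); // a later write does not change the snapshot
    System.out.println(snapshot.count()); // 2
  }
}
```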



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -491,10 +491,13 @@ public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath, O
     Pair<Boolean, List<CleanFileInfo>> deletePaths;
     if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
       deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath, earliestCommitToRetain);
+      fileSystemView.cleanFileGroupsInPartitions(Collections.singletonList(partitionPath));
     } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
       deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
+      fileSystemView.cleanFileGroupsInPartitions(Collections.singletonList(partitionPath));
     } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
       deletePaths = getFilesToCleanKeepingLatestHours(partitionPath, earliestCommitToRetain);
+      fileSystemView.cleanFileGroupsInPartitions(Collections.singletonList(partitionPath));

Review Comment:
   Just move this logic to line 504 instead of duplicating it in every branch.
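
The suggested refactor can be sketched with hypothetical stand-ins: each branch only computes the delete paths, and the shared cleanup call runs once after the `if`/`else` chain. `CleanPlannerSketch`, its `Policy` enum, and the placeholder path strings are illustrative, not Hudi code, and the sketch assumes the chain ends in an `else` that throws for unknown policies, so the cleanup runs only for a recognized policy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for CleanPlanner#getDeletePaths; not Hudi code.
class CleanPlannerSketch {
  enum Policy { KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS }

  // Records the partitions passed to the cleanup step, standing in for
  // fileSystemView.cleanFileGroupsInPartitions(...).
  final List<String> cleanedPartitions = new ArrayList<>();

  List<String> getDeletePaths(String partitionPath, Policy policy) {
    List<String> deletePaths;
    // Placeholder results; each branch only computes what to delete.
    if (policy == Policy.KEEP_LATEST_COMMITS) {
      deletePaths = List.of(partitionPath + "/by-commits");
    } else if (policy == Policy.KEEP_LATEST_FILE_VERSIONS) {
      deletePaths = List.of(partitionPath + "/by-versions");
    } else if (policy == Policy.KEEP_LATEST_BY_HOURS) {
      deletePaths = List.of(partitionPath + "/by-hours");
    } else {
      throw new IllegalArgumentException("Unknown cleaning policy: " + policy);
    }
    // One call after the chain replaces the three copies inside the branches.
    cleanFileGroupsInPartitions(Collections.singletonList(partitionPath));
    return deletePaths;
  }

  private void cleanFileGroupsInPartitions(List<String> partitions) {
    cleanedPartitions.addAll(partitions);
  }

  public static void main(String[] args) {
    CleanPlannerSketch planner = new CleanPlannerSketch();
    System.out.println(planner.getDeletePaths("p1", Policy.KEEP_LATEST_BY_HOURS)); // [p1/by-hours]
    System.out.println(planner.cleanedPartitions);                                  // [p1]
  }
}
```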



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
