nsivabalan commented on code in PR #6580:
URL: https://github.com/apache/hudi/pull/6580#discussion_r972428841


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -110,9 +112,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext 
context) {
       context.setJobStatus(this.getClass().getSimpleName(), "Generating list 
of file slices to be cleaned: " + config.getTableName());
 
       Map<String, Pair<Boolean, List<CleanFileInfo>>> 
cleanOpsWithPartitionMeta = context
-          .map(partitionsToClean, partitionPathToClean -> 
Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), 
cleanerParallelism)
+          .parallelize(partitionsToClean, cleanerParallelism)
+          .mapPartitions((Iterator<String> it) -> {
+            List<String> list = new ArrayList<>();

Review Comment:
   minor: `list` -> `partitionList`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -110,9 +112,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext 
context) {
       context.setJobStatus(this.getClass().getSimpleName(), "Generating list 
of file slices to be cleaned: " + config.getTableName());
 
       Map<String, Pair<Boolean, List<CleanFileInfo>>> 
cleanOpsWithPartitionMeta = context
-          .map(partitionsToClean, partitionPathToClean -> 
Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), 
cleanerParallelism)
+          .parallelize(partitionsToClean, cleanerParallelism)
+          .mapPartitions((Iterator<String> it) -> {
+            List<String> list = new ArrayList<>();
+            it.forEachRemaining(list::add);
+            Map<String, Pair<Boolean, List<CleanFileInfo>>> res = 
planner.getDeletePaths(list);

Review Comment:
   minor: `res` -> `cleanResult`
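
For illustration, the lambda body could read as follows with both suggested names (`partitionList`, `cleanResult`) applied. This is a minimal sketch using only JDK types: the `getDeletePaths` stand-in, the `planForPartitions` helper, and the use of `List<String>` in place of `Pair<Boolean, List<CleanFileInfo>>` are assumptions made to keep the example self-contained, not Hudi code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class MapPartitionsNamingSketch {

  // Hypothetical stand-in for planner.getDeletePaths(List<String>):
  // returns one dummy path per partition.
  static Map<String, List<String>> getDeletePaths(List<String> partitionList) {
    Map<String, List<String>> deletePaths = new LinkedHashMap<>();
    for (String partition : partitionList) {
      deletePaths.put(partition, Collections.singletonList(partition + "/old-file"));
    }
    return deletePaths;
  }

  // Mirrors the body of the mapPartitions lambda with the suggested names.
  static Iterator<Map.Entry<String, List<String>>> planForPartitions(Iterator<String> it) {
    List<String> partitionList = new ArrayList<>();
    it.forEachRemaining(partitionList::add);
    Map<String, List<String>> cleanResult = getDeletePaths(partitionList);
    return cleanResult.entrySet().iterator();
  }

  public static void main(String[] args) {
    Iterator<Map.Entry<String, List<String>>> out =
        planForPartitions(Arrays.asList("2022/09/01", "2022/09/02").iterator());
    out.forEachRemaining(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
  }
}
```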



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -735,6 +735,34 @@ public final Stream<HoodieFileGroup> 
getAllFileGroups(String partitionStr) {
     return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> 
!isFileGroupReplaced(fg));
   }
 
+  @Override
+  public final Stream<Pair<String, List<HoodieFileGroup>>> 
getAllFileGroups(List<String> partitionStr) {
+    return getAllFileGroupsIncludingReplaced(partitionStr)
+        .map(pair -> Pair.of(pair.getLeft(), 
pair.getRight().stream().filter(fg -> 
!isFileGroupReplaced(fg)).collect(Collectors.toList())));
+  }
+
+  private Stream<Pair<String, List<HoodieFileGroup>>> 
getAllFileGroupsIncludingReplaced(final List<String> partitionStrList) {
+    try {

Review Comment:
   Shouldn't we call the existing method here?
   ```
   getAllFileGroupsIncludingReplaced(final String partitionStr)
   ```
   We could then union the outputs for multiple partition paths.
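
A rough sketch of that suggestion, assuming the single-partition lookup can simply be called once per path. Everything here is a stand-in: `String` replaces `HoodieFileGroup`, a `Map` replaces the `Stream<Pair<...>>` return type, and the dummy file group names are invented. Whether per-partition calls are acceptable for the PR's goals is for the author to judge.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FileGroupLookupSketch {

  // Hypothetical stand-in for the existing single-partition method.
  static Stream<String> getAllFileGroupsIncludingReplaced(String partitionStr) {
    return Stream.of(partitionStr + "/fg-1", partitionStr + "/fg-2");
  }

  // Multi-partition overload that only delegates to the method above and
  // combines the per-partition results, keyed by partition path.
  static Map<String, List<String>> getAllFileGroupsIncludingReplaced(List<String> partitionStrList) {
    Map<String, List<String>> fileGroupsByPartition = new LinkedHashMap<>();
    for (String partitionStr : partitionStrList) {
      fileGroupsByPartition.put(
          partitionStr,
          getAllFileGroupsIncludingReplaced(partitionStr).collect(Collectors.toList()));
    }
    return fileGroupsByPartition;
  }

  public static void main(String[] args) {
    System.out.println(getAllFileGroupsIncludingReplaced(Arrays.asList("2022/09/01", "2022/09/02")));
  }
}
```

With this shape, a list-based `fetchAllStoredFileGroups` would not be needed in the sketch, since all storage access goes through the single-partition path.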



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -735,6 +735,34 @@ public final Stream<HoodieFileGroup> 
getAllFileGroups(String partitionStr) {
     return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> 
!isFileGroupReplaced(fg));
   }
 
+  @Override
+  public final Stream<Pair<String, List<HoodieFileGroup>>> 
getAllFileGroups(List<String> partitionStr) {
+    return getAllFileGroupsIncludingReplaced(partitionStr)

Review Comment:
   Same here. Let's see if we can reuse methods and avoid code duplication.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -110,9 +112,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext 
context) {
       context.setJobStatus(this.getClass().getSimpleName(), "Generating list 
of file slices to be cleaned: " + config.getTableName());
 
       Map<String, Pair<Boolean, List<CleanFileInfo>>> 
cleanOpsWithPartitionMeta = context
-          .map(partitionsToClean, partitionPathToClean -> 
Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), 
cleanerParallelism)
+          .parallelize(partitionsToClean, cleanerParallelism)
+          .mapPartitions((Iterator<String> it) -> {
+            List<String> list = new ArrayList<>();
+            it.forEachRemaining(list::add);
+            Map<String, Pair<Boolean, List<CleanFileInfo>>> res = 
planner.getDeletePaths(list);
+            return res.entrySet().iterator();
+          }, false).collectAsList()
           .stream()
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+          .collect(Collectors.toMap(it -> it.getKey(), it -> it.getValue()));

Review Comment:
   Why this change? We can leave it as `Pair::getKey` and `Pair::getValue`.
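
If the stream elements are now plain `Map.Entry` values (as `res.entrySet().iterator()` in the diff suggests, though that is an assumption about the resulting types), method references can still replace the lambdas by referring to `Map.Entry` instead. A minimal JDK-only sketch, with `Integer` values standing in for the real `Pair<Boolean, List<CleanFileInfo>>`:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class ToMapMethodRefSketch {

  // Re-collects the entries of a map using method references rather than
  // lambdas such as it -> it.getKey().
  static Map<String, Integer> recollect(Map<String, Integer> source) {
    return source.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  public static void main(String[] args) {
    Map<String, Integer> source = new LinkedHashMap<>();
    source.put("2022/09/01", 3);
    source.put("2022/09/02", 5);
    System.out.println(recollect(source).equals(source));
  }
}
```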



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -233,43 +235,47 @@ private Pair<Boolean, List<CleanFileInfo>> 
getFilesToCleanKeepingLatestVersions(
 
     // In this scenario, we will assume that once replaced a file group 
automatically becomes eligible for cleaning completely
     // In other words, the file versions only apply to the active file groups.
-    deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, 
partitionPath, Option.empty()));
-    boolean toDeletePartition = false;
-    List<HoodieFileGroup> fileGroups = 
fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-    for (HoodieFileGroup fileGroup : fileGroups) {
-      int keepVersions = config.getCleanerFileVersionsRetained();
-      // do not cleanup slice required for pending compaction
-      Iterator<FileSlice> fileSliceIterator =
-          fileGroup.getAllFileSlices().filter(fs -> 
!isFileSliceNeededForPendingCompaction(fs)).iterator();
-      if (isFileGroupInPendingCompaction(fileGroup)) {
-        // We have already saved the last version of file-groups for pending 
compaction Id
-        keepVersions--;
-      }
+    List<Pair<String, List<HoodieFileGroup>>> fileGroups = 
fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList());
+    for (Pair<String, List<HoodieFileGroup>> pairFileGroup : fileGroups) {
+
+      deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, 
pairFileGroup.getLeft(), Option.empty()));

Review Comment:
   I guess this is the actual change in this class, right? That is, moving 
`getReplacedFilesEligibleToClean()` from outside the for loop to inside it. 
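
To make the described restructuring concrete: once the method receives a list of partitions, the replaced-file lookup has to run once per partition inside the loop. The sketch below shows only that shape. The helper names and `String` paths are hypothetical stand-ins, and the version-retention logic from the PR is omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PerPartitionReplacedFilesSketch {

  // Hypothetical stand-in for getReplacedFilesEligibleToClean(...) on one partition.
  static List<String> replacedFilesEligibleToClean(String partitionPath) {
    return Collections.singletonList(partitionPath + "/replaced-file");
  }

  // Collects delete paths per partition; the replaced-file lookup sits
  // inside the loop because it depends on the current partition.
  static Map<String, List<String>> deletePathsFor(List<String> partitionPaths) {
    Map<String, List<String>> deletePathsByPartition = new LinkedHashMap<>();
    for (String partitionPath : partitionPaths) {
      List<String> deletePaths = new ArrayList<>(replacedFilesEligibleToClean(partitionPath));
      // Paths chosen by the version-retention rules would be appended here (omitted).
      deletePathsByPartition.put(partitionPath, deletePaths);
    }
    return deletePathsByPartition;
  }

  public static void main(String[] args) {
    System.out.println(deletePathsFor(Arrays.asList("2022/09/01", "2022/09/02")));
  }
}
```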



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -916,6 +944,8 @@ protected abstract Option<Pair<String, 
CompactionOperation>> getPendingCompactio
    */
   abstract Stream<HoodieFileGroup> fetchAllStoredFileGroups(String 
partitionPath);
 
+  abstract Stream<Pair<String, List<HoodieFileGroup>>> 
fetchAllStoredFileGroups(List<String> partitionPath);

Review Comment:
   We can probably avoid some of these additional methods if the above 
suggestion is followed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
