danny0405 commented on code in PR #10002:
URL: https://github.com/apache/hudi/pull/10002#discussion_r1398637356
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -118,17 +120,25 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
- Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
- .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, earliestInstant)), cleanerParallelism)
- .stream()
- .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+ Map<String, List<HoodieCleanFileInfo>> cleanOps = new HashMap<>();
+ List<String> partitionsToDelete = new ArrayList<>();
+ for (int i = 0; i < partitionsToClean.size(); i += cleanerParallelism) {
+ List<String> subPartitionsToClean = partitionsToClean.subList(i, Math.min(i + cleanerParallelism, partitionsToClean.size()));
Review Comment:
This only works when `cleanerParallelism` is smaller than the size of
`partitionsToClean`.
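The boundary cases can be checked in isolation. Below is a minimal standalone sketch of the same `subList` batching pattern. `BatchingSketch` and `toBatches` are hypothetical names, not Hudi code, and the guard against a non-positive batch size is an assumption added so the loop always terminates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Hudi: mirrors the subList loop from the diff.
class BatchingSketch {

  // Splits items into consecutive batches of at most batchSize elements.
  static <T> List<List<T>> toBatches(List<T> items, int batchSize) {
    if (batchSize <= 0) {
      // Assumption: reject non-positive sizes, otherwise `i += batchSize` never advances.
      throw new IllegalArgumentException("batchSize must be positive, got " + batchSize);
    }
    List<List<T>> batches = new ArrayList<>();
    for (int i = 0; i < items.size(); i += batchSize) {
      // Math.min clamps the end index, so the last batch may be shorter.
      int end = Math.min(i + batchSize, items.size());
      batches.add(new ArrayList<>(items.subList(i, end)));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("p1", "p2", "p3", "p4", "p5");
    // Batch size below the list size.
    System.out.println(toBatches(partitions, 2));
    // Batch size above the list size.
    System.out.println(toBatches(partitions, 8));
  }
}
```

Running `main` with a batch size below and above the list size shows how the loop behaves at both ends, which is the case this comment is concerned with.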
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -164,27 +164,32 @@ public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
List<HoodieFileGroup> fileGroups = buildFileGroups(statuses, visibleCommitsAndCompactionTimeline, true);
long fgBuildTimeTakenMs = timer.endTimer();
timer.startTimer();
- // Group by partition for efficient updates for both InMemory and DiskBased structures.
- fileGroups.stream().collect(Collectors.groupingBy(HoodieFileGroup::getPartitionPath)).forEach((partition, value) -> {
- if (!isPartitionAvailableInStore(partition)) {
- if (bootstrapIndex.useIndex()) {
- try (BootstrapIndex.IndexReader reader = bootstrapIndex.createReader()) {
- LOG.info("Bootstrap Index available for partition " + partition);
- List<BootstrapFileMapping> sourceFileMappings =
- reader.getSourceFileMappingForPartition(partition);
- addBootstrapBaseFileMapping(sourceFileMappings.stream()
- .map(s -> new BootstrapBaseFileMapping(new HoodieFileGroupId(s.getPartitionPath(),
- s.getFileId()), s.getBootstrapFileStatus())));
+ writeLock.lock();
+ try {
Review Comment:
Why do we still need these locks?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java:
##########
@@ -222,6 +222,16 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
}
+ @Override
+ public boolean isPartitionAvailableInStoreForTest(String partitionPath) {
+ return execute(partitionPath, preferredView::isPartitionAvailableInStoreForTest, secondaryView::isPartitionAvailableInStoreForTest);
+ }
Review Comment:
We do not add APIs just for testing purposes.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java:
##########
@@ -305,8 +305,13 @@ void removeFileGroupsInPendingClustering(Stream<Pair<HoodieFileGroupId, HoodieIn
*/
@Override
Stream<HoodieFileGroup> fetchAllStoredFileGroups(String partition) {
- final List<HoodieFileGroup> fileGroups = new ArrayList<>(partitionToFileGroupsMap.get(partition));
- return fileGroups.stream();
+ readLock.lock();
+ try {
+ final List<HoodieFileGroup> fileGroups = new ArrayList<>(partitionToFileGroupsMap.get(partition));
+ return fileGroups.stream();
+ } finally {
+ readLock.unlock();
+ }
Review Comment:
Confused by all these lock changes.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -491,10 +491,13 @@ public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath, O
Pair<Boolean, List<CleanFileInfo>> deletePaths;
if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath, earliestCommitToRetain);
+ fileSystemView.cleanFileGroupsInPartitions(Collections.singletonList(partitionPath));
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
+ fileSystemView.cleanFileGroupsInPartitions(Collections.singletonList(partitionPath));
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
deletePaths = getFilesToCleanKeepingLatestHours(partitionPath, earliestCommitToRetain);
+ fileSystemView.cleanFileGroupsInPartitions(Collections.singletonList(partitionPath));
Review Comment:
Just move the `fileSystemView.cleanFileGroupsInPartitions(...)` call to line 504 instead of repeating it in every branch.
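A rough sketch of what that could look like, using simplified stand-in types rather than the real `CleanPlanner`. `HoistSketch`, `Policy`, the string return values, and the recording `cleanFileGroupsInPartitions` stub are all hypothetical, and the sketch assumes line 504 sits after the policy branches.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for the branch structure in getDeletePaths.
class HoistSketch {
  enum Policy { KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS }

  // Stub for the file system view: records the partitions it was asked to clean.
  static final List<String> cleanedPartitions = new ArrayList<>();

  static void cleanFileGroupsInPartitions(List<String> partitions) {
    cleanedPartitions.addAll(partitions);
  }

  // Returns a placeholder string in place of the real delete-path result.
  static String getDeletePaths(Policy policy, String partitionPath) {
    String deletePaths;
    if (policy == Policy.KEEP_LATEST_COMMITS) {
      deletePaths = "commits:" + partitionPath;
    } else if (policy == Policy.KEEP_LATEST_FILE_VERSIONS) {
      deletePaths = "versions:" + partitionPath;
    } else if (policy == Policy.KEEP_LATEST_BY_HOURS) {
      deletePaths = "hours:" + partitionPath;
    } else {
      throw new IllegalArgumentException("Unknown cleaning policy: " + policy);
    }
    // Single call after the branches instead of one copy per branch.
    cleanFileGroupsInPartitions(Collections.singletonList(partitionPath));
    return deletePaths;
  }
}
```

With the call hoisted, every supported policy reaches the same cleanup line exactly once, and adding a new policy branch cannot forget it.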