This is an automated email from the ASF dual-hosted git repository. yuzhaojing pushed a commit to branch release-0.12.1-rc1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d201c8420a5e1d999565fca9af04cb52638cbb72 Author: Sivabalan Narayanan <[email protected]> AuthorDate: Wed Sep 28 15:51:45 2022 -0700 [HUDI-4934] Revert batch clean files (#6813) * Revert "[HUDI-4792] Batch clean files to delete (#6580)" This reverts commit cbf9b83ca6d3dada14eea551a5bae25144ca0459. --- .../action/clean/CleanPlanActionExecutor.java | 11 +- .../hudi/table/action/clean/CleanPlanner.java | 215 ++++++++++----------- ...dieSparkCopyOnWriteTableArchiveWithReplace.java | 4 +- .../table/view/AbstractTableFileSystemView.java | 16 +- .../table/view/PriorityBasedFileSystemView.java | 5 - .../view/RemoteHoodieTableFileSystemView.java | 12 -- .../common/table/view/TableFileSystemView.java | 14 +- 7 files changed, 112 insertions(+), 165 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index bd7ec798ed..7f3b437178 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -42,7 +42,6 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -117,15 +116,9 @@ public class CleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> ext context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName()); Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context - .parallelize(partitionsToClean, cleanerParallelism) - .mapPartitions(partitionIterator -> { - List<String> partitionList = new ArrayList<>(); - partitionIterator.forEachRemaining(partitionList::add); - Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanResult = planner.getDeletePaths(partitionList); - return cleanResult.entrySet().iterator(); - }, false).collectAsList() + .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism) .stream() - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index e85793d711..64e69b1d2a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -58,7 +58,6 @@ import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.Date; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -223,10 +222,10 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser * policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a * single file (i.e run it with versionsRetained = 1) */ - private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestVersions(List<String> partitionPaths) { - LOG.info("Cleaning " + partitionPaths + ", retaining latest " + config.getCleanerFileVersionsRetained() + private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(String partitionPath) { + LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained() + " file versions. "); - Map<String, Pair<Boolean, List<CleanFileInfo>>> map = new HashMap<>(); + List<CleanFileInfo> deletePaths = new ArrayList<>(); // Collect all the datafiles savepointed by all the savepoints List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream() .flatMap(this::getSavepointedDataFiles) @@ -234,48 +233,43 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely // In other words, the file versions only apply to the active file groups. - List<Pair<String, List<HoodieFileGroup>>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList()); - for (Pair<String, List<HoodieFileGroup>> partitionFileGroupList : fileGroupsPerPartition) { - List<CleanFileInfo> deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), Option.empty())); - boolean toDeletePartition = false; - for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) { - int keepVersions = config.getCleanerFileVersionsRetained(); - // do not cleanup slice required for pending compaction - Iterator<FileSlice> fileSliceIterator = - fileGroup.getAllFileSlices() - .filter(fs -> !isFileSliceNeededForPendingCompaction(fs)) - .iterator(); - if (isFileGroupInPendingCompaction(fileGroup)) { - // We have already saved the last version of file-groups for pending compaction Id - keepVersions--; - } + deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty())); + boolean toDeletePartition = false; + List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); + for (HoodieFileGroup fileGroup : fileGroups) { + int keepVersions = config.getCleanerFileVersionsRetained(); + // do not cleanup slice required for pending compaction + Iterator<FileSlice> fileSliceIterator = + fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator(); + if (isFileGroupInPendingCompaction(fileGroup)) { + // We have already saved the last version of file-groups for pending compaction Id + keepVersions--; + } - while (fileSliceIterator.hasNext() && keepVersions > 0) { - // Skip this most recent version - fileSliceIterator.next(); - keepVersions--; - } - // Delete the remaining files - while (fileSliceIterator.hasNext()) { - FileSlice nextSlice = fileSliceIterator.next(); - Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile(); - if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) { - // do not clean up a savepoint data file - continue; - } - deletePaths.addAll(getCleanFileInfoForSlice(nextSlice)); - } + while (fileSliceIterator.hasNext() && keepVersions > 0) { + // Skip this most recent version + fileSliceIterator.next(); + keepVersions--; } - // if there are no valid file groups for the partition, mark it to be deleted - if (partitionFileGroupList.getValue().isEmpty()) { - toDeletePartition = true; + // Delete the remaining files + while (fileSliceIterator.hasNext()) { + FileSlice nextSlice = fileSliceIterator.next(); + Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile(); + if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) { + // do not clean up a savepoint data file + continue; + } + deletePaths.addAll(getCleanFileInfoForSlice(nextSlice)); } - map.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths)); } - return map; + // if there are no valid file groups for the partition, mark it to be deleted + if (fileGroups.isEmpty()) { + toDeletePartition = true; + } + return Pair.of(toDeletePartition, deletePaths); } - private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestCommits(List<String> partitionPath) { + private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath) { return getFilesToCleanKeepingLatestCommits(partitionPath, config.getCleanerCommitsRetained(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS); } @@ -296,9 +290,9 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser * @return A {@link Pair} whose left is boolean indicating whether partition itself needs to be deleted, * and right is a list of {@link CleanFileInfo} about the files in the partition that needs to be deleted. */ - private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestCommits(List<String> partitionPaths, int commitsRetained, HoodieCleaningPolicy policy) { - LOG.info("Cleaning " + partitionPaths + ", retaining latest " + commitsRetained + " commits. "); - Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanFileInfoPerPartitionMap = new HashMap<>(); + private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) { + LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. "); + List<CleanFileInfo> deletePaths = new ArrayList<>(); // Collect all the datafiles savepointed by all the savepoints List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream() @@ -310,79 +304,75 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser if (commitTimeline.countInstants() > commitsRetained) { Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain(); HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get(); + // all replaced file groups before earliestCommitToRetain are eligible to clean + deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption)); // add active files - List<Pair<String, List<HoodieFileGroup>>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList()); - for (Pair<String, List<HoodieFileGroup>> partitionFileGroupList : fileGroupsPerPartition) { - List<CleanFileInfo> deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), earliestCommitToRetainOption)); - // all replaced file groups before earliestCommitToRetain are eligible to clean - deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), earliestCommitToRetainOption)); - for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) { - List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList()); - - if (fileSliceList.isEmpty()) { + List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); + for (HoodieFileGroup fileGroup : fileGroups) { + List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList()); + + if (fileSliceList.isEmpty()) { + continue; + } + + String lastVersion = fileSliceList.get(0).getBaseInstantTime(); + String lastVersionBeforeEarliestCommitToRetain = + getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain); + + // Ensure there are more than 1 version of the file (we only clean old files from updates) + // i.e always spare the last commit. + for (FileSlice aSlice : fileSliceList) { + Option<HoodieBaseFile> aFile = aSlice.getBaseFile(); + String fileCommitTime = aSlice.getBaseInstantTime(); + if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) { + // do not clean up a savepoint data file continue; } - String lastVersion = fileSliceList.get(0).getBaseInstantTime(); - String lastVersionBeforeEarliestCommitToRetain = - getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain); - - // Ensure there are more than 1 version of the file (we only clean old files from updates) - // i.e always spare the last commit. - for (FileSlice aSlice : fileSliceList) { - Option<HoodieBaseFile> aFile = aSlice.getBaseFile(); - String fileCommitTime = aSlice.getBaseInstantTime(); - if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) { - // do not clean up a savepoint data file + if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) { + // Dont delete the latest commit and also the last commit before the earliest commit we + // are retaining + // The window of commit retain == max query run time. So a query could be running which + // still + // uses this file. + if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) { + // move on to the next file continue; } - - if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) { - // Dont delete the latest commit and also the last commit before the earliest commit we - // are retaining - // The window of commit retain == max query run time. So a query could be running which - // still - // uses this file. - if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) { - // move on to the next file - continue; - } - } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) { - // This block corresponds to KEEP_LATEST_BY_HOURS policy - // Do not delete the latest commit. - if (fileCommitTime.equals(lastVersion)) { - // move on to the next file - continue; - } + } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) { + // This block corresponds to KEEP_LATEST_BY_HOURS policy + // Do not delete the latest commit. + if (fileCommitTime.equals(lastVersion)) { + // move on to the next file + continue; } + } - // Always keep the last commit - if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline - .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) { - // this is a commit, that should be cleaned. - aFile.ifPresent(hoodieDataFile -> { - deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false)); - if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) { - deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true)); - } - }); - if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { - // If merge on read, then clean the log files for the commits as well - deletePaths.addAll( - aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) - .collect(Collectors.toList())); + // Always keep the last commit + if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline + .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) { + // this is a commit, that should be cleaned. + aFile.ifPresent(hoodieDataFile -> { + deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false)); + if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) { + deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true)); } + }); + if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { + // 1. If merge on read, then clean the log files for the commits as well; + // 2. If change log capture is enabled, clean the log files no matter the table type is mor or cow. + deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) + .collect(Collectors.toList())); } } } - // if there are no valid file groups for the partition, mark it to be deleted - if (partitionFileGroupList.getValue().isEmpty()) { - toDeletePartition = true; - } - cleanFileInfoPerPartitionMap.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths)); + } + // if there are no valid file groups for the partition, mark it to be deleted + if (fileGroups.isEmpty()) { + toDeletePartition = true; } } - return cleanFileInfoPerPartitionMap; + return Pair.of(toDeletePartition, deletePaths); } /** @@ -390,11 +380,10 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser * all the files with commit time earlier than 5 hours will be removed. Also the latest file for any file group is retained. * This policy gives much more flexibility to users for retaining data for running incremental queries as compared to * KEEP_LATEST_COMMITS cleaning policy. The default number of hours is 5. - * * @param partitionPath partition path to check * @return list of files to clean */ - private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestHours(List<String> partitionPath) { + private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestHours(String partitionPath) { return getFilesToCleanKeepingLatestCommits(partitionPath, 0, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS); } @@ -448,23 +437,21 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser /** * Returns files to be cleaned for the given partitionPath based on cleaning policy. */ - public Map<String, Pair<Boolean, List<CleanFileInfo>>> getDeletePaths(List<String> partitionPaths) { + public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath) { HoodieCleaningPolicy policy = config.getCleanerPolicy(); - Map<String, Pair<Boolean, List<CleanFileInfo>>> deletePaths; + Pair<Boolean, List<CleanFileInfo>> deletePaths; if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) { - deletePaths = getFilesToCleanKeepingLatestCommits(partitionPaths); + deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath); } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) { - deletePaths = getFilesToCleanKeepingLatestVersions(partitionPaths); + deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath); } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) { - deletePaths = getFilesToCleanKeepingLatestHours(partitionPaths); + deletePaths = getFilesToCleanKeepingLatestHours(partitionPath); } else { throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name()); } - for (String partitionPath : deletePaths.keySet()) { - LOG.info(deletePaths.get(partitionPath).getRight().size() + " patterns used to delete in partition path:" + partitionPath); - if (deletePaths.get(partitionPath).getLeft()) { - LOG.info("Partition " + partitionPath + " to be deleted"); - } + LOG.info(deletePaths.getValue().size() + " patterns used to delete in partition path:" + partitionPath); + if (deletePaths.getKey()) { + LOG.info("Partition " + partitionPath + " to be deleted"); } return deletePaths; } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java index 967e313f4e..baff4ebac8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java @@ -57,7 +57,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie HoodieWriteConfig writeConfig = getConfigBuilder(true) .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()) .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build()) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).withMaxNumDeltaCommitsBeforeCompaction(2).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build()) .build(); try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig); HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) { @@ -81,7 +81,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION); client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4); - // 2nd write batch; 4 commits for the 3rd partition; the 4th commit to trigger archiving the replace commit + // 2nd write batch; 4 commits for the 4th partition; the 4th commit to trigger archiving the replace commit for (int i = 5; i < 9; i++) { String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000); client.startCommitWithTime(instantTime); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index 89a184bf49..8cfd92d01f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -116,7 +116,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV /** * Refresh commits timeline. - * + * * @param visibleActiveTimeline Visible Active Timeline */ protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) { @@ -750,20 +750,6 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg)); } - @Override - public final Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) { - return getAllFileGroupsIncludingReplaced(partitionPaths) - .map(pair -> Pair.of(pair.getLeft(), pair.getRight().stream().filter(fg -> !isFileGroupReplaced(fg)).collect(Collectors.toList()))); - } - - private Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroupsIncludingReplaced(final List<String> partitionStrList) { - List<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>(); - for (String partitionStr : partitionStrList) { - fileGroupPerPartitionList.add(Pair.of(partitionStr, getAllFileGroupsIncludingReplaced(partitionStr).collect(Collectors.toList()))); - } - return fileGroupPerPartitionList.stream(); - } - private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String partitionStr) { try { readLock.lock(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java index 9006bd45cb..ff44c7cef0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java @@ -204,11 +204,6 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups); } - @Override - public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) { - return execute(partitionPaths, preferredView::getAllFileGroups, secondaryView::getAllFileGroups); - } - @Override public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) { return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index 5e52767fe2..bd18ba22a2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -51,11 +51,9 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -379,16 +377,6 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, } } - @Override - public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) { - ArrayList<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>(); - for (String partitionPath : partitionPaths) { - Stream<HoodieFileGroup> fileGroup = getAllFileGroups(partitionPath); - fileGroupPerPartitionList.add(Pair.of(partitionPath, fileGroup.collect(Collectors.toList()))); - } - return fileGroupPerPartitionList.stream(); - } - @Override public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) { Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java index 9c83c8f19c..c32e2cabb1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java @@ -109,18 +109,18 @@ public interface TableFileSystemView { /** * Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime. * - * @param partitionPath Partition path - * @param maxCommitTime Max Instant Time + * @param partitionPath Partition path + * @param maxCommitTime Max Instant Time * @param includeFileSlicesInPendingCompaction include file-slices that are in pending compaction */ Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime, - boolean includeFileSlicesInPendingCompaction); + boolean includeFileSlicesInPendingCompaction); /** * Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the * file-slice before and after compaction request instant is merged and returned. - * - * @param partitionPath Partition Path + * + * @param partitionPath Partition Path * @param maxInstantTime Max Instant Time * @return */ @@ -149,12 +149,10 @@ public interface TableFileSystemView { */ Stream<HoodieFileGroup> getAllFileGroups(String partitionPath); - Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths); - /** * Return Pending Compaction Operations. * - * @return Pair<Pair < InstantTime, CompactionOperation>> + * @return Pair<Pair<InstantTime,CompactionOperation>> */ Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations();
