This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new acecf304254 [MINOR] Avoid listing files for empty tables (#11155)
acecf304254 is described below
commit acecf3042549583de31cad176fb500c55bb61700
Author: Tim Brown <[email protected]>
AuthorDate: Fri May 31 17:30:14 2024 -0500
[MINOR] Avoid listing files for empty tables (#11155)
---
.../hudi/metadata/HoodieBackedTableMetadataWriter.java | 17 ++++++++++++-----
.../hudi/table/action/commit/UpsertPartitioner.java | 18 +++++++++++-------
2 files changed, 23 insertions(+), 12 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 831c2e1882c..604399b7382 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -83,12 +83,14 @@ import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -761,7 +763,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
* @return List consisting of {@code DirectoryInfo} for each partition found.
*/
private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializationTime, Set<String> pendingDataInstants) {
- List<StoragePath> pathsToList = new LinkedList<>();
+ if (dataMetaClient.getActiveTimeline().countInstants() == 0) {
+ return Collections.emptyList();
+ }
+ Queue<StoragePath> pathsToList = new ArrayDeque<>();
pathsToList.add(new StoragePath(dataWriteConfig.getBasePath()));
List<DirectoryInfo> partitionsToBootstrap = new LinkedList<>();
@@ -773,16 +778,18 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
while (!pathsToList.isEmpty()) {
// In each round we will list a section of directories
int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
+ List<StoragePath> pathsToProcess = new ArrayList<>(numDirsToList);
+ for (int i = 0; i < numDirsToList; i++) {
+ pathsToProcess.add(pathsToList.poll());
+ }
// List all directories in parallel
engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing " + numDirsToList + " partitions from filesystem");
- List<DirectoryInfo> processedDirectories = engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
+ List<DirectoryInfo> processedDirectories = engineContext.map(pathsToProcess, path -> {
HoodieStorage storage = new HoodieHadoopStorage(path, storageConf);
String relativeDirPath = FSUtils.getRelativePartitionPath(storageBasePath, path);
return new DirectoryInfo(relativeDirPath, storage.listDirectEntries(path), initializationTime, pendingDataInstants);
}, numDirsToList);
- pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, pathsToList.size()));
-
// If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
// the results.
for (DirectoryInfo dirInfo : processedDirectories) {
@@ -815,10 +822,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
* @return List consisting of {@code DirectoryInfo} for each partition found.
*/
private List<DirectoryInfo> listAllPartitionsFromMDT(String initializationTime, Set<String> pendingDataInstants) throws IOException {
- List<DirectoryInfo> dirinfoList = new LinkedList<>();
List<String> allPartitionPaths = metadata.getAllPartitionPaths().stream()
.map(partitionPath -> dataWriteConfig.getBasePath() + StoragePath.SEPARATOR_CHAR + partitionPath).collect(Collectors.toList());
Map<String, List<StoragePathInfo>> partitionFileMap = metadata.getAllFilesInPartitions(allPartitionPaths);
+ List<DirectoryInfo> dirinfoList = new ArrayList<>(partitionFileMap.size());
for (Map.Entry<String, List<StoragePathInfo>> entry : partitionFileMap.entrySet()) {
dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(), initializationTime, pendingDataInstants));
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 09904cd290e..ea125614170 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -187,12 +187,12 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> {
List<SmallFile> smallFiles = filterSmallFilesInClustering(partitionPathToPendingClusteringFileGroupsId.getOrDefault(partitionPath, Collections.emptySet()),
- partitionSmallFilesMap.getOrDefault(partitionPath, new ArrayList<>()));
+ partitionSmallFilesMap.getOrDefault(partitionPath, Collections.emptyList()));
this.smallFiles.addAll(smallFiles);
- LOG.info("For partitionPath : " + partitionPath + " Total Small Files => " + smallFiles.size());
- LOG.debug("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
+ LOG.info("For partitionPath : {} Total Small Files => {}", partitionPath, smallFiles.size());
+ LOG.debug("For partitionPath : {} Small Files => {}", partitionPath, smallFiles);
long totalUnassignedInserts = pStat.getNumInserts();
List<Integer> bucketNumbers = new ArrayList<>();
@@ -271,13 +271,17 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> {
}
private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, HoodieEngineContext context) {
- JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
- Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
-
if (config.getParquetSmallFileLimit() <= 0) {
- return partitionSmallFilesMap;
+ return Collections.emptyMap();
+ }
+
+ if (table.getMetaClient().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
+ return Collections.emptyMap();
}
+ JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+ Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
+
if (partitionPaths != null && partitionPaths.size() > 0) {
context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions: " + config.getTableName());
JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());