This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new acecf304254 [MINOR] Avoid listing files for empty tables (#11155)
acecf304254 is described below

commit acecf3042549583de31cad176fb500c55bb61700
Author: Tim Brown <[email protected]>
AuthorDate: Fri May 31 17:30:14 2024 -0500

    [MINOR] Avoid listing files for empty tables (#11155)
---
 .../hudi/metadata/HoodieBackedTableMetadataWriter.java | 17 ++++++++++++-----
 .../hudi/table/action/commit/UpsertPartitioner.java    | 18 +++++++++++-------
 2 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 831c2e1882c..604399b7382 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -83,12 +83,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -761,7 +763,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
    * @return List consisting of {@code DirectoryInfo} for each partition found.
    */
   private List<DirectoryInfo> listAllPartitionsFromFilesystem(String 
initializationTime, Set<String> pendingDataInstants) {
-    List<StoragePath> pathsToList = new LinkedList<>();
+    if (dataMetaClient.getActiveTimeline().countInstants() == 0) {
+      return Collections.emptyList();
+    }
+    Queue<StoragePath> pathsToList = new ArrayDeque<>();
     pathsToList.add(new StoragePath(dataWriteConfig.getBasePath()));
 
     List<DirectoryInfo> partitionsToBootstrap = new LinkedList<>();
@@ -773,16 +778,18 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     while (!pathsToList.isEmpty()) {
       // In each round we will list a section of directories
       int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
+      List<StoragePath> pathsToProcess = new ArrayList<>(numDirsToList);
+      for (int i = 0; i < numDirsToList; i++) {
+        pathsToProcess.add(pathsToList.poll());
+      }
       // List all directories in parallel
       engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing " + 
numDirsToList + " partitions from filesystem");
-      List<DirectoryInfo> processedDirectories = 
engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
+      List<DirectoryInfo> processedDirectories = 
engineContext.map(pathsToProcess, path -> {
         HoodieStorage storage = new HoodieHadoopStorage(path, storageConf);
         String relativeDirPath = 
FSUtils.getRelativePartitionPath(storageBasePath, path);
         return new DirectoryInfo(relativeDirPath, 
storage.listDirectEntries(path), initializationTime, pendingDataInstants);
       }, numDirsToList);
 
-      pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, 
pathsToList.size()));
-
       // If the listing reveals a directory, add it to queue. If the listing 
reveals a hoodie partition, add it to
       // the results.
       for (DirectoryInfo dirInfo : processedDirectories) {
@@ -815,10 +822,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
    * @return List consisting of {@code DirectoryInfo} for each partition found.
    */
   private List<DirectoryInfo> listAllPartitionsFromMDT(String 
initializationTime, Set<String> pendingDataInstants) throws IOException {
-    List<DirectoryInfo> dirinfoList = new LinkedList<>();
     List<String> allPartitionPaths = metadata.getAllPartitionPaths().stream()
         .map(partitionPath -> dataWriteConfig.getBasePath() + 
StoragePath.SEPARATOR_CHAR + partitionPath).collect(Collectors.toList());
     Map<String, List<StoragePathInfo>> partitionFileMap = 
metadata.getAllFilesInPartitions(allPartitionPaths);
+    List<DirectoryInfo> dirinfoList = new ArrayList<>(partitionFileMap.size());
     for (Map.Entry<String, List<StoragePathInfo>> entry : 
partitionFileMap.entrySet()) {
       dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(), 
initializationTime, pendingDataInstants));
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 09904cd290e..ea125614170 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -187,12 +187,12 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> {
 
         List<SmallFile> smallFiles =
             
filterSmallFilesInClustering(partitionPathToPendingClusteringFileGroupsId.getOrDefault(partitionPath,
 Collections.emptySet()),
-                partitionSmallFilesMap.getOrDefault(partitionPath, new 
ArrayList<>()));
+                partitionSmallFilesMap.getOrDefault(partitionPath, 
Collections.emptyList()));
 
         this.smallFiles.addAll(smallFiles);
 
-        LOG.info("For partitionPath : " + partitionPath + " Total Small Files 
=> " + smallFiles.size());
-        LOG.debug("For partitionPath : " + partitionPath + " Small Files => " 
+ smallFiles);
+        LOG.info("For partitionPath : {} Total Small Files => {}", 
partitionPath, smallFiles.size());
+        LOG.debug("For partitionPath : {} Small Files => {}", partitionPath, 
smallFiles);
 
         long totalUnassignedInserts = pStat.getNumInserts();
         List<Integer> bucketNumbers = new ArrayList<>();
@@ -271,13 +271,17 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> {
   }
 
   private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> 
partitionPaths, HoodieEngineContext context) {
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-    Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
-
     if (config.getParquetSmallFileLimit() <= 0) {
-      return partitionSmallFilesMap;
+      return Collections.emptyMap();
+    }
+
+    if 
(table.getMetaClient().getCommitsTimeline().filterCompletedInstants().countInstants()
 == 0) {
+      return Collections.emptyMap();
     }
 
+    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+    Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
+
     if (partitionPaths != null && partitionPaths.size() > 0) {
       context.setJobStatus(this.getClass().getSimpleName(), "Getting small 
files from partitions: " + config.getTableName());
       JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, 
partitionPaths.size());

Reply via email to