This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/rfc-15 by this push:
     new f7f8c33  [HUDI-1469] Faster initialization of metadata table using parallelized listing. (#2343)
f7f8c33 is described below

commit f7f8c33f804b1bb2cfecf3ac18edc9eb584c679e
Author: Prashant Wason <[email protected]>
AuthorDate: Mon Dec 21 10:42:32 2020 -0800

    [HUDI-1469] Faster initialization of metadata table using parallelized listing. (#2343)
    
    * [HUDI-1469] Faster initialization of metadata table using parallelized listing which finds partitions and files in a single scan.
    * MINOR fixes
    
    Co-authored-by: Vinoth Chandar <[email protected]>
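
For context, the single-scan listing is a breadth-first walk from the table base
path: each level of directories is listed in parallel, a directory containing the
partition metafile is recorded as a partition, and other sub-directories are
queued for the next round. Below is a minimal, Spark-free sketch of that loop.
The actual implementation (getPartitionsToFilesMapping in the diff) parallelizes
each round with a Spark RDD over Hadoop FileStatus listings; the class name and
constants here are illustrative, not the patch's API.

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.AbstractMap.SimpleEntry;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ParallelPartitionScan {
      // Marker file that identifies a directory as a Hudi partition (illustrative constant).
      private static final String PARTITION_METAFILE = ".hoodie_partition_metadata";
      // Hudi's metadata folder, which is skipped during the walk.
      private static final String METAFOLDER = ".hoodie";

      public static Map<String, List<Path>> scan(Path basePath) {
        Map<String, List<Path>> partitionToFiles = new HashMap<>();
        List<Path> pathsToList = new LinkedList<>();
        pathsToList.add(basePath);

        while (!pathsToList.isEmpty()) {
          // List the current level of directories in parallel (the patch does this
          // step with jsc.parallelize(...); a parallel stream stands in for it here).
          List<SimpleEntry<Path, List<Path>>> listings = pathsToList.parallelStream()
              .map(dir -> {
                try (Stream<Path> entries = Files.list(dir)) {
                  return new SimpleEntry<>(dir, entries.collect(Collectors.toList()));
                } catch (IOException e) {
                  throw new UncheckedIOException(e);
                }
              }).collect(Collectors.toList());
          pathsToList.clear();

          for (SimpleEntry<Path, List<Path>> listing : listings) {
            boolean isPartition = listing.getValue().stream()
                .anyMatch(p -> p.getFileName().toString().equals(PARTITION_METAFILE));
            if (isPartition) {
              // A partition: record its data files, dropping the metafile itself.
              partitionToFiles.put(listing.getKey().toString(),
                  listing.getValue().stream()
                      .filter(p -> !p.getFileName().toString().equals(PARTITION_METAFILE))
                      .collect(Collectors.toList()));
            } else {
              // Not a partition: queue sub-directories for the next round.
              for (Path p : listing.getValue()) {
                if (Files.isDirectory(p) && !p.getFileName().toString().equals(METAFOLDER)) {
                  pathsToList.add(p);
                }
              }
            }
          }
        }
        return partitionToFiles;
      }
    }
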
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 12 +++
 .../metadata/HoodieBackedTableMetadataWriter.java  | 94 ++++++++++++++--------
 2 files changed, 71 insertions(+), 35 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 484880c..bf82953 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -73,6 +73,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
   public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
   public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
+  public static final String FILE_LISTING_PARALLELISM = "hoodie.file.listing.parallelism";
   public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
   public static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
   public static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
@@ -213,6 +214,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Math.max(Integer.parseInt(props.getProperty(DELETE_PARALLELISM)), 1);
   }
 
+  public int getFileListingParallelism() {
+    return Math.max(Integer.parseInt(props.getProperty(FILE_LISTING_PARALLELISM)), 1);
+  }
+
   public int getRollbackParallelism() {
     return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
   }
@@ -870,6 +875,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withFileListingParallelism(int parallelism) {
+      props.setProperty(FILE_LISTING_PARALLELISM, String.valueOf(parallelism));
+      return this;
+    }
+
     public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) {
       props.setProperty(INSERT_PARALLELISM, String.valueOf(insertShuffleParallelism));
       props.setProperty(UPSERT_PARALLELISM, String.valueOf(upsertShuffleParallelism));
@@ -1024,6 +1034,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
+      setDefaultOnCondition(props, !props.containsKey(FILE_LISTING_PARALLELISM), FILE_LISTING_PARALLELISM,
+          DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
           DEFAULT_ROLLBACK_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(ROLLBACK_USING_MARKERS), ROLLBACK_USING_MARKERS,
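
As a usage note, here is a minimal sketch of setting the new option when building
a write config. The base path and table name are illustrative; without the
explicit setter the option falls back to DEFAULT_PARALLELISM, per the builder
defaults in the hunk above.

    import org.apache.hudi.config.HoodieWriteConfig;

    // Hypothetical base path and table name; 200 caps the number of parallel
    // listing tasks via hoodie.file.listing.parallelism.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample_table")
        .forTable("sample_table")
        .withFileListingParallelism(200)
        .build();
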
diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index b560428..f89a198 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -52,6 +52,7 @@ import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -67,7 +68,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -82,8 +82,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import scala.Tuple2;
-
 import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
 import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
@@ -196,6 +194,7 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
         .withParallelism(parallelism, parallelism)
         .withDeleteParallelism(parallelism)
         .withRollbackParallelism(parallelism)
+        .withFileListingParallelism(writeConfig.getFileListingParallelism())
         .withFinalizeWriteParallelism(parallelism);
 
     if (writeConfig.isMetricsOn()) {
@@ -311,43 +310,17 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
     initTableMetadata();
 
     // List all partitions in the basePath of the containing dataset
-    FileSystem fs = datasetMetaClient.getFs();
-    FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetWriteConfig.getBasePath(),
-        datasetWriteConfig.shouldAssumeDatePartitioning());
-    List<String> partitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
-    LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");
-
-    // List all partitions in parallel and collect the files in them
-    int parallelism =  Math.min(partitions.size(), jsc.defaultParallelism()) + 1; // +1 to prevent 0 parallelism
-    JavaPairRDD<String, FileStatus[]> partitionFileListRDD = jsc.parallelize(partitions, parallelism)
-        .mapToPair(partition -> {
-          FileStatus[] statuses = fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(datasetWriteConfig.getBasePath(), partition));
-          return new Tuple2<>(partition, statuses);
-        });
-
-    // Collect the list of partitions and file lists
-    List<Tuple2<String, FileStatus[]>> partitionFileList = partitionFileListRDD.collect();
+    LOG.info("Initializing metadata table by using file listings in " + 
datasetWriteConfig.getBasePath());
+    Map<String, List<FileStatus>> partitionToFileStatus = 
getPartitionsToFilesMapping(jsc, datasetMetaClient);
 
     // Create a HoodieCommitMetadata with writeStats for all discovered files
     int[] stats = {0};
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
 
-    partitionFileList.forEach(t -> {
-      final String partition = t._1;
-      try {
-        if (!fs.exists(new Path(datasetWriteConfig.getBasePath(), partition + Path.SEPARATOR + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) {
-          return;
-        }
-      } catch (IOException e) {
-        throw new HoodieMetadataException("Failed to check partition " + partition, e);
-      }
-
+    partitionToFileStatus.forEach((partition, statuses) -> {
       // Filter the statuses to only include files which were created before or on createInstantTime
-      Arrays.stream(t._2).filter(status -> {
+      statuses.stream().filter(status -> {
         String filename = status.getPath().getName();
-        if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
-          return false;
-        }
         if (HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
             createInstantTime)) {
           return false;
@@ -370,11 +343,60 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
       }
     });
 
-    LOG.info("Committing " + partitionFileList.size() + " partitions and " + 
stats[0] + " files to metadata");
+    LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " 
+ stats[0] + " files to metadata");
     update(commitMetadata, createInstantTime);
   }
 
   /**
+   * Function to find hoodie partitions and list files in them in parallel.
+   *
+   * @param jsc {@code JavaSparkContext} used to run the directory listings in parallel
+   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset whose partitions are listed
+   * @return Map of partition names to a list of FileStatus for all the files in the partition
+   */
+  private Map<String, List<FileStatus>> getPartitionsToFilesMapping(JavaSparkContext jsc, HoodieTableMetaClient datasetMetaClient) {
+
+    List<Path> pathsToList = new LinkedList<>();
+    pathsToList.add(new Path(datasetWriteConfig.getBasePath()));
+
+    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+    final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
+    SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());
+
+    while (!pathsToList.isEmpty()) {
+      int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
+      // List all directories in parallel
+      List<Pair<Path, FileStatus[]>> dirToFileListing = jsc.parallelize(pathsToList, listingParallelism)
+            .map(path -> {
+              FileSystem fs = path.getFileSystem(conf.get());
+              return Pair.of(path, fs.listStatus(path));
+            }).collect();
+      pathsToList.clear();
+
+      // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
+      // the results.
+      dirToFileListing.forEach(p -> {
+        List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
+            .filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
+            .collect(Collectors.toList());
+
+        if (p.getRight().length > filesInDir.size()) {
+          // Is a partition. Add all data files to result.
+          partitionToFileStatus.put(p.getLeft().getName(), filesInDir);
+        } else {
+          // Add sub-dirs to the queue
+          pathsToList.addAll(Arrays.stream(p.getRight())
+              .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
+              .map(fs -> fs.getPath())
+              .collect(Collectors.toList()));
+        }
+      });
+    }
+
+    return partitionToFileStatus;
+  }
+
+  /**
    * Sync the Metadata Table from the instants created on the dataset.
    *
    * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
@@ -454,7 +476,9 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
       writeStats.forEach(hoodieWriteStat -> {
         String pathWithPartition = hoodieWriteStat.getPath();
         if (pathWithPartition == null) {
-          throw new HoodieMetadataException("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
+          // Empty partition
+          LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
+          return;
         }
 
         int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;
