This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 77b0440  [HUDI-2634] Improved the metadata table bootstrap for very 
large tables. (#3873)
77b0440 is described below

commit 77b0440eb4fdbb01d41aa27b4fa2a8d45a20b293
Author: Prashant Wason <[email protected]>
AuthorDate: Wed Nov 10 19:37:48 2021 -0800

    [HUDI-2634] Improved the metadata table bootstrap for very large tables. 
(#3873)
    
    * [HUDI-2634] Improved the metadata table bootstrap for very large tables.
    
    Following improvements are implemented:
    1. Memory overhead reduction:
      - Existing code caches FileStatus for each file in memory.
      - Created a new class DirectoryInfo which is used to cache a directory's 
file list with parts of the FileStatus (only filename and file len). This 
reduces the memory requirements.
    
    2. Improved parallelism:
      - Existing code collects all the listing to the Driver and then creates 
HoodieRecord on the Driver.
      - This takes a long time for large tables (11 million HoodieRecords to be 
created)
      - Created a new function in SparkRDDWriteClient specifically for 
bootstrap commit. In it, the HoodieRecord creation is parallelized across 
executors so it completes fast.
    
    3. Fixed setting to limit the number of parallel listings:
      - Existing code had a bug wherein 1500 executors were hardcoded to 
perform listing. This leads to an exception due to a limit in Spark's result 
memory.
      - Corrected the use of the config.
    
    Result:
    Dataset has 1299 partitions and 12Million files.
    file listing time=1.5mins
    HoodieRecord creation time=13seconds
    deltacommit duration=2.6mins
    
    Co-authored-by: Sivabalan Narayanan <[email protected]>
---
 .../metadata/HoodieBackedTableMetadataWriter.java  | 199 ++++++++++++++-------
 .../FlinkHoodieBackedTableMetadataWriter.java      |   4 +-
 .../java/org/apache/hudi/data/HoodieJavaRDD.java   |   5 +
 .../SparkHoodieBackedTableMetadataWriter.java      |  11 +-
 .../org/apache/hudi/common/data/HoodieData.java    |   7 +
 .../org/apache/hudi/common/data/HoodieList.java    |   8 +
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  20 ++-
 .../apache/hudi/common/model/HoodieFileFormat.java |  10 ++
 8 files changed, 188 insertions(+), 76 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index bf7bd8f..71d6193 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.AbstractHoodieWriteClient;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
@@ -39,7 +40,6 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -51,7 +51,6 @@ import 
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -68,6 +67,8 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -175,7 +176,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
             
.withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
             .build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
-        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
         .withAutoCommit(true)
         .withAvroSchemaValidate(true)
         .withEmbeddedTimelineServerEnabled(false)
@@ -400,92 +401,68 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
 
     // List all partitions in the basePath of the containing dataset
     LOG.info("Initializing metadata table by using file listings in " + 
dataWriteConfig.getBasePath());
-    Map<String, List<FileStatus>> partitionToFileStatus = 
getPartitionsToFilesMapping(dataMetaClient);
-
-    // Create a HoodieCommitMetadata with writeStats for all discovered files
-    int[] stats = {0};
-    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
-
-    partitionToFileStatus.forEach((partition, statuses) -> {
-      // Filter the statuses to only include files which were created before 
or on createInstantTime
-      statuses.stream().filter(status -> {
-        String filename = status.getPath().getName();
-        return 
!HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), 
HoodieTimeline.GREATER_THAN,
-            createInstantTime);
-      }).forEach(status -> {
-        HoodieWriteStat writeStat = new HoodieWriteStat();
-        writeStat.setPath((partition.isEmpty() ? "" : partition + 
Path.SEPARATOR) + status.getPath().getName());
-        writeStat.setPartitionPath(partition);
-        writeStat.setTotalWriteBytes(status.getLen());
-        commitMetadata.addWriteStat(partition, writeStat);
-        stats[0] += 1;
-      });
-
-      // If the partition has no files then create a writeStat with no file 
path
-      if (commitMetadata.getWriteStats(partition) == null) {
-        HoodieWriteStat writeStat = new HoodieWriteStat();
-        writeStat.setPartitionPath(partition);
-        commitMetadata.addWriteStat(partition, writeStat);
-      }
-    });
+    List<DirectoryInfo> dirInfoList = listAllPartitions(dataMetaClient);
 
-    LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " 
+ stats[0] + " files to metadata");
-    update(commitMetadata, createInstantTime, false);
+    // During bootstrap, the list of files to be committed can be huge. So 
creating a HoodieCommitMetadata out of these
+    // large number of files and calling the existing 
update(HoodieCommitMetadata) function does not scale well.
+    // Hence, we have a special commit just for the bootstrap scenario.
+    bootstrapCommit(dirInfoList, createInstantTime);
     return true;
   }
 
   /**
    * Function to find hoodie partitions and list files in them in parallel.
    *
-   * @param dataMetaClient
+   * @param datasetMetaClient data set meta client instance.
    * @return Map of partition names to a list of FileStatus for all the files 
in the partition
    */
-  private Map<String, List<FileStatus>> 
getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
+  private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient 
datasetMetaClient) {
     List<Path> pathsToList = new LinkedList<>();
     pathsToList.add(new Path(dataWriteConfig.getBasePath()));
 
-    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+    List<DirectoryInfo> partitionsToBootstrap = new LinkedList<>();
     final int fileListingParallelism = 
metadataWriteConfig.getFileListingParallelism();
-    SerializableConfiguration conf = new 
SerializableConfiguration(dataMetaClient.getHadoopConf());
+    SerializableConfiguration conf = new 
SerializableConfiguration(datasetMetaClient.getHadoopConf());
     final String dirFilterRegex = 
dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
+    final String datasetBasePath = datasetMetaClient.getBasePath();
 
     while (!pathsToList.isEmpty()) {
-      int listingParallelism = Math.min(fileListingParallelism, 
pathsToList.size());
+      // In each round we will list a section of directories
+      int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
       // List all directories in parallel
-      List<Pair<Path, FileStatus[]>> dirToFileListing = 
engineContext.map(pathsToList, path -> {
+      List<DirectoryInfo> processedDirectories = 
engineContext.map(pathsToList.subList(0,  numDirsToList), path -> {
         FileSystem fs = path.getFileSystem(conf.get());
-        return Pair.of(path, fs.listStatus(path));
-      }, listingParallelism);
-      pathsToList.clear();
+        String relativeDirPath = FSUtils.getRelativePartitionPath(new 
Path(datasetBasePath), path);
+        return new DirectoryInfo(relativeDirPath, fs.listStatus(path));
+      }, numDirsToList);
+
+      pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, 
pathsToList.size()));
 
       // If the listing reveals a directory, add it to queue. If the listing 
reveals a hoodie partition, add it to
       // the results.
-      dirToFileListing.forEach(p -> {
-        if (!dirFilterRegex.isEmpty() && 
p.getLeft().getName().matches(dirFilterRegex)) {
-          LOG.info("Ignoring directory " + p.getLeft() + " which matches the 
filter regex " + dirFilterRegex);
-          return;
+      for (DirectoryInfo dirInfo : processedDirectories) {
+        if (!dirFilterRegex.isEmpty()) {
+          final String relativePath = dirInfo.getRelativePath();
+          if (!relativePath.isEmpty()) {
+            Path partitionPath = new Path(datasetBasePath, relativePath);
+            if (partitionPath.getName().matches(dirFilterRegex)) {
+              LOG.info("Ignoring directory " + partitionPath + " which matches 
the filter regex " + dirFilterRegex);
+              continue;
+            }
+          }
         }
 
-        List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
-            .filter(fs -> 
!fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
-            .collect(Collectors.toList());
-
-        if (p.getRight().length > filesInDir.size()) {
-          String partitionName = FSUtils.getRelativePartitionPath(new 
Path(dataMetaClient.getBasePath()), p.getLeft());
-          // deal with Non-partition table, we should exclude .hoodie
-          partitionToFileStatus.put(partitionName, filesInDir.stream()
-              .filter(f -> 
!f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
+        if (dirInfo.isHoodiePartition()) {
+          // Add to result
+          partitionsToBootstrap.add(dirInfo);
         } else {
           // Add sub-dirs to the queue
-          pathsToList.addAll(Arrays.stream(p.getRight())
-              .filter(fs -> fs.isDirectory() && 
!fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
-              .map(fs -> fs.getPath())
-              .collect(Collectors.toList()));
+          pathsToList.addAll(dirInfo.getSubDirectories());
         }
-      });
+      }
     }
 
-    return partitionToFileStatus;
+    return partitionsToBootstrap;
   }
 
   /**
@@ -549,7 +526,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
   private <T> void processAndCommit(String instantTime, 
ConvertMetadataFunction convertMetadataFunction, boolean 
canTriggerTableService) {
     if (enabled && metadata != null) {
       List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
-      commit(records, MetadataPartitionType.FILES.partitionPath(), 
instantTime, canTriggerTableService);
+      commit(engineContext.parallelize(records, 1), 
MetadataPartitionType.FILES.partitionPath(), instantTime, 
canTriggerTableService);
     }
   }
 
@@ -611,7 +588,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
 
       List<HoodieRecord> records = 
HoodieTableMetadataUtil.convertMetadataToRecords(metadataMetaClient.getActiveTimeline(),
 rollbackMetadata, instantTime,
           metadata.getSyncedInstantTime(), wasSynced);
-      commit(records, MetadataPartitionType.FILES.partitionPath(), 
instantTime, false);
+      commit(engineContext.parallelize(records, 1), 
MetadataPartitionType.FILES.partitionPath(), instantTime, false);
     }
   }
 
@@ -624,12 +601,12 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
 
   /**
    * Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit.
-   *  @param records The list of records to be written.
+   *  @param records The HoodieData of records to be written.
    * @param partitionName The partition to which the records are to be written.
    * @param instantTime The timestamp to use for the deltacommit.
    * @param canTriggerTableService true if table services can be scheduled and 
executed. false otherwise.
    */
-  protected abstract void commit(List<HoodieRecord> records, String 
partitionName, String instantTime, boolean canTriggerTableService);
+  protected abstract void commit(HoodieData<HoodieRecord> records, String 
partitionName, String instantTime, boolean canTriggerTableService);
 
   /**
    *  Perform a compaction on the Metadata Table.
@@ -668,4 +645,96 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     // metadata table.
     writeClient.clean(instantTime + "002");
   }
+
+  /**
+   * This is invoked to bootstrap metadata table for a dataset. Bootstrap 
Commit has special handling mechanism due to its scale compared to
+   * other regular commits.
+   *
+   */
+  protected void bootstrapCommit(List<DirectoryInfo> partitionInfoList, String 
createInstantTime) {
+    List<String> partitions = partitionInfoList.stream().map(p -> 
p.getRelativePath()).collect(Collectors.toList());
+    final int totalFiles = partitionInfoList.stream().mapToInt(p -> 
p.getTotalFiles()).sum();
+
+    // Record which saves the list of all partitions
+    HoodieRecord allPartitionRecord = 
HoodieMetadataPayload.createPartitionListRecord(partitions);
+    if (partitions.isEmpty()) {
+      // in case of boostrapping of a fresh table, there won't be any 
partitions, but we need to make a boostrap commit
+      
commit(engineContext.parallelize(Collections.singletonList(allPartitionRecord), 
1), MetadataPartitionType.FILES.partitionPath(), createInstantTime, false);
+      return;
+    }
+    HoodieData<HoodieRecord> partitionRecords = 
engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
+    if (!partitionInfoList.isEmpty()) {
+      HoodieData<HoodieRecord> fileListRecords = 
engineContext.parallelize(partitionInfoList, 
partitionInfoList.size()).map(partitionInfo -> {
+        // Record which saves files within a partition
+        return HoodieMetadataPayload.createPartitionFilesRecord(
+            partitionInfo.getRelativePath(), 
Option.of(partitionInfo.getFileNameToSizeMap()), Option.empty());
+      });
+      partitionRecords = partitionRecords.union(fileListRecords);
+    }
+
+    LOG.info("Committing " + partitions.size() + " partitions and " + 
totalFiles + " files to metadata");
+    ValidationUtils.checkState(partitionRecords.count() == (partitions.size() 
+ 1));
+    commit(partitionRecords, MetadataPartitionType.FILES.partitionPath(), 
createInstantTime, false);
+  }
+
+  /**
+   * A class which represents a directory and the files and directories inside 
it.
+   *
+   * A {@code PartitionFileInfo} object saves the name of the partition and 
various properties requires of each file
+   * required for bootstrapping the metadata table. Saving limited properties 
reduces the total memory footprint when
+   * a very large number of files are present in the dataset being 
bootstrapped.
+   */
+  static class DirectoryInfo implements Serializable {
+    // Relative path of the directory (relative to the base directory)
+    private final String relativePath;
+    // Map of filenames within this partition to their respective sizes
+    private HashMap<String, Long> filenameToSizeMap;
+    // List of directories within this partition
+    private final List<Path> subDirectories = new ArrayList<>();
+    // Is this a hoodie partition
+    private boolean isHoodiePartition = false;
+
+    public DirectoryInfo(String relativePath, FileStatus[] fileStatus) {
+      this.relativePath = relativePath;
+
+      // Pre-allocate with the maximum length possible
+      filenameToSizeMap = new HashMap<>(fileStatus.length);
+
+      for (FileStatus status : fileStatus) {
+        if (status.isDirectory()) {
+          // Ignore .hoodie directory as there cannot be any partitions inside 
it
+          if 
(!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
+            this.subDirectories.add(status.getPath());
+          }
+        } else if 
(status.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
 {
+          // Presence of partition meta file implies this is a HUDI partition
+          this.isHoodiePartition = true;
+        } else if (FSUtils.isDataFile(status.getPath())) {
+          // Regular HUDI data file (base file or log file)
+          filenameToSizeMap.put(status.getPath().getName(), status.getLen());
+        }
+      }
+    }
+
+    String getRelativePath() {
+      return relativePath;
+    }
+
+    int getTotalFiles() {
+      return filenameToSizeMap.size();
+    }
+
+    boolean isHoodiePartition() {
+      return isHoodiePartition;
+    }
+
+    List<Path> getSubDirectories() {
+      return subDirectories;
+    }
+
+    // Returns a map of filenames mapped to their lengths
+    Map<String, Long> getFileNameToSizeMap() {
+      return filenameToSizeMap;
+    }
+  }
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 96cdcae..174fc1e 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.metadata;
 import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.FileSlice;
@@ -91,8 +92,9 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
   }
 
   @Override
-  protected void commit(List<HoodieRecord> records, String partitionName, 
String instantTime, boolean canTriggerTableService) {
+  protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String 
partitionName, String instantTime, boolean canTriggerTableService) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to 
as it is not enabled");
+    List<HoodieRecord> records = (List<HoodieRecord>) hoodieDataRecords.get();
     List<HoodieRecord> recordList = prepRecords(records, partitionName, 1);
 
     try (HoodieFlinkWriteClient writeClient = new 
HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index ceaee47..d4eb259 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -131,6 +131,11 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
   }
 
   @Override
+  public HoodieData<T> union(HoodieData<T> other) {
+    return HoodieJavaRDD.of(rddData.union((JavaRDD<T>) other.get()));
+  }
+
+  @Override
   public List<T> collectAsList() {
     return rddData.collect();
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index ad6f9d9..0673ca9 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -22,6 +22,7 @@ import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.FileSlice;
@@ -39,7 +40,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
 import java.util.List;
@@ -121,9 +121,9 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     }
   }
 
-  @Override
-  protected void commit(List<HoodieRecord> records, String partitionName, 
String instantTime, boolean canTriggerTableService) {
+  protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String 
partitionName, String instantTime, boolean canTriggerTableService) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to 
as it is not enabled");
+    JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) 
hoodieDataRecords.get();
     JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
 
     try (SparkRDDWriteClient writeClient = new 
SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
@@ -166,12 +166,11 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
    *
    * The record is tagged with respective file slice's location based on its 
record key.
    */
-  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String 
partitionName, int numFileGroups) {
+  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, 
String partitionName, int numFileGroups) {
     List<FileSlice> fileSlices = 
HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient,
 partitionName);
     ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, 
String.format("Invalid number of file groups: found=%d, required=%d", 
fileSlices.size(), numFileGroups));
 
-    JavaSparkContext jsc = ((HoodieSparkEngineContext) 
engineContext).getJavaSparkContext();
-    return jsc.parallelize(records, 1).map(r -> {
+    return recordsRDD.map(r -> {
       FileSlice slice = 
fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
 numFileGroups));
       r.setCurrentLocation(new 
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
       return r;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index 7ea7e0d..093fd43 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -98,6 +98,13 @@ public abstract class HoodieData<T> implements Serializable {
   public abstract HoodieData<T> distinct();
 
   /**
+   * Unions this {@link HoodieData} with other {@link HoodieData}.
+   * @param other {@link HoodieData} of interest.
+   * @return the union of two as as instance of {@link HoodieData}.
+   */
+  public abstract HoodieData<T> union(HoodieData<T> other);
+
+  /**
    * @return collected results in {@link List<T>}.
    */
   public abstract List<T> collectAsList();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
index 6c23fdf..9441619 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
@@ -133,6 +133,14 @@ public class HoodieList<T> extends HoodieData<T> {
   }
 
   @Override
+  public HoodieData<T> union(HoodieData<T> other) {
+    List<T> unionResult = new ArrayList<>();
+    unionResult.addAll(listData);
+    unionResult.addAll(other.collectAsList());
+    return HoodieList.of(unionResult);
+  }
+
+  @Override
   public List<T> collectAsList() {
     return listData;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 8273ca7..dc4df23 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -239,7 +239,7 @@ public class FSUtils {
   /**
    * Recursively processes all files in the base-path. If excludeMetaFolder is 
set, the meta-folder and all its subdirs
    * are skipped
-   * 
+   *
    * @param fs File System
    * @param basePathStr Base-Path
    * @param consumer Callback for processing
@@ -431,18 +431,30 @@ public class FSUtils {
 
   public static String makeLogFileName(String fileId, String logFileExtension, 
String baseCommitTime, int version,
       String writeToken) {
-    String suffix =
-        (writeToken == null) ? String.format("%s_%s%s.%d", fileId, 
baseCommitTime, logFileExtension, version)
-            : String.format("%s_%s%s.%d_%s", fileId, baseCommitTime, 
logFileExtension, version, writeToken);
+    String suffix = (writeToken == null)
+        ? String.format("%s_%s%s.%d", fileId, baseCommitTime, 
logFileExtension, version)
+        : String.format("%s_%s%s.%d_%s", fileId, baseCommitTime, 
logFileExtension, version, writeToken);
     return LOG_FILE_PREFIX + suffix;
   }
 
+  public static boolean isBaseFile(Path path) {
+    String extension = getFileExtension(path.getName());
+    return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(extension);
+  }
+
   public static boolean isLogFile(Path logPath) {
     Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName());
     return matcher.find() && logPath.getName().contains(".log");
   }
 
   /**
+   * Returns true if the given path is a Base file or a Log file.
+   */
+  public static boolean isDataFile(Path path) {
+    return isBaseFile(path) || isLogFile(path);
+  }
+
+  /**
    * Get the names of all the base and log files in the given partition path.
    */
   public static FileStatus[] getAllDataFilesInPartition(FileSystem fs, Path 
partitionPath) throws IOException {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
index f7fdcd0..3263910 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
@@ -18,6 +18,11 @@
 
 package org.apache.hudi.common.model;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
  * Hoodie file format.
  */
@@ -27,6 +32,11 @@ public enum HoodieFileFormat {
   HFILE(".hfile"),
   ORC(".orc");
 
+  public static final Set<String> BASE_FILE_EXTENSIONS = 
Arrays.stream(HoodieFileFormat.values())
+      .map(HoodieFileFormat::getFileExtension)
+      .filter(x -> !x.equals(HoodieFileFormat.HOODIE_LOG.getFileExtension()))
+      .collect(Collectors.toCollection(HashSet::new));
+
   private final String extension;
 
   HoodieFileFormat(String extension) {

Reply via email to