This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/rfc-15 by this push:
new f7f8c33 [HUDI-1469] Faster initialization of metadata table using
parallelized listing. (#2343)
f7f8c33 is described below
commit f7f8c33f804b1bb2cfecf3ac18edc9eb584c679e
Author: Prashant Wason <[email protected]>
AuthorDate: Mon Dec 21 10:42:32 2020 -0800
[HUDI-1469] Faster initialization of metadata table using parallelized
listing. (#2343)
* [HUDI-1469] Faster initialization of metadata table using parallelized
listing which finds partitions and files in a single scan.
* MINOR fixes
Co-authored-by: Vinoth Chandar <[email protected]>
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 12 +++
.../metadata/HoodieBackedTableMetadataWriter.java | 94 ++++++++++++++--------
2 files changed, 71 insertions(+), 35 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 484880c..bf82953 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -73,6 +73,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS =
"hoodie.bulkinsert.user.defined.partitioner.class";
public static final String UPSERT_PARALLELISM =
"hoodie.upsert.shuffle.parallelism";
public static final String DELETE_PARALLELISM =
"hoodie.delete.shuffle.parallelism";
+ public static final String FILE_LISTING_PARALLELISM =
"hoodie.file.listing.parallelism";
public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
public static final String ROLLBACK_PARALLELISM =
"hoodie.rollback.parallelism";
public static final String WRITE_BUFFER_LIMIT_BYTES =
"hoodie.write.buffer.limit.bytes";
@@ -213,6 +214,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Math.max(Integer.parseInt(props.getProperty(DELETE_PARALLELISM)),
1);
}
+ /**
+ * Returns the value configured under FILE_LISTING_PARALLELISM, raised to 1 if it is lower.
+ */
+ public int getFileListingParallelism() {
+   final int configured = Integer.parseInt(props.getProperty(FILE_LISTING_PARALLELISM));
+   return Math.max(configured, 1);
+ }
+
public int getRollbackParallelism() {
return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
}
@@ -870,6 +875,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
+ /**
+ * Stores the given parallelism under FILE_LISTING_PARALLELISM.
+ *
+ * @param parallelism value to record, written as its decimal string form
+ * @return this builder
+ */
+ public Builder withFileListingParallelism(int parallelism) {
+   final String value = Integer.toString(parallelism);
+   props.setProperty(FILE_LISTING_PARALLELISM, value);
+   return this;
+ }
+
public Builder withParallelism(int insertShuffleParallelism, int
upsertShuffleParallelism) {
props.setProperty(INSERT_PARALLELISM,
String.valueOf(insertShuffleParallelism));
props.setProperty(UPSERT_PARALLELISM,
String.valueOf(upsertShuffleParallelism));
@@ -1024,6 +1034,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM),
UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM),
DELETE_PARALLELISM, DEFAULT_PARALLELISM);
+ setDefaultOnCondition(props,
!props.containsKey(FILE_LISTING_PARALLELISM), FILE_LISTING_PARALLELISM,
+ DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM),
ROLLBACK_PARALLELISM,
DEFAULT_ROLLBACK_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_USING_MARKERS),
ROLLBACK_USING_MARKERS,
diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index b560428..f89a198 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -52,6 +52,7 @@ import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -67,7 +68,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -82,8 +82,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import scala.Tuple2;
-
import static
org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static
org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
import static
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
@@ -196,6 +194,7 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
.withRollbackParallelism(parallelism)
+ .withFileListingParallelism(writeConfig.getFileListingParallelism())
.withFinalizeWriteParallelism(parallelism);
if (writeConfig.isMetricsOn()) {
@@ -311,43 +310,17 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
initTableMetadata();
// List all partitions in the basePath of the containing dataset
- FileSystem fs = datasetMetaClient.getFs();
- FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new
FileSystemBackedTableMetadata(hadoopConf, datasetWriteConfig.getBasePath(),
- datasetWriteConfig.shouldAssumeDatePartitioning());
- List<String> partitions =
fileSystemBackedTableMetadata.getAllPartitionPaths();
- LOG.info("Initializing metadata table by using file listings in " +
partitions.size() + " partitions");
-
- // List all partitions in parallel and collect the files in them
- int parallelism = Math.min(partitions.size(), jsc.defaultParallelism()) +
1; // +1 to prevent 0 parallelism
- JavaPairRDD<String, FileStatus[]> partitionFileListRDD =
jsc.parallelize(partitions, parallelism)
- .mapToPair(partition -> {
- FileStatus[] statuses =
fileSystemBackedTableMetadata.getAllFilesInPartition(new
Path(datasetWriteConfig.getBasePath(), partition));
- return new Tuple2<>(partition, statuses);
- });
-
- // Collect the list of partitions and file lists
- List<Tuple2<String, FileStatus[]>> partitionFileList =
partitionFileListRDD.collect();
+ LOG.info("Initializing metadata table by using file listings in " +
datasetWriteConfig.getBasePath());
+ Map<String, List<FileStatus>> partitionToFileStatus =
getPartitionsToFilesMapping(jsc, datasetMetaClient);
// Create a HoodieCommitMetadata with writeStats for all discovered files
int[] stats = {0};
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
- partitionFileList.forEach(t -> {
- final String partition = t._1;
- try {
- if (!fs.exists(new Path(datasetWriteConfig.getBasePath(), partition +
Path.SEPARATOR + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) {
- return;
- }
- } catch (IOException e) {
- throw new HoodieMetadataException("Failed to check partition " +
partition, e);
- }
-
+ partitionToFileStatus.forEach((partition, statuses) -> {
// Filter the statuses to only include files which were created before
or on createInstantTime
- Arrays.stream(t._2).filter(status -> {
+ statuses.stream().filter(status -> {
String filename = status.getPath().getName();
- if
(filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
- return false;
- }
if (HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename),
HoodieTimeline.GREATER_THAN,
createInstantTime)) {
return false;
@@ -370,11 +343,60 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
}
});
- LOG.info("Committing " + partitionFileList.size() + " partitions and " +
stats[0] + " files to metadata");
+ LOG.info("Committing " + partitionToFileStatus.size() + " partitions and "
+ stats[0] + " files to metadata");
update(commitMetadata, createInstantTime);
}
/**
+ * Finds the hoodie partitions under the dataset base path and lists the files in them in parallel.
+ *
+ * The directory tree is walked level by level, starting at {@code datasetWriteConfig.getBasePath()}. All
+ * directories of one level are listed in a single Spark job whose parallelism is capped by
+ * {@code metadataWriteConfig.getFileListingParallelism()}. A directory whose listing contains
+ * {@code HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE} is recorded as a partition and is not descended
+ * into; for any other directory, its sub-directories (except {@code HoodieTableMetaClient.METAFOLDER_NAME})
+ * are queued for the next round.
+ *
+ * @param jsc Spark context used to run the parallel listings
+ * @param datasetMetaClient meta client of the dataset; supplies the Hadoop configuration used for listing
+ * @return Map of partition path (relative to the dataset base path) to a list of FileStatus for all the entries
+ *         listed in the partition, excluding the partition metafile
+ */
+ private Map<String, List<FileStatus>> getPartitionsToFilesMapping(JavaSparkContext jsc, HoodieTableMetaClient datasetMetaClient) {
+
+   final Path basePath = new Path(datasetWriteConfig.getBasePath());
+   List<Path> pathsToList = new LinkedList<>();
+   pathsToList.add(basePath);
+
+   Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+   final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
+   SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());
+
+   while (!pathsToList.isEmpty()) {
+     int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
+     // List all directories in parallel
+     List<Pair<Path, FileStatus[]>> dirToFileListing = jsc.parallelize(pathsToList, listingParallelism)
+         .map(path -> {
+           FileSystem fs = path.getFileSystem(conf.get());
+           return Pair.of(path, fs.listStatus(path));
+         }).collect();
+     // The listing has been collected, so the queue can be reused for the next level
+     pathsToList.clear();
+
+     // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
+     // the results.
+     dirToFileListing.forEach(p -> {
+       // Everything in the directory except the partition metafile
+       List<FileStatus> filesInDir = Arrays.stream(p.getRight())
+           .filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
+           .collect(Collectors.toList());
+
+       if (p.getRight().length > filesInDir.size()) {
+         // Is a partition (the metafile was filtered out above). Add all data files to result.
+         // Key by the path relative to the base path: Path#getName() keeps only the last path component, so
+         // multi-level partitions such as 2020/11/21 and 2020/12/21 would overwrite each other.
+         // NOTE(review): assumes FSUtils.getRelativePartitionPath(Path basePath, Path partitionPath) from
+         // hudi-common - confirm the signature and its result for a non-partitioned dataset.
+         String partitionName = FSUtils.getRelativePartitionPath(basePath, p.getLeft());
+         partitionToFileStatus.put(partitionName, filesInDir);
+       } else {
+         // Add sub-dirs to the queue
+         pathsToList.addAll(Arrays.stream(p.getRight())
+             .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
+             .map(fs -> fs.getPath())
+             .collect(Collectors.toList()));
+       }
+     });
+   }
+
+   return partitionToFileStatus;
+ }
+
+ /**
* Sync the Metadata Table from the instants created on the dataset.
*
* @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
@@ -454,7 +476,9 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
writeStats.forEach(hoodieWriteStat -> {
String pathWithPartition = hoodieWriteStat.getPath();
if (pathWithPartition == null) {
- throw new HoodieMetadataException("Unable to find path in write stat
to update metadata table " + hoodieWriteStat);
+ // Empty partition
+ LOG.warn("Unable to find path in write stat to update metadata table
" + hoodieWriteStat);
+ return;
}
int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 :
partition.length() + 1;