This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 77b0440 [HUDI-2634] Improved the metadata table bootstrap for very large tables. (#3873)
77b0440 is described below
commit 77b0440eb4fdbb01d41aa27b4fa2a8d45a20b293
Author: Prashant Wason <[email protected]>
AuthorDate: Wed Nov 10 19:37:48 2021 -0800
[HUDI-2634] Improved the metadata table bootstrap for very large tables. (#3873)
* [HUDI-2634] Improved the metadata table bootstrap for very large tables.
The following improvements are implemented:
1. Memory overhead reduction:
- Existing code caches a FileStatus for each file in memory.
- Created a new class DirectoryInfo which is used to cache a directory's
file list with only parts of the FileStatus (just the filename and file
length). This reduces the memory requirements (a small illustrative
sketch follows this list).
2. Improved parallelism:
- Existing code collects all the listings to the Driver and then creates
each HoodieRecord on the Driver.
- This takes a long time for large tables (11 million HoodieRecords to be
created).
- Created a new function in SparkRDDWriteClient specifically for the
bootstrap commit. In it, the HoodieRecord creation is parallelized across
executors so it completes quickly.
3. Fixed the setting that limits the number of parallel listings:
- Existing code had a bug wherein a listing parallelism of 1500 was
hardcoded. This leads to an exception due to the limit on Spark's result
memory.
- Corrected the use of the config.
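For illustration, here is a minimal standalone sketch of the idea in (1)
(not part of this commit; the class and method names are hypothetical):
only the filename and length are cached per file, instead of a full
FileStatus.

    // Hypothetical sketch: cache only filename -> length per file.
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    class SlimListing implements Serializable {
      // Two small values per file instead of a full FileStatus
      // (path, owner, permissions, block locations, ...).
      private final Map<String, Long> fileNameToLength = new HashMap<>();

      void addFile(String fileName, long length) {
        fileNameToLength.put(fileName, length);
      }

      int totalFiles() {
        return fileNameToLength.size();
      }
    }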
Result:
Dataset has 1299 partitions and 12 million files.
file listing time = 1.5 mins
HoodieRecord creation time = 13 seconds
deltacommit duration = 2.6 mins
Co-authored-by: Sivabalan Narayanan <[email protected]>
---
.../metadata/HoodieBackedTableMetadataWriter.java | 199 ++++++++++++++-------
.../FlinkHoodieBackedTableMetadataWriter.java | 4 +-
.../java/org/apache/hudi/data/HoodieJavaRDD.java | 5 +
.../SparkHoodieBackedTableMetadataWriter.java | 11 +-
.../org/apache/hudi/common/data/HoodieData.java | 7 +
.../org/apache/hudi/common/data/HoodieList.java | 8 +
.../java/org/apache/hudi/common/fs/FSUtils.java | 20 ++-
.../apache/hudi/common/model/HoodieFileFormat.java | 10 ++
8 files changed, 188 insertions(+), 76 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index bf7bd8f..71d6193 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
@@ -39,7 +40,6 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -51,7 +51,6 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -68,6 +67,8 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -175,7 +176,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
.withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
.build())
.withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
- .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
.withAutoCommit(true)
.withAvroSchemaValidate(true)
.withEmbeddedTimelineServerEnabled(false)
@@ -400,92 +401,68 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
// List all partitions in the basePath of the containing dataset
LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
- Map<String, List<FileStatus>> partitionToFileStatus = getPartitionsToFilesMapping(dataMetaClient);
-
- // Create a HoodieCommitMetadata with writeStats for all discovered files
- int[] stats = {0};
- HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
-
- partitionToFileStatus.forEach((partition, statuses) -> {
- // Filter the statuses to only include files which were created before or on createInstantTime
- statuses.stream().filter(status -> {
- String filename = status.getPath().getName();
- return !HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
- createInstantTime);
- }).forEach(status -> {
- HoodieWriteStat writeStat = new HoodieWriteStat();
- writeStat.setPath((partition.isEmpty() ? "" : partition + Path.SEPARATOR) + status.getPath().getName());
- writeStat.setPartitionPath(partition);
- writeStat.setTotalWriteBytes(status.getLen());
- commitMetadata.addWriteStat(partition, writeStat);
- stats[0] += 1;
- });
-
- // If the partition has no files then create a writeStat with no file path
- if (commitMetadata.getWriteStats(partition) == null) {
- HoodieWriteStat writeStat = new HoodieWriteStat();
- writeStat.setPartitionPath(partition);
- commitMetadata.addWriteStat(partition, writeStat);
- }
- });
+ List<DirectoryInfo> dirInfoList = listAllPartitions(dataMetaClient);
- LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
- update(commitMetadata, createInstantTime, false);
+ // During bootstrap, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out of this
+ // large number of files and calling the existing update(HoodieCommitMetadata) function does not scale well.
+ // Hence, we have a special commit just for the bootstrap scenario.
+ bootstrapCommit(dirInfoList, createInstantTime);
return true;
}
/**
* Function to find hoodie partitions and list files in them in parallel.
*
- * @param dataMetaClient
+ * @param datasetMetaClient data set meta client instance.
* @return Map of partition names to a list of FileStatus for all the files in the partition
*/
- private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
+ private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient datasetMetaClient) {
List<Path> pathsToList = new LinkedList<>();
pathsToList.add(new Path(dataWriteConfig.getBasePath()));
- Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+ List<DirectoryInfo> partitionsToBootstrap = new LinkedList<>();
final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
- SerializableConfiguration conf = new SerializableConfiguration(dataMetaClient.getHadoopConf());
+ SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());
final String dirFilterRegex = dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
+ final String datasetBasePath = datasetMetaClient.getBasePath();
while (!pathsToList.isEmpty()) {
- int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
+ // In each round we will list a section of directories
+ int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
// List all directories in parallel
- List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
+ List<DirectoryInfo> processedDirectories = engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
FileSystem fs = path.getFileSystem(conf.get());
- return Pair.of(path, fs.listStatus(path));
- }, listingParallelism);
- pathsToList.clear();
+ String relativeDirPath = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), path);
+ return new DirectoryInfo(relativeDirPath, fs.listStatus(path));
+ }, numDirsToList);
+
+ pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, pathsToList.size()));
// If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
// the results.
- dirToFileListing.forEach(p -> {
- if (!dirFilterRegex.isEmpty() && p.getLeft().getName().matches(dirFilterRegex)) {
- LOG.info("Ignoring directory " + p.getLeft() + " which matches the filter regex " + dirFilterRegex);
- return;
+ for (DirectoryInfo dirInfo : processedDirectories) {
+ if (!dirFilterRegex.isEmpty()) {
+ final String relativePath = dirInfo.getRelativePath();
+ if (!relativePath.isEmpty()) {
+ Path partitionPath = new Path(datasetBasePath, relativePath);
+ if (partitionPath.getName().matches(dirFilterRegex)) {
+ LOG.info("Ignoring directory " + partitionPath + " which matches the filter regex " + dirFilterRegex);
+ continue;
+ }
+ }
}
- List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
- .filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
- .collect(Collectors.toList());
-
- if (p.getRight().length > filesInDir.size()) {
- String partitionName = FSUtils.getRelativePartitionPath(new Path(dataMetaClient.getBasePath()), p.getLeft());
- // deal with Non-partition table, we should exclude .hoodie
- partitionToFileStatus.put(partitionName, filesInDir.stream()
- .filter(f -> !f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
+ if (dirInfo.isHoodiePartition()) {
+ // Add to result
+ partitionsToBootstrap.add(dirInfo);
} else {
// Add sub-dirs to the queue
- pathsToList.addAll(Arrays.stream(p.getRight())
- .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
- .map(fs -> fs.getPath())
- .collect(Collectors.toList()));
+ pathsToList.addAll(dirInfo.getSubDirectories());
}
- });
+ }
}
- return partitionToFileStatus;
+ return partitionsToBootstrap;
}
/**
@@ -549,7 +526,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
if (enabled && metadata != null) {
List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
- commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
+ commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
}
}
@@ -611,7 +588,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(metadataMetaClient.getActiveTimeline(), rollbackMetadata, instantTime,
metadata.getSyncedInstantTime(), wasSynced);
- commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, false);
+ commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, false);
}
}
@@ -624,12 +601,12 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
/**
* Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit.
- * @param records The list of records to be written.
+ * @param records The HoodieData of records to be written.
* @param partitionName The partition to which the records are to be written.
* @param instantTime The timestamp to use for the deltacommit.
* @param canTriggerTableService true if table services can be scheduled and executed. false otherwise.
*/
- protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService);
+ protected abstract void commit(HoodieData<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService);
/**
* Perform a compaction on the Metadata Table.
@@ -668,4 +645,96 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
// metadata table.
writeClient.clean(instantTime + "002");
}
+
+ /**
+ * This is invoked to bootstrap metadata table for a dataset. Bootstrap commit has a special handling mechanism due to its scale compared to
+ * other regular commits.
+ *
+ */
+ protected void bootstrapCommit(List<DirectoryInfo> partitionInfoList, String createInstantTime) {
+ List<String> partitions = partitionInfoList.stream().map(p -> p.getRelativePath()).collect(Collectors.toList());
+ final int totalFiles = partitionInfoList.stream().mapToInt(p -> p.getTotalFiles()).sum();
+
+ // Record which saves the list of all partitions
+ HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
+ if (partitions.isEmpty()) {
+ // in case of bootstrapping a fresh table, there won't be any partitions, but we need to make a bootstrap commit
+ commit(engineContext.parallelize(Collections.singletonList(allPartitionRecord), 1), MetadataPartitionType.FILES.partitionPath(), createInstantTime, false);
+ return;
+ }
+ HoodieData<HoodieRecord> partitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
+ if (!partitionInfoList.isEmpty()) {
+ HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
+ // Record which saves files within a partition
+ return HoodieMetadataPayload.createPartitionFilesRecord(
+ partitionInfo.getRelativePath(), Option.of(partitionInfo.getFileNameToSizeMap()), Option.empty());
+ });
+ partitionRecords = partitionRecords.union(fileListRecords);
+ }
+
+ LOG.info("Committing " + partitions.size() + " partitions and " + totalFiles + " files to metadata");
+ ValidationUtils.checkState(partitionRecords.count() == (partitions.size() + 1));
+ commit(partitionRecords, MetadataPartitionType.FILES.partitionPath(), createInstantTime, false);
+ }
+
+ /**
+ * A class which represents a directory and the files and directories inside it.
+ *
+ * A {@code DirectoryInfo} object saves the name of the partition and the limited properties of each file
+ * required for bootstrapping the metadata table. Saving limited properties reduces the total memory footprint when
+ * a very large number of files are present in the dataset being bootstrapped.
+ */
+ static class DirectoryInfo implements Serializable {
+ // Relative path of the directory (relative to the base directory)
+ private final String relativePath;
+ // Map of filenames within this partition to their respective sizes
+ private HashMap<String, Long> filenameToSizeMap;
+ // List of directories within this partition
+ private final List<Path> subDirectories = new ArrayList<>();
+ // Is this a hoodie partition
+ private boolean isHoodiePartition = false;
+
+ public DirectoryInfo(String relativePath, FileStatus[] fileStatus) {
+ this.relativePath = relativePath;
+
+ // Pre-allocate with the maximum length possible
+ filenameToSizeMap = new HashMap<>(fileStatus.length);
+
+ for (FileStatus status : fileStatus) {
+ if (status.isDirectory()) {
+ // Ignore .hoodie directory as there cannot be any partitions inside it
+ if (!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
+ this.subDirectories.add(status.getPath());
+ }
+ } else if (status.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+ // Presence of partition meta file implies this is a HUDI partition
+ this.isHoodiePartition = true;
+ } else if (FSUtils.isDataFile(status.getPath())) {
+ // Regular HUDI data file (base file or log file)
+ filenameToSizeMap.put(status.getPath().getName(), status.getLen());
+ }
+ }
+ }
+
+ String getRelativePath() {
+ return relativePath;
+ }
+
+ int getTotalFiles() {
+ return filenameToSizeMap.size();
+ }
+
+ boolean isHoodiePartition() {
+ return isHoodiePartition;
+ }
+
+ List<Path> getSubDirectories() {
+ return subDirectories;
+ }
+
+ // Returns a map of filenames mapped to their lengths
+ Map<String, Long> getFileNameToSizeMap() {
+ return filenameToSizeMap;
+ }
+ }
}
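To make the scale difference in bootstrapCommit concrete, a small worked
sketch using the figures reported in the Result section above (numbers
assumed only for illustration):

    // One file-list record per partition plus one partition-list record:
    int partitionsCount = 1299;                  // from the Result section
    long expectedRecords = partitionsCount + 1L; // matches the checkState in bootstrapCommit
    // 1300 HoodieRecords are committed, instead of one write stat per file (~12 million).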
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 96cdcae..174fc1e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.metadata;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
@@ -91,8 +92,9 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
}
@Override
- protected void commit(List<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService) {
+ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
+ List<HoodieRecord> records = (List<HoodieRecord>) hoodieDataRecords.get();
List<HoodieRecord> recordList = prepRecords(records, partitionName, 1);
try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
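The Flink writer keeps its existing list-based path by materializing the
incoming HoodieData locally via get(). A sketch of that assumption (valid
only for list-backed HoodieData such as HoodieList):

    // Assumes the Flink engine context produces list-backed HoodieData,
    // so get() returns an in-memory List rather than a distributed collection.
    HoodieData<HoodieRecord> data = engineContext.parallelize(records, 1);
    List<HoodieRecord> asList = (List<HoodieRecord>) data.get();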
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index ceaee47..d4eb259 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -131,6 +131,11 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
}
@Override
+ public HoodieData<T> union(HoodieData<T> other) {
+ return HoodieJavaRDD.of(rddData.union((JavaRDD<T>) other.get()));
+ }
+
+ @Override
public List<T> collectAsList() {
return rddData.collect();
}
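A quick usage sketch of the new union override (assumes a live
JavaSparkContext jsc; HoodieJavaRDD.of wraps an existing JavaRDD, as in
the diff above):

    // union delegates to JavaRDD.union, so the result stays distributed and lazy.
    HoodieData<String> a = HoodieJavaRDD.of(jsc.parallelize(Arrays.asList("x", "y"), 1));
    HoodieData<String> b = HoodieJavaRDD.of(jsc.parallelize(Collections.singletonList("z"), 1));
    List<String> merged = a.union(b).collectAsList(); // ["x", "y", "z"]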
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index ad6f9d9..0673ca9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -22,6 +22,7 @@ import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
@@ -39,7 +40,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.List;
@@ -121,9 +121,9 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
}
}
- @Override
- protected void commit(List<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService) {
+ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
+ JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) hoodieDataRecords.get();
JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
@@ -166,12 +166,11 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
*
* The record is tagged with respective file slice's location based on its record key.
*/
- private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
+ private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
- JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
- return jsc.parallelize(records, 1).map(r -> {
+ return recordsRDD.map(r -> {
FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
return r;
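prepRecords now tags records on the executors instead of parallelizing a
Driver-side list. A sketch of the routing it relies on (helper taken from
the diff above; `record` stands in for any HoodieRecord):

    // With numFileGroups == 1 the returned index can only be 0 (it indexes a
    // one-element fileSlices list), so every bootstrap record is tagged with
    // the lone FILES file slice.
    int idx = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(record.getRecordKey(), 1); // 0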
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index 7ea7e0d..093fd43 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -98,6 +98,13 @@ public abstract class HoodieData<T> implements Serializable {
public abstract HoodieData<T> distinct();
/**
+ * Unions this {@link HoodieData} with the other {@link HoodieData}.
+ * @param other {@link HoodieData} of interest.
+ * @return the union of the two as an instance of {@link HoodieData}.
+ */
+ public abstract HoodieData<T> union(HoodieData<T> other);
+
+ /**
* @return collected results in {@link List<T>}.
*/
public abstract List<T> collectAsList();
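Since union is now part of the engine-agnostic HoodieData contract,
callers such as bootstrapCommit can combine datasets without knowing the
backing. A usage sketch (names as produced earlier in this diff):

    // The same call works whether the data is a Spark RDD or a local list.
    HoodieData<HoodieRecord> combined = partitionRecords.union(fileListRecords);
    long total = combined.count(); // partitions + 1 in the bootstrap case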
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
index 6c23fdf..9441619 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
@@ -133,6 +133,14 @@ public class HoodieList<T> extends HoodieData<T> {
}
@Override
+ public HoodieData<T> union(HoodieData<T> other) {
+ List<T> unionResult = new ArrayList<>();
+ unionResult.addAll(listData);
+ unionResult.addAll(other.collectAsList());
+ return HoodieList.of(unionResult);
+ }
+
+ @Override
public List<T> collectAsList() {
return listData;
}
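Note that the list-backed union is a bag union: duplicates are kept and
ordering is left-then-right, mirroring RDD union semantics. A small
sketch:

    HoodieData<Integer> left = HoodieList.of(Arrays.asList(1, 2));
    HoodieData<Integer> right = HoodieList.of(Arrays.asList(2, 3));
    left.union(right).collectAsList(); // [1, 2, 2, 3] -- duplicates preserved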
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 8273ca7..dc4df23 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -239,7 +239,7 @@ public class FSUtils {
/**
* Recursively processes all files in the base-path. If excludeMetaFolder is set, the meta-folder and all its subdirs
* are skipped
- *
+ *
* @param fs File System
* @param basePathStr Base-Path
* @param consumer Callback for processing
@@ -431,18 +431,30 @@
public static String makeLogFileName(String fileId, String logFileExtension, String baseCommitTime, int version,
String writeToken) {
- String suffix =
- (writeToken == null) ? String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version)
- : String.format("%s_%s%s.%d_%s", fileId, baseCommitTime, logFileExtension, version, writeToken);
+ String suffix = (writeToken == null)
+ ? String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version)
+ : String.format("%s_%s%s.%d_%s", fileId, baseCommitTime, logFileExtension, version, writeToken);
return LOG_FILE_PREFIX + suffix;
}
+ public static boolean isBaseFile(Path path) {
+ String extension = getFileExtension(path.getName());
+ return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(extension);
+ }
+
public static boolean isLogFile(Path logPath) {
Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName());
return matcher.find() && logPath.getName().contains(".log");
}
/**
+ * Returns true if the given path is a Base file or a Log file.
+ */
+ public static boolean isDataFile(Path path) {
+ return isBaseFile(path) || isLogFile(path);
+ }
+
+ /**
* Get the names of all the base and log files in the given partition path.
*/
public static FileStatus[] getAllDataFilesInPartition(FileSystem fs, Path partitionPath) throws IOException {
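A sketch of how the new helpers classify paths (the file names below are
illustrative only, not taken from a real table):

    FSUtils.isBaseFile(new Path("f1_0-0-1_20211110.parquet")); // true: .parquet is a base-file extension
    FSUtils.isLogFile(new Path(".f1_20211110.log.1_0-0-1"));   // true: matches LOG_FILE_PATTERN
    FSUtils.isDataFile(new Path("some.txt"));                  // false: neither base nor log file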
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
index f7fdcd0..3263910 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
@@ -18,6 +18,11 @@
package org.apache.hudi.common.model;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
/**
* Hoodie file format.
*/
@@ -27,6 +32,11 @@ public enum HoodieFileFormat {
HFILE(".hfile"),
ORC(".orc");
+ public static final Set<String> BASE_FILE_EXTENSIONS = Arrays.stream(HoodieFileFormat.values())
+ .map(HoodieFileFormat::getFileExtension)
+ .filter(x -> !x.equals(HoodieFileFormat.HOODIE_LOG.getFileExtension()))
+ .collect(Collectors.toCollection(HashSet::new));
+
private final String extension;
HoodieFileFormat(String extension) {
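BASE_FILE_EXTENSIONS therefore holds every format's extension except the
log format, so a membership test replaces per-format checks. A usage
sketch (assuming HOODIE_LOG's extension is ".log"):

    HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(".parquet"); // true
    HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(".log");     // false: HOODIE_LOG is filtered out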