nsivabalan commented on a change in pull request #3873:
URL: https://github.com/apache/hudi/pull/3873#discussion_r740454888
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -645,4 +612,83 @@ protected void doClean(AbstractHoodieWriteClient
writeClient, String instantTime
// metadata table.
writeClient.clean(instantTime + "002");
}
+
+ /**
+ * Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit.
+ *
+ */
+ protected abstract void commit(List<HoodieRecord> records, String
partitionName, String instantTime);
+
+ /**
+ * Commit the partition to file listing information to Metadata Table as a
new delta-commit.
+ *
+ */
+ protected abstract void commit(List<DirectoryInfo> dirInfoList, String
createInstantTime);
+
+
+ /**
+ * A class which represents a directory and the files and directories inside
it.
+ *
+ * A {@code PartitionFileInfo} object saves the name of the partition and
various properties requires of each file
+ * required for bootstrapping the metadata table. Saving limited properties
reduces the total memory footprint when
+ * a very large number of files are present in the dataset being
bootstrapped.
+ */
+ public static class DirectoryInfo implements Serializable {
+ // Relative path of the directory (relative to the base directory)
+ private String relativePath;
+ // List of filenames within this partition
+ private List<String> filenames;
+ // Length of the various files
+ private List<Long> filelengths;
+ // List of directories within this partition
+ private List<Path> subdirs = new ArrayList<>();
+ // Is this a HUDI partition
+ private boolean isPartition = false;
+
+ public DirectoryInfo(String relativePath, FileStatus[] fileStatus) {
+ this.relativePath = relativePath;
+
+ // Pre-allocate with the maximum length possible
+ filenames = new ArrayList<>(fileStatus.length);
+ filelengths = new ArrayList<>(fileStatus.length);
+
+ for (FileStatus status : fileStatus) {
+ if (status.isDirectory()) {
+ this.subdirs.add(status.getPath());
+ } else if
(status.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
{
+ // Presence of partition meta file implies this is a HUDI partition
+ this.isPartition = true;
+ } else if (FSUtils.isDataFile(status.getPath())) {
+ // Regular HUDI data file (base file or log file)
+ filenames.add(status.getPath().getName());
+ filelengths.add(status.getLen());
+ }
+ }
+ }
+
+ public String getRelativePath() {
+ return relativePath;
+ }
+
+ public int getTotalFiles() {
+ return filenames.size();
+ }
+
+ public boolean isPartition() {
+ return isPartition;
+ }
+
+ public List<Path> getSubdirs() {
+ return subdirs;
+ }
+
+ // Returns a map of filenames mapped to their lengths
+ public Map<String, Long> getFileMap() {
Review comment:
If the caller is always going to be interested in a map of file name to
length, can we populate that map directly in the constructor of DirectoryInfo()
instead of keeping two separate lists?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -419,52 +394,53 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
* @param dataMetaClient
* @return Map of partition names to a list of FileStatus for all the files
in the partition
*/
- private Map<String, List<FileStatus>>
getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
+ private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient
datasetMetaClient) {
List<Path> pathsToList = new LinkedList<>();
pathsToList.add(new Path(dataWriteConfig.getBasePath()));
- Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+ List<DirectoryInfo> foundPartitionsList = new LinkedList<>();
final int fileListingParallelism =
metadataWriteConfig.getFileListingParallelism();
SerializableConfiguration conf = new
SerializableConfiguration(dataMetaClient.getHadoopConf());
final String dirFilterRegex =
dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
+ final String datasetBasePath = dataMetaClient.getBasePath();
while (!pathsToList.isEmpty()) {
- int listingParallelism = Math.min(fileListingParallelism,
pathsToList.size());
+ // In each round we will list a section of directories
+ int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
// List all directories in parallel
- List<Pair<Path, FileStatus[]>> dirToFileListing =
engineContext.map(pathsToList, path -> {
+ List<DirectoryInfo> foundDirsList =
engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
FileSystem fs = path.getFileSystem(conf.get());
- return Pair.of(path, fs.listStatus(path));
- }, listingParallelism);
- pathsToList.clear();
+ String relativeDirPath = FSUtils.getRelativePartitionPath(new
Path(datasetBasePath), path);
+ return new DirectoryInfo(relativeDirPath, fs.listStatus(path));
+ }, numDirsToList);
+
+ pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList,
pathsToList.size()));
// If the listing reveals a directory, add it to queue. If the listing
reveals a hoodie partition, add it to
// the results.
- dirToFileListing.forEach(p -> {
- if (!dirFilterRegex.isEmpty() &&
p.getLeft().getName().matches(dirFilterRegex)) {
- LOG.info("Ignoring directory " + p.getLeft() + " which matches the
filter regex " + dirFilterRegex);
- return;
+ for (DirectoryInfo dirInfo : foundDirsList) {
+ if (!dirFilterRegex.isEmpty()) {
+ final String relativePath = dirInfo.getRelativePath();
+ if (!relativePath.isEmpty()) {
+ Path partitionPath = new Path(datasetBasePath, relativePath);
+ if (partitionPath.getName().matches(dirFilterRegex)) {
+ LOG.info("Ignoring directory " + partitionPath + " which matches
the filter regex " + dirFilterRegex);
+ continue;
+ }
+ }
}
- List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
- .filter(fs ->
!fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
- .collect(Collectors.toList());
-
- if (p.getRight().length > filesInDir.size()) {
- String partitionName = FSUtils.getRelativePartitionPath(new
Path(dataMetaClient.getBasePath()), p.getLeft());
- // deal with Non-partition table, we should exclude .hoodie
- partitionToFileStatus.put(partitionName, filesInDir.stream()
- .filter(f ->
!f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
+ if (dirInfo.isPartition()) {
+ // Add to result
+ foundPartitionsList.add(dirInfo);
} else {
// Add sub-dirs to the queue
- pathsToList.addAll(Arrays.stream(p.getRight())
- .filter(fs -> fs.isDirectory() &&
!fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
- .map(fs -> fs.getPath())
- .collect(Collectors.toList()));
+ pathsToList.addAll(dirInfo.getSubdirs());
Review comment:
Also, I see this in master before this patch:
```
if (p.getRight().length > filesInDir.size()) {
String partitionName = FSUtils.getRelativePartitionPath(new
Path(dataMetaClient.getBasePath()), p.getLeft());
// deal with Non-partition table, we should exclude .hoodie
partitionToFileStatus.put(partitionName, filesInDir.stream()
.filter(f ->
!f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
}
```
May I know how we are handling the same case in this patch?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -419,52 +394,53 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
* @param dataMetaClient
* @return Map of partition names to a list of FileStatus for all the files
in the partition
*/
- private Map<String, List<FileStatus>>
getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
+ private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient
datasetMetaClient) {
List<Path> pathsToList = new LinkedList<>();
pathsToList.add(new Path(dataWriteConfig.getBasePath()));
- Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+ List<DirectoryInfo> foundPartitionsList = new LinkedList<>();
final int fileListingParallelism =
metadataWriteConfig.getFileListingParallelism();
SerializableConfiguration conf = new
SerializableConfiguration(dataMetaClient.getHadoopConf());
final String dirFilterRegex =
dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
+ final String datasetBasePath = dataMetaClient.getBasePath();
while (!pathsToList.isEmpty()) {
- int listingParallelism = Math.min(fileListingParallelism,
pathsToList.size());
+ // In each round we will list a section of directories
+ int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
// List all directories in parallel
- List<Pair<Path, FileStatus[]>> dirToFileListing =
engineContext.map(pathsToList, path -> {
+ List<DirectoryInfo> foundDirsList =
engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
Review comment:
Minor: can we name this `processedDirectories`?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -419,52 +394,53 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
* @param dataMetaClient
* @return Map of partition names to a list of FileStatus for all the files
in the partition
*/
- private Map<String, List<FileStatus>>
getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
+ private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient
datasetMetaClient) {
List<Path> pathsToList = new LinkedList<>();
pathsToList.add(new Path(dataWriteConfig.getBasePath()));
- Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+ List<DirectoryInfo> foundPartitionsList = new LinkedList<>();
final int fileListingParallelism =
metadataWriteConfig.getFileListingParallelism();
SerializableConfiguration conf = new
SerializableConfiguration(dataMetaClient.getHadoopConf());
final String dirFilterRegex =
dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
+ final String datasetBasePath = dataMetaClient.getBasePath();
while (!pathsToList.isEmpty()) {
- int listingParallelism = Math.min(fileListingParallelism,
pathsToList.size());
+ // In each round we will list a section of directories
+ int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
// List all directories in parallel
- List<Pair<Path, FileStatus[]>> dirToFileListing =
engineContext.map(pathsToList, path -> {
+ List<DirectoryInfo> foundDirsList =
engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
FileSystem fs = path.getFileSystem(conf.get());
- return Pair.of(path, fs.listStatus(path));
- }, listingParallelism);
- pathsToList.clear();
+ String relativeDirPath = FSUtils.getRelativePartitionPath(new
Path(datasetBasePath), path);
+ return new DirectoryInfo(relativeDirPath, fs.listStatus(path));
+ }, numDirsToList);
+
+ pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList,
pathsToList.size()));
// If the listing reveals a directory, add it to queue. If the listing
reveals a hoodie partition, add it to
// the results.
- dirToFileListing.forEach(p -> {
- if (!dirFilterRegex.isEmpty() &&
p.getLeft().getName().matches(dirFilterRegex)) {
- LOG.info("Ignoring directory " + p.getLeft() + " which matches the
filter regex " + dirFilterRegex);
- return;
+ for (DirectoryInfo dirInfo : foundDirsList) {
+ if (!dirFilterRegex.isEmpty()) {
+ final String relativePath = dirInfo.getRelativePath();
+ if (!relativePath.isEmpty()) {
+ Path partitionPath = new Path(datasetBasePath, relativePath);
+ if (partitionPath.getName().matches(dirFilterRegex)) {
+ LOG.info("Ignoring directory " + partitionPath + " which matches
the filter regex " + dirFilterRegex);
+ continue;
+ }
+ }
}
- List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
- .filter(fs ->
!fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
- .collect(Collectors.toList());
-
- if (p.getRight().length > filesInDir.size()) {
- String partitionName = FSUtils.getRelativePartitionPath(new
Path(dataMetaClient.getBasePath()), p.getLeft());
- // deal with Non-partition table, we should exclude .hoodie
- partitionToFileStatus.put(partitionName, filesInDir.stream()
- .filter(f ->
!f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
+ if (dirInfo.isPartition()) {
+ // Add to result
+ foundPartitionsList.add(dirInfo);
} else {
// Add sub-dirs to the queue
- pathsToList.addAll(Arrays.stream(p.getRight())
- .filter(fs -> fs.isDirectory() &&
!fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
- .map(fs -> fs.getPath())
- .collect(Collectors.toList()));
+ pathsToList.addAll(dirInfo.getSubdirs());
Review comment:
May I know where we are ignoring the meta paths (e.g., L460 before this
patch)?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -419,52 +394,53 @@ private boolean
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
* @param dataMetaClient
* @return Map of partition names to a list of FileStatus for all the files
in the partition
*/
- private Map<String, List<FileStatus>>
getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
+ private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient
datasetMetaClient) {
List<Path> pathsToList = new LinkedList<>();
pathsToList.add(new Path(dataWriteConfig.getBasePath()));
- Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+ List<DirectoryInfo> foundPartitionsList = new LinkedList<>();
Review comment:
Maybe we can name it "partitionsToBootstrap".
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
##########
@@ -36,4 +49,9 @@
public String getFileExtension() {
return extension;
}
+
+ public static boolean isBaseFile(Path path) {
Review comment:
do you think we should move this to FSUtils?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -145,15 +152,39 @@ protected void commit(List<HoodieRecord> records, String
partitionName, String i
*
* The record is tagged with respective file slice's location based on its
record key.
*/
- private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String
partitionName, int numFileGroups) {
+ private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD,
String partitionName, int numFileGroups) {
List<FileSlice> fileSlices =
HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient,
partitionName);
ValidationUtils.checkArgument(fileSlices.size() == numFileGroups,
String.format("Invalid number of file groups: found=%d, required=%d",
fileSlices.size(), numFileGroups));
- JavaSparkContext jsc = ((HoodieSparkEngineContext)
engineContext).getJavaSparkContext();
- return jsc.parallelize(records, 1).map(r -> {
+ return recordsRDD.map(r -> {
FileSlice slice =
fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
numFileGroups));
r.setCurrentLocation(new
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
return r;
});
}
+
+ @Override
+ protected void commit(List<DirectoryInfo> partitionInfoList, String
createInstantTime) {
Review comment:
Recently we added some abstractions to Hudi (HoodieData, HoodieJavaRdd,
HoodieList). Can we re-use them to avoid duplication across Flink and Spark?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -145,15 +152,39 @@ protected void commit(List<HoodieRecord> records, String
partitionName, String i
*
* The record is tagged with respective file slice's location based on its
record key.
*/
- private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String
partitionName, int numFileGroups) {
+ private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD,
String partitionName, int numFileGroups) {
List<FileSlice> fileSlices =
HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient,
partitionName);
ValidationUtils.checkArgument(fileSlices.size() == numFileGroups,
String.format("Invalid number of file groups: found=%d, required=%d",
fileSlices.size(), numFileGroups));
- JavaSparkContext jsc = ((HoodieSparkEngineContext)
engineContext).getJavaSparkContext();
- return jsc.parallelize(records, 1).map(r -> {
+ return recordsRDD.map(r -> {
FileSlice slice =
fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
numFileGroups));
r.setCurrentLocation(new
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
return r;
});
}
+
+ @Override
+ protected void commit(List<DirectoryInfo> partitionInfoList, String
createInstantTime) {
Review comment:
We can have a single commit() method that deals with HoodieData and
abstracts out the details.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]