umehrot2 commented on a change in pull request #2893:
URL: https://github.com/apache/hudi/pull/2893#discussion_r654727713
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
##########
@@ -105,6 +107,24 @@ public FileSystemBackedTableMetadata(HoodieEngineContext engineContext, Serializ
return partitionPaths;
}
+  @Override
+  public Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitionPaths)
+      throws IOException {
+    if (partitionPaths == null || partitionPaths.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    int parallelism = Math.min(DEFAULT_LISTING_PARALLELISM, partitionPaths.size());
+
+    List<Pair<String, FileStatus[]>> partitionToFiles = engineContext.map(partitionPaths, partitionPathStr -> {
+      Path partitionPath = new Path(partitionPathStr);
+      FileSystem fs = partitionPath.getFileSystem(hadoopConf.get());
+      return Pair.of(partitionPathStr, FSUtils.getAllDataFilesInPartition(fs, partitionPath));
Review comment:
I see, yeah: `FileStatus` is not serializable in Hadoop 2, but it has been made `Serializable` in Hadoop 3. We should fix this in a separate PR for all methods by introducing a `SerializableFileStatus`, similar to Spark's:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala#L347.
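To illustrate the idea, here is a minimal sketch of such a class (this is a hypothetical design, not the eventual Hudi implementation; the field set and accessor names are assumptions, and a real version would mirror all of Hadoop's `FileStatus` accessors and carry conversion helpers to and from `org.apache.hadoop.fs.FileStatus`):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch: a Serializable value class capturing the fields Hudi
// reads from a FileStatus, so partition listings can be shipped back from
// executors to the driver even on Hadoop 2, where
// org.apache.hadoop.fs.FileStatus does not implement Serializable.
public class SerializableFileStatus implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String path;            // from FileStatus.getPath().toString()
  private final long length;            // from FileStatus.getLen()
  private final boolean isDirectory;    // from FileStatus.isDirectory()
  private final long modificationTime;  // from FileStatus.getModificationTime()

  public SerializableFileStatus(String path, long length, boolean isDirectory,
                                long modificationTime) {
    this.path = path;
    this.length = length;
    this.isDirectory = isDirectory;
    this.modificationTime = modificationTime;
  }

  public String getPath() { return path; }
  public long getLength() { return length; }
  public boolean isDirectory() { return isDirectory; }
  public long getModificationTime() { return modificationTime; }

  // Demonstrate a Java serialization round trip, which is exactly the step
  // that fails for Hadoop 2's FileStatus when an engine context collects
  // listing results across JVMs.
  public static void main(String[] args) throws Exception {
    SerializableFileStatus original = new SerializableFileStatus(
        "s3://bucket/table/part=1/file.parquet", 1024L, false, 1620000000000L);

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(original);
    }
    try (ObjectInputStream in =
             new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      SerializableFileStatus copy = (SerializableFileStatus) in.readObject();
      System.out.println(copy.getPath() + " " + copy.getLength());
    }
  }
}
```

The conversion from `FileStatus` would happen on the executor side before `collect()`, and callers that need a real `FileStatus` would rebuild one from these fields on the driver.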
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -314,18 +379,15 @@ case class HoodieFileIndex(
         case None => pathToFetch.append(partitionRowPath)
       }
     }
-    // Fetch the rest from the file system.
-    val fetchedPartition2Files =
-      spark.sparkContext.parallelize(pathToFetch, Math.min(pathToFetch.size, maxListParallelism))
-        .map { partitionRowPath =>
-          // Here we use a LocalEngineContext to get the files in the partition.
-          // We can do this because the TableMetadata.getAllFilesInPartition only rely on the
-          // hadoopConf of the EngineContext.
-          val engineContext = new HoodieLocalEngineContext(serializableConf.get())
-          val filesInPartition = FSUtils.getFilesInPartition(engineContext, metadataConfig,
-            basePath, partitionRowPath.fullPartitionPath(basePath))
-          (partitionRowPath, filesInPartition)
-        }.collect().map(f => f._1 -> f._2).toMap
+
+    var fetchedPartition2Files: Map[PartitionRowPath, Array[FileStatus]] = Map()
Review comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]