umehrot2 commented on a change in pull request #2893:
URL: https://github.com/apache/hudi/pull/2893#discussion_r654727713



##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
##########
@@ -105,6 +107,24 @@ public FileSystemBackedTableMetadata(HoodieEngineContext engineContext, Serializ
     return partitionPaths;
   }
 
+  @Override
+  public Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitionPaths)
+      throws IOException {
+    if (partitionPaths == null || partitionPaths.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    int parallelism = Math.min(DEFAULT_LISTING_PARALLELISM, partitionPaths.size());
+
+    List<Pair<String, FileStatus[]>> partitionToFiles = engineContext.map(partitionPaths, partitionPathStr -> {
+      Path partitionPath = new Path(partitionPathStr);
+      FileSystem fs = partitionPath.getFileSystem(hadoopConf.get());
+      return Pair.of(partitionPathStr, FSUtils.getAllDataFilesInPartition(fs, partitionPath));

Review comment:
       I see, yeah: `FileStatus` is not serializable in Hadoop 2, but it has been made `Serializable` in Hadoop 3. We should fix this in a separate PR for all methods by introducing a `SerializableFileStatus`, similar to Spark's https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala#L347.
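
       For reference, a minimal sketch of the idea: a plain value class carrying the `FileStatus` fields we need, so the listing results can be shipped across an engine context on Hadoop 2. The class name mirrors the suggestion above, but the field set, constructor, and demo are illustrative assumptions, not Hudi's eventual implementation:

```java
import java.io.*;

// Hypothetical sketch: Hadoop 2's FileStatus does not implement
// Serializable, so a Spark-style value class holding only the fields we
// need can be serialized instead. Field names mirror FileStatus
// accessors; the class itself is an assumption, not the final API.
public class SerializableFileStatus implements Serializable {
  private static final long serialVersionUID = 1L;

  final String path;            // FileStatus#getPath().toString()
  final long length;            // FileStatus#getLen()
  final boolean isDir;          // FileStatus#isDirectory()
  final long modificationTime;  // FileStatus#getModificationTime()

  SerializableFileStatus(String path, long length, boolean isDir, long modificationTime) {
    this.path = path;
    this.length = length;
    this.isDir = isDir;
    this.modificationTime = modificationTime;
  }

  public static void main(String[] args) throws Exception {
    SerializableFileStatus in =
        new SerializableFileStatus("hdfs://base/2021/06/18/f1.parquet", 1024L, false, 42L);

    // Round-trip through Java serialization, which is exactly what
    // Hadoop 2's FileStatus fails at and this wrapper is meant to survive.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(in);
    }
    try (ObjectInputStream oin =
             new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      SerializableFileStatus back = (SerializableFileStatus) oin.readObject();
      System.out.println(back.path.equals(in.path)
          && back.length == in.length
          && back.isDir == in.isDir
          && back.modificationTime == in.modificationTime);
    }
  }
}
```

       A real version would also need a conversion back to `FileStatus` on the receiving side (and coverage of block locations, permissions, etc., as Spark's `SerializableFileStatus` does).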

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -314,18 +379,15 @@ case class HoodieFileIndex(
         case None => pathToFetch.append(partitionRowPath)
       }
     }
-    // Fetch the rest from the file system.
-    val fetchedPartition2Files =
-      spark.sparkContext.parallelize(pathToFetch, Math.min(pathToFetch.size, maxListParallelism))
-        .map { partitionRowPath =>
-          // Here we use a LocalEngineContext to get the files in the partition.
-          // We can do this because the TableMetadata.getAllFilesInPartition only rely on the
-          // hadoopConf of the EngineContext.
-          val engineContext = new HoodieLocalEngineContext(serializableConf.get())
-          val filesInPartition =  FSUtils.getFilesInPartition(engineContext, metadataConfig,
-              basePath, partitionRowPath.fullPartitionPath(basePath))
-          (partitionRowPath, filesInPartition)
-        }.collect().map(f => f._1 -> f._2).toMap
+
+    var fetchedPartition2Files: Map[PartitionRowPath, Array[FileStatus]] = Map()

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
