vinothchandar commented on a change in pull request #2366:
URL: https://github.com/apache/hudi/pull/2366#discussion_r547567061



##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -391,27 +397,48 @@ public static FileStatus getFileStatus(HoodieBaseFile 
baseFile) throws IOExcepti
     return grouped;
   }
 
+  public static Map<HoodieTableMetaClient, List<Path>> 
groupSnapshotPathsByMetaClient(
+          Collection<HoodieTableMetaClient> metaClientList,
+          List<Path> snapshotPaths
+  ) {
+    Map<HoodieTableMetaClient, List<Path>> grouped = new HashMap<>();
+    metaClientList.forEach(metaClient -> grouped.put(metaClient, new 
ArrayList<>()));
+    for (Path path : snapshotPaths) {
+      // Find meta client associated with the input path
+      metaClientList.stream().filter(metaClient -> 
path.toString().contains(metaClient.getBasePath()))
+              .forEach(metaClient -> grouped.get(metaClient).add(path));
+    }
+    return grouped;
+  }
+
   /**
-   * Filters data files for a snapshot queried table.
+   * Filters data files under @param paths for a snapshot queried table.
    * @param job
-   * @param metadata
-   * @param fileStatuses
+   * @param metaClient
+   * @param paths
    * @return
    */
   public static List<FileStatus> filterFileStatusForSnapshotMode(
-      JobConf job, HoodieTableMetaClient metadata, List<FileStatus> 
fileStatuses) throws IOException {
-    FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+          JobConf job, HoodieTableMetaClient metaClient, List<Path> paths) 
throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + 
metadata);
+      LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + 
metaClient);
     }
-    // Get all commits, delta commits, compactions, as all of them produce a 
base parquet file today
-    HoodieTimeline timeline = 
metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-    TableFileSystemView.BaseFileOnlyView roView = new 
HoodieTableFileSystemView(metadata, timeline, statuses);
-    // filter files on the latest commit found
-    List<HoodieBaseFile> filteredFiles = 
roView.getLatestBaseFiles().collect(Collectors.toList());
-    LOG.info("Total paths to process after hoodie filter " + 
filteredFiles.size());
+
+    boolean useFileListingFromMetadata = job.getBoolean(METADATA_ENABLE_PROP, 
DEFAULT_METADATA_ENABLE_FOR_READERS);
+    boolean verifyFileListing = job.getBoolean(METADATA_VALIDATE_PROP, 
DEFAULT_METADATA_VALIDATE);
+    HoodieTableFileSystemView fsView = 
FileSystemViewManager.createInMemoryFileSystemView(metaClient,
+            useFileListingFromMetadata, verifyFileListing);
+
+    List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
+    for (Path p : paths) {
+      String relativePartitionPath = FSUtils.getRelativePartitionPath(new 
Path(metaClient.getBasePath()), p);
+      List<HoodieBaseFile> matched = 
fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList());

Review comment:
       Note to self: doing this by path is OK, since the FileSystemView 
internally caches per partition. 

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -391,27 +397,48 @@ public static FileStatus getFileStatus(HoodieBaseFile 
baseFile) throws IOExcepti
     return grouped;
   }
 
+  public static Map<HoodieTableMetaClient, List<Path>> 
groupSnapshotPathsByMetaClient(
+          Collection<HoodieTableMetaClient> metaClientList,
+          List<Path> snapshotPaths
+  ) {
+    Map<HoodieTableMetaClient, List<Path>> grouped = new HashMap<>();
+    metaClientList.forEach(metaClient -> grouped.put(metaClient, new 
ArrayList<>()));
+    for (Path path : snapshotPaths) {
+      // Find meta client associated with the input path
+      metaClientList.stream().filter(metaClient -> 
path.toString().contains(metaClient.getBasePath()))
+              .forEach(metaClient -> grouped.get(metaClient).add(path));
+    }
+    return grouped;
+  }
+
   /**
-   * Filters data files for a snapshot queried table.
+   * Filters data files under @param paths for a snapshot queried table.
    * @param job
-   * @param metadata
-   * @param fileStatuses
+   * @param metaClient
+   * @param paths
    * @return
    */
   public static List<FileStatus> filterFileStatusForSnapshotMode(
-      JobConf job, HoodieTableMetaClient metadata, List<FileStatus> 
fileStatuses) throws IOException {
-    FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+          JobConf job, HoodieTableMetaClient metaClient, List<Path> paths) 
throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + 
metadata);
+      LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + 
metaClient);
     }
-    // Get all commits, delta commits, compactions, as all of them produce a 
base parquet file today
-    HoodieTimeline timeline = 
metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-    TableFileSystemView.BaseFileOnlyView roView = new 
HoodieTableFileSystemView(metadata, timeline, statuses);
-    // filter files on the latest commit found
-    List<HoodieBaseFile> filteredFiles = 
roView.getLatestBaseFiles().collect(Collectors.toList());
-    LOG.info("Total paths to process after hoodie filter " + 
filteredFiles.size());
+
+    boolean useFileListingFromMetadata = job.getBoolean(METADATA_ENABLE_PROP, 
DEFAULT_METADATA_ENABLE_FOR_READERS);
+    boolean verifyFileListing = job.getBoolean(METADATA_VALIDATE_PROP, 
DEFAULT_METADATA_VALIDATE);
+    HoodieTableFileSystemView fsView = 
FileSystemViewManager.createInMemoryFileSystemView(metaClient,
+            useFileListingFromMetadata, verifyFileListing);
+
+    List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
+    for (Path p : paths) {
+      String relativePartitionPath = FSUtils.getRelativePartitionPath(new 
Path(metaClient.getBasePath()), p);
+      List<HoodieBaseFile> matched = 
fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList());
+      filteredBaseFiles.addAll(matched);
+    }
+
+    LOG.info("Total paths to process after hoodie filter " + 
filteredBaseFiles.size());
     List<FileStatus> returns = new ArrayList<>();
-    for (HoodieBaseFile filteredFile : filteredFiles) {
+    for (HoodieBaseFile filteredFile : filteredBaseFiles) {

Review comment:
       Hmm, slightly orthogonal, but the `HoodieBaseFile` itself should hand 
us a `FileStatus` object, right? We should probably rethink the need for 
refreshing the file status. 

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -63,13 +69,25 @@
     // TODO(vc): Should we handle also non-hoodie splits here?
     Map<Path, HoodieTableMetaClient> partitionsToMetaClient = 
getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());
 
+    boolean useFileListingFromMetadata = conf.getBoolean(METADATA_ENABLE_PROP, 
DEFAULT_METADATA_ENABLE_FOR_READERS);
+    boolean verifyFileListing = conf.getBoolean(METADATA_VALIDATE_PROP, 
DEFAULT_METADATA_VALIDATE);
+    // Create file system cache so metadata table is only instantiated once. 
Also can benefit normal file listing if
+    // partition path is listed twice so file groups will already be loaded in 
file system
+    Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsCache = new 
HashMap<>();

Review comment:
       makes sense




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to