This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 45da30dc3ec [HUDI-5485] Add File System View API for batch listing and 
improve savepoint performance with metadata table (#7690)
45da30dc3ec is described below

commit 45da30dc3ecfd4e7315d8f33d95504a5ac7cbd1a
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Jan 25 23:39:46 2023 -0800

    [HUDI-5485] Add File System View API for batch listing and improve 
savepoint performance with metadata table (#7690)
    
    The savepoint action needs to list all the latest base files before or on
    the target instant time. When the metadata table is enabled, the metadata
    table is scanned once per partition from the Spark executors, which leads
    to a large number of S3 requests. This significantly degrades savepoint
    performance when the table has a huge number of partitions. Such repeated
    scanning of the metadata table is unnecessary and can be avoided by
    scanning the metadata table only once.
    
    This commit fixes the issue by adding a new file system view API that
    batches the file listing of all partitions, so that only one metadata table
    lookup call (`tableMetadata.getAllFilesInPartitions`) is invoked, on the
    driver side, for the savepoint action.
---
 .../action/savepoint/SavepointActionExecutor.java  |  55 +++++-
 .../java/org/apache/hudi/client/TestSavepoint.java | 152 +++++++++++++++++
 .../hudi/testutils/HoodieClientTestBase.java       |   2 +-
 .../table/view/AbstractTableFileSystemView.java    | 190 +++++++++++++++++----
 .../table/view/PriorityBasedFileSystemView.java    |   7 +
 .../view/RemoteHoodieTableFileSystemView.java      |  45 ++++-
 .../common/table/view/TableFileSystemView.java     |  11 ++
 .../metadata/HoodieMetadataFileSystemView.java     |  25 +++
 .../hudi/timeline/service/RequestHandler.java      |   9 +
 .../timeline/service/handlers/BaseFileHandler.java |  11 ++
 10 files changed, 458 insertions(+), 49 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 3bfdd20721e..4e0ae1da223 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -84,15 +85,33 @@ public class SavepointActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I
           "Could not savepoint commit " + instantTime + " as this is beyond 
the lookup window " + lastCommitRetained);
 
       context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest 
files for savepoint " + instantTime + " " + table.getConfig().getTableName());
-      List<String> partitions = FSUtils.getAllPartitionPaths(context, 
config.getMetadataConfig(), table.getMetaClient().getBasePath());
-      Map<String, List<String>> latestFilesMap = context.mapToPair(partitions, 
partitionPath -> {
-        // Scan all partitions files with this commit time
-        LOG.info("Collecting latest files in partition path " + partitionPath);
-        TableFileSystemView.BaseFileOnlyView view = 
table.getBaseFileOnlyView();
-        List<String> latestFiles = 
view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
-            .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
-        return new ImmutablePair<>(partitionPath, latestFiles);
-      }, null);
+      TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
+
+      Map<String, List<String>> latestFilesMap;
+      // NOTE: for performance, we have to use different logic here for listing the latest files
+      // before or on the given instant:
+      // (1) using metadata-table-based file listing: instead of parallelizing the partition
+      // listing, which incurs unnecessary metadata table reads, we directly read the metadata
+      // table once in a batch manner through the timeline server;
+      // (2) using direct file system listing: we parallelize the partition listing so that
+      // each partition can be listed on the file system concurrently through Spark.
+      // Note that the batch lookup is only used when the metadata table is enabled and the
+      // file system view storage type is neither EMBEDDED_KV_STORE nor SPILLABLE_DISK
+      // (see shouldUseBatchLookup).
+      if (shouldUseBatchLookup(config)) {
+        latestFilesMap = 
view.getAllLatestBaseFilesBeforeOrOn(instantTime).entrySet().stream()
+            .collect(Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> 
entry.getValue().map(HoodieBaseFile::getFileName).collect(Collectors.toList())));
+      } else {
+        List<String> partitions = FSUtils.getAllPartitionPaths(context, 
config.getMetadataConfig(), table.getMetaClient().getBasePath());
+        latestFilesMap = context.mapToPair(partitions, partitionPath -> {
+          // Scan all partitions files with this commit time
+          LOG.info("Collecting latest files in partition path " + 
partitionPath);
+          List<String> latestFiles = 
view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
+              .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
+          return new ImmutablePair<>(partitionPath, latestFiles);
+        }, null);
+      }
+
       HoodieSavepointMetadata metadata = 
TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap);
       // Nothing to save in the savepoint
       table.getActiveTimeline().createNewInstant(
@@ -106,4 +125,22 @@ public class SavepointActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I
       throw new HoodieSavepointException("Failed to savepoint " + instantTime, 
e);
     }
   }
+
+  /**
+   * Whether to use batch lookup for listing the latest base files in the metadata table.
+   * <p>
+   * Note that the metadata table has to be enabled, and the storage type of the file system
+   * view cannot be EMBEDDED_KV_STORE or SPILLABLE_DISK (these two types are not integrated
+   * with the metadata table, see HUDI-5612).
+   *
+   * @param config Write configs.
+   * @return {@code true} if using batch lookup; {@code false} otherwise.
+   */
+  private boolean shouldUseBatchLookup(HoodieWriteConfig config) {
+    FileSystemViewStorageType storageType =
+        config.getClientSpecifiedViewStorageConfig().getStorageType();
+    return config.getMetadataConfig().enabled()
+        && !FileSystemViewStorageType.EMBEDDED_KV_STORE.equals(storageType)
+        && !FileSystemViewStorageType.SPILLABLE_DISK.equals(storageType);
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java
new file mode 100644
index 00000000000..765c24d6a65
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.table.view.FileSystemViewStorageType.EMBEDDED_KV_STORE;
+import static 
org.apache.hudi.common.table.view.FileSystemViewStorageType.MEMORY;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for savepoint operation.
+ */
+public class TestSavepoint extends HoodieClientTestBase {
+
+  private static Stream<Arguments> testSavepointParams() {
+    return Arrays.stream(new Object[][] {
+        {true, MEMORY}, {true, EMBEDDED_KV_STORE},
+        {false, MEMORY}, {false, EMBEDDED_KV_STORE}
+    }).map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("testSavepointParams")
+  public void testSavepoint(boolean enableMetadataTable,
+                            FileSystemViewStorageType storageType) throws 
IOException {
+    HoodieWriteConfig cfg = getWriteConfig(enableMetadataTable, storageType);
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0x17AB);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+      List<WriteStatus> statuses = client.upsert(writeRecords, 
newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+      client.savepoint("user", "hoodie-savepoint-unit-test");
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context, 
metaClient);
+      HoodieTimeline savepointTimeline = 
table.getActiveTimeline().getSavePointTimeline();
+      assertEquals(1, savepointTimeline.countInstants());
+
+      Map<String, HoodieSavepointPartitionMetadata> 
savepointPartitionMetadataMap =
+          TimelineMetadataUtils.deserializeHoodieSavepointMetadata(
+                  
savepointTimeline.getInstantDetails(savepointTimeline.firstInstant().get()).get())
+              .getPartitionMetadata();
+
+      HoodieTimeline commitsTimeline = 
table.getActiveTimeline().getCommitsTimeline();
+      Map<String, List<HoodieWriteStat>> partitionToWriteStats = 
HoodieCommitMetadata.fromBytes(
+              
commitsTimeline.getInstantDetails(commitsTimeline.firstInstant().get()).get(),
+              HoodieCommitMetadata.class)
+          .getPartitionToWriteStats();
+
+      assertEquals(partitionToWriteStats.size(), 
savepointPartitionMetadataMap.size());
+      for (Map.Entry<String, List<HoodieWriteStat>> entry : 
partitionToWriteStats.entrySet()) {
+        String partition = entry.getKey();
+        assertTrue(savepointPartitionMetadataMap.containsKey(partition));
+        assertEquals(
+            entry.getValue().stream().map(path -> 
getFileNameFromPath(path.getPath()))
+                .sorted().collect(Collectors.toList()),
+            savepointPartitionMetadataMap.get(partition).getSavepointDataFile()
+                .stream().sorted().collect(Collectors.toList())
+        );
+      }
+    }
+  }
+
+  private HoodieWriteConfig getWriteConfig(boolean enableMetadataTable,
+                                           FileSystemViewStorageType 
storageType) {
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withBulkInsertParallelism(2)
+        .withFinalizeWriteParallelism(2)
+        .withDeleteParallelism(2)
+        .withWriteStatusClass(MetadataMergeWriteStatus.class)
+        .withConsistencyGuardConfig(
+            
ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 
1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            .hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024)
+            .orcMaxFileSize(1024 * 1024).build())
+        .forTable(RAW_TRIPS_TEST_NAME)
+        .withEmbeddedTimelineServerEnabled(true)
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withEnableBackupForRemoteFileSystemView(false) // Fail test if 
problem connecting to timeline-server
+            .withRemoteServerPort(timelineServicePort)
+            .withStorageType(storageType)
+            .build())
+        .withMetadataConfig(
+            
HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+        .build();
+  }
+
+  private String getFileNameFromPath(String path) {
+    String[] parts = path.split("/");
+    return parts[parts.length - 1];
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 94e15206c39..2b4172781c6 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -21,8 +21,8 @@ package org.apache.hudi.testutils;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index ca847bedaf0..8ea017977e9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -52,6 +53,7 @@ import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -60,6 +62,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.stream.Collectors;
@@ -122,7 +125,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
 
   /**
    * Refresh commits timeline.
-   * 
+   *
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
@@ -164,13 +167,13 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
    * Build FileGroups from passed in file-status.
    */
   protected List<HoodieFileGroup> buildFileGroups(FileStatus[] statuses, 
HoodieTimeline timeline,
-      boolean addPendingCompactionFileSlice) {
+                                                  boolean 
addPendingCompactionFileSlice) {
     return buildFileGroups(convertFileStatusesToBaseFiles(statuses), 
convertFileStatusesToLogFiles(statuses), timeline,
         addPendingCompactionFileSlice);
   }
 
   protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieBaseFile> 
baseFileStream,
-      Stream<HoodieLogFile> logFileStream, HoodieTimeline timeline, boolean 
addPendingCompactionFileSlice) {
+                                                  Stream<HoodieLogFile> 
logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) {
     Map<Pair<String, String>, List<HoodieBaseFile>> baseFiles =
         baseFileStream.collect(Collectors.groupingBy(baseFile -> {
           String partitionPathStr = getPartitionPathFor(baseFile);
@@ -227,7 +230,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
 
         // get replace instant mapping for each partition, fileId
         return 
replaceMetadata.getPartitionToReplaceFileIds().entrySet().stream().flatMap(entry
 -> entry.getValue().stream().map(e ->
-                new AbstractMap.SimpleEntry<>(new 
HoodieFileGroupId(entry.getKey(), e), instant)));
+            new AbstractMap.SimpleEntry<>(new 
HoodieFileGroupId(entry.getKey(), e), instant)));
       } catch (HoodieIOException ex) {
 
         if (ex.getIOException() instanceof FileNotFoundException) {
@@ -289,6 +292,114 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
    */
   protected abstract void resetViewState();
 
+  /**
+   * Batch-loads all partitions of the table that are not yet loaded into the view.
+   *
+   * @return A list of relative partition paths of all partitions.
+   */
+  private List<String> ensureAllPartitionsLoadedCorrectly() {
+    ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+    try {
+      List<String> formattedPartitionList = getAllPartitionPaths().stream()
+          .map(this::formatPartitionKey).collect(Collectors.toList());
+      ensurePartitionsLoadedCorrectly(formattedPartitionList);
+      return formattedPartitionList;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to get all partition paths", e);
+    }
+  }
+
+  /**
+   * Allows lazily loading the partitions if needed.
+   *
+   * @param partitionList list of partitions to be loaded if not present.
+   */
+  private void ensurePartitionsLoadedCorrectly(List<String> partitionList) {
+
+    ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+
+    Set<String> partitionSet = new HashSet<>();
+    synchronized (addedPartitions) {
+      partitionList.forEach(partition -> {
+        if (!addedPartitions.containsKey(partition) && 
!isPartitionAvailableInStore(partition)) {
+          partitionSet.add(partition);
+        }
+      });
+
+      if (!partitionSet.isEmpty()) {
+        long beginTs = System.currentTimeMillis();
+        // Not loaded yet
+        try {
+          LOG.info("Building file system view for partitions " + partitionSet);
+
+          // Pairs of relative partition path and absolute partition path
+          List<Pair<String, Path>> absolutePartitionPathList = 
partitionSet.stream()
+              .map(partition -> Pair.of(
+                  partition, 
FSUtils.getPartitionPath(metaClient.getBasePathV2(), partition)))
+              .collect(Collectors.toList());
+          long beginLsTs = System.currentTimeMillis();
+          Map<Pair<String, Path>, FileStatus[]> statusesMap =
+              listPartitions(absolutePartitionPathList);
+          long endLsTs = System.currentTimeMillis();
+          LOG.debug("Time taken to list partitions " + partitionSet + " =" + 
(endLsTs - beginLsTs));
+          statusesMap.forEach((partitionPair, statuses) -> {
+            String relativePartitionStr = partitionPair.getLeft();
+            List<HoodieFileGroup> groups = addFilesToView(statuses);
+            if (groups.isEmpty()) {
+              storePartitionView(relativePartitionStr, new ArrayList<>());
+            }
+            LOG.debug("#files found in partition (" + relativePartitionStr + 
") =" + statuses.length);
+          });
+        } catch (IOException e) {
+          throw new HoodieIOException("Failed to list base files in partitions 
" + partitionSet, e);
+        }
+        long endTs = System.currentTimeMillis();
+        LOG.debug("Time to load partition " + partitionSet + " =" + (endTs - 
beginTs));
+      }
+
+      partitionSet.forEach(partition ->
+          addedPartitions.computeIfAbsent(partition, partitionPathStr -> true)
+      );
+    }
+  }
+
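+  /**
+   * Returns the relative paths of all partitions of the table. The default implementation
+   * always throws {@link HoodieException}, because listing all partitions sequentially on the
+   * file system can be very slow; views that support batch loading are expected to override it.
+   *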
+   * @return A list of relative partition paths of all partitions.
+   * @throws IOException upon error.
+   */
+  protected List<String> getAllPartitionPaths() throws IOException {
+    throw new HoodieException("Getting all partition paths with file system 
listing sequentially "
+        + "can be very slow. This should not be invoked.");
+  }
+
+  /**
+   * @param partitionPathList A list of pairs of the relative and absolute 
paths of the partitions.
+   * @return all the files from the partitions.
+   * @throws IOException upon error.
+   */
+  protected Map<Pair<String, Path>, FileStatus[]> listPartitions(
+      List<Pair<String, Path>> partitionPathList) throws IOException {
+    Map<Pair<String, Path>, FileStatus[]> fileStatusMap = new HashMap<>();
+
+    for (Pair<String, Path> partitionPair : partitionPathList) {
+      Path absolutePartitionPath = partitionPair.getRight();
+      try {
+        fileStatusMap.put(partitionPair, 
metaClient.getFs().listStatus(absolutePartitionPath));
+      } catch (IOException e) {
+        // Create the path if it does not exist already
+        if (!metaClient.getFs().exists(absolutePartitionPath)) {
+          metaClient.getFs().mkdirs(absolutePartitionPath);
+          fileStatusMap.put(partitionPair, new FileStatus[0]);
+        } else {
+          // in case the partition path was created by another caller
+          fileStatusMap.put(partitionPair, 
metaClient.getFs().listStatus(absolutePartitionPath));
+        }
+      }
+    }
+
+    return fileStatusMap;
+  }
+
   /**
    * Allows lazily loading the partitions if needed.
    *
@@ -537,19 +648,38 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
       readLock.lock();
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
-      return fetchAllStoredFileGroups(partitionPath)
-          .filter(fileGroup -> 
!isFileGroupReplacedBeforeOrOn(fileGroup.getFileGroupId(), maxCommitTime))
-          .map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles()
-              .filter(baseFile -> 
HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), 
HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime
-              ))
-              .filter(df -> !isBaseFileDueToPendingCompaction(df) && 
!isBaseFileDueToPendingClustering(df)).findFirst()))
-          .filter(Option::isPresent).map(Option::get)
-          .map(df -> addBootstrapBaseFileIfPresent(new 
HoodieFileGroupId(partitionPath, df.getFileId()), df));
+      return getLatestBaseFilesBeforeOrOnFromCache(partitionPath, 
maxCommitTime);
     } finally {
       readLock.unlock();
     }
   }
 
+  @Override
+  public final Map<String, Stream<HoodieBaseFile>> 
getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
+    try {
+      readLock.lock();
+
+      List<String> formattedPartitionList = 
ensureAllPartitionsLoadedCorrectly();
+      return formattedPartitionList.stream().collect(Collectors.toMap(
+          Function.identity(),
+          partitionPath -> 
getLatestBaseFilesBeforeOrOnFromCache(partitionPath, maxCommitTime)
+      ));
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOnFromCache(String 
partitionPath, String maxCommitTime) {
+    return fetchAllStoredFileGroups(partitionPath)
+        .filter(fileGroup -> 
!isFileGroupReplacedBeforeOrOn(fileGroup.getFileGroupId(), maxCommitTime))
+        .map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles()
+            .filter(baseFile -> 
HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), 
HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime
+            ))
+            .filter(df -> !isBaseFileDueToPendingCompaction(df) && 
!isBaseFileDueToPendingClustering(df)).findFirst()))
+        .filter(Option::isPresent).map(Option::get)
+        .map(df -> addBootstrapBaseFileIfPresent(new 
HoodieFileGroupId(partitionPath, df.getFileId()), df));
+  }
+
   @Override
   public final Option<HoodieBaseFile> getBaseFileOn(String partitionStr, 
String instantTime, String fileId) {
     try {
@@ -560,8 +690,8 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
         return Option.empty();
       } else {
         return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> 
fileGroup.getAllBaseFiles()
-            .filter(baseFile -> 
HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), 
HoodieTimeline.EQUALS,
-                instantTime)).filter(df -> 
!isBaseFileDueToPendingCompaction(df) && 
!isBaseFileDueToPendingClustering(df)).findFirst().orElse(null))
+                .filter(baseFile -> 
HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), 
HoodieTimeline.EQUALS,
+                    instantTime)).filter(df -> 
!isBaseFileDueToPendingCompaction(df) && 
!isBaseFileDueToPendingClustering(df)).findFirst().orElse(null))
             .map(df -> addBootstrapBaseFileIfPresent(new 
HoodieFileGroupId(partitionPath, fileId), df));
       }
     } finally {
@@ -596,8 +726,8 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
       return fetchAllStoredFileGroups()
           .filter(fileGroup -> 
!isFileGroupReplacedBeforeAny(fileGroup.getFileGroupId(), commitsToReturn))
           .map(fileGroup -> Pair.of(fileGroup.getFileGroupId(), 
Option.fromJavaOptional(
-          fileGroup.getAllBaseFiles().filter(baseFile -> 
commitsToReturn.contains(baseFile.getCommitTime())
-              && !isBaseFileDueToPendingCompaction(baseFile) && 
!isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> 
p.getValue().isPresent())
+              fileGroup.getAllBaseFiles().filter(baseFile -> 
commitsToReturn.contains(baseFile.getCommitTime())
+                  && !isBaseFileDueToPendingCompaction(baseFile) && 
!isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> 
p.getValue().isPresent())
           .map(p -> addBootstrapBaseFileIfPresent(p.getKey(), 
p.getValue().get()));
     } finally {
       readLock.unlock();
@@ -627,9 +757,9 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
       return fetchLatestFileSlices(partitionPath)
-              .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
-              .flatMap(slice -> 
this.filterBaseFileAfterPendingCompaction(slice, true))
-              .map(this::addBootstrapBaseFileIfPresent);
+          .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
+          .flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, 
true))
+          .map(this::addBootstrapBaseFileIfPresent);
     } finally {
       readLock.unlock();
     }
@@ -684,26 +814,26 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
 
   @Override
   public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String 
partitionStr, String maxCommitTime,
-      boolean includeFileSlicesInPendingCompaction) {
+                                                               boolean 
includeFileSlicesInPendingCompaction) {
     try {
       readLock.lock();
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
       Stream<Stream<FileSlice>> allFileSliceStream = 
fetchAllStoredFileGroups(partitionPath)
-              .filter(slice -> 
!isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
-              .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
+          .filter(slice -> 
!isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
+          .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
       if (includeFileSlicesInPendingCompaction) {
         return allFileSliceStream.map(sliceStream -> sliceStream.flatMap(slice 
-> this.filterBaseFileAfterPendingCompaction(slice, false)))
-                .map(sliceStream -> 
Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
-                .map(this::addBootstrapBaseFileIfPresent);
+            .map(sliceStream -> 
Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
+            .map(this::addBootstrapBaseFileIfPresent);
       } else {
         return allFileSliceStream
-                .map(sliceStream ->
-                        Option.fromJavaOptional(sliceStream
-                                .filter(slice -> 
!isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
-                                .filter(slice -> !slice.isEmpty())
-                                .findFirst()))
-                
.filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
+            .map(sliceStream ->
+                Option.fromJavaOptional(sliceStream
+                    .filter(slice -> 
!isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
+                    .filter(slice -> !slice.isEmpty())
+                    .findFirst()))
+            
.filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
       }
     } finally {
       readLock.unlock();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index 62edc4daa33..44332117367 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -39,6 +39,7 @@ import org.apache.log4j.Logger;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Stream;
 
 /**
@@ -145,6 +146,12 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
         secondaryView::getLatestBaseFilesBeforeOrOn);
   }
 
+  @Override
+  public Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
+    return execute(maxCommitTime, preferredView::getAllLatestBaseFilesBeforeOrOn,
+        secondaryView::getAllLatestBaseFilesBeforeOrOn);
+  }
+
   @Override
   public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fileId) {
     return execute(partitionPath, fileId, preferredView::getLatestBaseFile, secondaryView::getLatestBaseFile);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 5ada3dc45b5..bffd6353aa7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -54,6 +54,7 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -88,6 +89,8 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
       String.format("%s/%s", BASE_URL, "datafiles/range/latest/");
   public static final String LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL =
       String.format("%s/%s", BASE_URL, "datafiles/beforeoron/latest/");
+  public static final String ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL =
+      String.format("%s/%s", BASE_URL, "basefiles/all/beforeoron/");
 
   public static final String ALL_FILEGROUPS_FOR_PARTITION_URL =
       String.format("%s/%s", BASE_URL, "filegroups/all/partition/");
@@ -155,17 +158,17 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     this.serverPort = viewConf.getRemoteViewServerPort();
     this.timeoutMs = viewConf.getRemoteTimelineClientTimeoutSecs() * 1000;
     if (viewConf.isRemoteTimelineClientRetryEnabled()) {
-      retryHelper =  new RetryHelper(
-              viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
-              viewConf.getRemoteTimelineClientMaxRetryNumbers(),
-              viewConf.getRemoteTimelineInitialRetryIntervalMs(),
-              viewConf.getRemoteTimelineClientRetryExceptions(),
-              "Sending request");
+      retryHelper = new RetryHelper(
+          viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
+          viewConf.getRemoteTimelineClientMaxRetryNumbers(),
+          viewConf.getRemoteTimelineInitialRetryIntervalMs(),
+          viewConf.getRemoteTimelineClientRetryExceptions(),
+          "Sending request");
     }
   }
 
   private <T> T executeRequest(String requestPath, Map<String, String> queryParameters, TypeReference reference,
-      RequestMethod method) throws IOException {
+                               RequestMethod method) throws IOException {
     ValidationUtils.checkArgument(!closed, "View already closed");
 
     URIBuilder builder =
@@ -252,13 +255,36 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     return getLatestBaseFilesFromParams(paramsMap, LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL);
   }
 
+  @Override
+  public Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(BASEPATH_PARAM, basePath);
+    paramsMap.put(MAX_INSTANT_PARAM, maxCommitTime);
+
+    try {
+      Map<String, List<BaseFileDTO>> dataFileMap = executeRequest(
+          ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL,
+          paramsMap,
+          new TypeReference<Map<String, List<BaseFileDTO>>>() {
+          },
+          RequestMethod.GET);
+      return dataFileMap.entrySet().stream().collect(
+          Collectors.toMap(
+              Map.Entry::getKey,
+              entry -> entry.getValue().stream().map(BaseFileDTO::toHoodieBaseFile)));
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
   @Override
   public Option<HoodieBaseFile> getBaseFileOn(String partitionPath, String instantTime, String fileId) {
     Map<String, String> paramsMap = getParamsWithAdditionalParams(partitionPath,
         new String[] {INSTANT_PARAM, FILEID_PARAM}, new String[] {instantTime, fileId});
     try {
       List<BaseFileDTO> dataFiles = executeRequest(LATEST_DATA_FILE_ON_INSTANT_URL, paramsMap,
-          new TypeReference<List<BaseFileDTO>>() {}, RequestMethod.GET);
+          new TypeReference<List<BaseFileDTO>>() {
+          }, RequestMethod.GET);
       return Option.fromJavaOptional(dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile).findFirst());
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -505,7 +531,8 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
     try {
       List<BaseFileDTO> dataFiles = executeRequest(LATEST_PARTITION_DATA_FILE_URL, paramsMap,
-          new TypeReference<List<BaseFileDTO>>() {}, RequestMethod.GET);
+          new TypeReference<List<BaseFileDTO>>() {
+          }, RequestMethod.GET);
       return Option.fromJavaOptional(dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile).findFirst());
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 18c9a9af998..97b972a75d1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Stream;
 
 /**
@@ -64,6 +65,16 @@ public interface TableFileSystemView {
      */
     Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitionPath, String maxCommitTime);
 
+    /**
+     * Streams the latest version base files in all partitions with precondition that
+     * commitTime(file) is before or on maxCommitTime.
+     *
+     * @param maxCommitTime The max commit time to consider (inclusive).
+     * @return A {@link Map} of partition path to the stream of latest version base files
+     * before or on the max commit time.
+     */
+    Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime);
+
     /**
      * Stream all the latest data files pass.
      */
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
index ab5b5f6b4db..76b341609de 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
@@ -24,12 +24,17 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * {@code HoodieTableFileSystemView} implementation that retrieved partition listings from the Metadata Table.
@@ -65,6 +70,40 @@ public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
     return tableMetadata.getAllFilesInPartition(partitionPath);
   }
 
+  @Override
+  protected List<String> getAllPartitionPaths() throws IOException {
+    return tableMetadata.getAllPartitionPaths();
+  }
+
+  @Override
+  protected Map<Pair<String, Path>, FileStatus[]> listPartitions(List<Pair<String, Path>> partitionPathList) throws IOException {
+    // Reverse lookup from the absolute partition path string sent to the metadata table
+    // back to the caller's partition pair; duplicate inputs keep the first occurrence.
+    Map<String, Pair<String, Path>> absoluteToPairMap = partitionPathList.stream()
+        .collect(Collectors.toMap(
+            pair -> pair.getRight().toString(),
+            Function.identity(),
+            (existing, duplicate) -> existing));
+    // A single batched metadata table lookup covers all requested partitions.
+    List<String> absolutePaths = absoluteToPairMap.keySet().stream().collect(Collectors.toList());
+    return tableMetadata.getAllFilesInPartitions(absolutePaths)
+        .entrySet().stream().collect(Collectors.toMap(
+            entry -> toPartitionPair(absoluteToPairMap, entry.getKey()),
+            Map.Entry::getValue));
+  }
+
+  /**
+   * Looks up the partition pair for a partition path key returned by the metadata table,
+   * failing fast instead of producing a {@code null} map key when the path was not requested.
+   */
+  private static Pair<String, Path> toPartitionPair(Map<String, Pair<String, Path>> absoluteToPairMap, String absolutePath) {
+    Pair<String, Path> partitionPair = absoluteToPairMap.get(absolutePath);
+    if (partitionPair == null) {
+      throw new HoodieException("Metadata table returned files for unrequested partition path: " + absolutePath);
+    }
+    return partitionPair;
+  }
+
   @Override
   public void reset() {
     super.reset();
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 4c999b9ad22..7ad175c692a 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -56,6 +56,7 @@ import org.jetbrains.annotations.NotNull;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
@@ -276,6 +277,14 @@ public class RequestHandler {
       writeValueAsString(ctx, dtos);
     }, true));
 
+    app.get(RemoteHoodieTableFileSystemView.ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> {
+      metricsRegistry.add("ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT", 1);
+      Map<String, List<BaseFileDTO>> dtos = dataFileHandler.getAllLatestDataFilesBeforeOrOn(
+          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
+          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid")));
+      writeValueAsString(ctx, dtos);
+    }, true));
+
     app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILE_ON_INSTANT_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_DATA_FILE_ON_INSTANT", 1);
       List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFileOn(
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java
index 5033a86f8f2..a34b49843fa 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -61,6 +62,16 @@ public class BaseFileHandler extends Handler {
         .map(BaseFileDTO::fromHoodieBaseFile).collect(Collectors.toList());
   }
 
+  public Map<String, List<BaseFileDTO>> getAllLatestDataFilesBeforeOrOn(String basePath, String maxInstantTime) {
+    return viewManager.getFileSystemView(basePath)
+        .getAllLatestBaseFilesBeforeOrOn(maxInstantTime)
+        .entrySet().stream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            entry -> entry.getValue().map(BaseFileDTO::fromHoodieBaseFile).collect(Collectors.toList())
+        ));
+  }
+
   public List<BaseFileDTO> getLatestDataFileOn(String basePath, String partitionPath, String instantTime,
                                                String fileId) {
     List<BaseFileDTO> result = new ArrayList<>();


Reply via email to