This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 28f67ff3561 [HUDI-7551] Avoid loading all partitions in CleanPlanner when MDT is enabled (#10928)
28f67ff3561 is described below

commit 28f67ff3561e04d9ee26d21930e5f928813e623a
Author: Tim Brown <[email protected]>
AuthorDate: Wed Mar 27 19:30:25 2024 -0500

    [HUDI-7551] Avoid loading all partitions in CleanPlanner when MDT is enabled (#10928)
---
 .../action/clean/CleanPlanActionExecutor.java      |  6 +++
 .../hudi/table/action/clean/CleanPlanner.java      | 13 ++---
 .../table/view/AbstractTableFileSystemView.java    | 13 ++++-
 .../table/view/PriorityBasedFileSystemView.java    | 25 +++++++++-
 .../view/RemoteHoodieTableFileSystemView.java      | 16 ++++++-
 .../common/table/view/TableFileSystemView.java     |  8 +++-
 .../table/view/TestHoodieTableFileSystemView.java  | 55 ++++++++++++++++++++++
 .../view/TestPriorityBasedFileSystemView.java      | 24 ++++++++++
 .../hudi/timeline/service/RequestHandler.java      | 16 +++++++
 .../service/handlers/FileSliceHandler.java         |  5 ++
 10 files changed, 164 insertions(+), 17 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 723a95bb218..77c96b47f05 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -48,6 +48,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.client.utils.MetadataTableUtils.shouldUseBatchLookup;
 import static org.apache.hudi.common.util.MapUtils.nonEmpty;
 import static 
org.apache.hudi.table.action.clean.CleanPlanner.SAVEPOINTED_TIMESTAMPS;
 
@@ -122,10 +123,15 @@ public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I
 
       Map<String, List<HoodieCleanFileInfo>> cleanOps = new HashMap<>();
       List<String> partitionsToDelete = new ArrayList<>();
+      boolean shouldUseBatchLookup = 
shouldUseBatchLookup(table.getMetaClient().getTableConfig(), config);
       for (int i = 0; i < partitionsToClean.size(); i += cleanerParallelism) {
         // Handles at most 'cleanerParallelism' number of partitions once at a 
time to avoid overlarge memory pressure to the timeline server
         // (remote or local embedded), thus to reduce the risk of an OOM 
exception.
         List<String> subPartitionsToClean = partitionsToClean.subList(i, 
Math.min(i + cleanerParallelism, partitionsToClean.size()));
+        if (shouldUseBatchLookup) {
+          LOG.info("Load partitions and files into file system view in 
advance. Paths: {}", subPartitionsToClean);
+          table.getHoodieView().loadPartitions(subPartitionsToClean);
+        }
         Map<String, Pair<Boolean, List<CleanFileInfo>>> 
cleanOpsWithPartitionMeta = context
             .map(subPartitionsToClean, partitionPathToClean -> 
Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, 
earliestInstant)), cleanerParallelism)
             .stream()
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 21d12333d87..48ec8f9baa1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -64,8 +64,6 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static 
org.apache.hudi.client.utils.MetadataTableUtils.shouldUseBatchLookup;
-
 /**
  * Cleaner is responsible for garbage collecting older files in a given 
partition path. Such that
  * <p>
@@ -108,14 +106,9 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
         .map(entry -> Pair.of(new 
HoodieFileGroupId(entry.getValue().getPartitionPath(), 
entry.getValue().getFileId()), entry.getValue()))
         .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
 
-    // load all partitions in advance if necessary.
-    if (shouldUseBatchLookup(hoodieTable.getMetaClient().getTableConfig(), 
config)) {
-      LOG.info("Load all partitions and files into file system view in 
advance.");
-      fileSystemView.loadAllPartitions();
-    }
-    // collect savepointed timestamps to be assist with incremental cleaning. 
For non-partitioned and metadata table, we may not need this.
-    this.savepointedTimestamps = hoodieTable.isMetadataTable() ? 
Collections.EMPTY_LIST : (hoodieTable.isPartitioned() ? 
hoodieTable.getSavepointTimestamps().stream().collect(Collectors.toList())
-        : Collections.EMPTY_LIST);
+    // collect savepointed timestamps to assist with incremental cleaning. For 
non-partitioned and metadata table, we may not need this.
+    this.savepointedTimestamps = hoodieTable.isMetadataTable() ? 
Collections.emptyList() : (hoodieTable.isPartitioned() ? new 
ArrayList<>(hoodieTable.getSavepointTimestamps())
+        : Collections.emptyList());
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 76750171fb5..cdac0eeeb20 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -801,11 +801,20 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
   }
 
   @Override
-  public Void loadAllPartitions() {
+  public void loadAllPartitions() {
     try {
       readLock.lock();
       ensureAllPartitionsLoadedCorrectly();
-      return null;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public void loadPartitions(List<String> partitionPaths) {
+    try {
+      readLock.lock();
+      ensurePartitionsLoadedCorrectly(partitionPaths);
     } finally {
       readLock.unlock();
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index 56d7c7cc25c..1e4b1852d1b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -168,8 +168,29 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
   }
 
   @Override
-  public Void loadAllPartitions() {
-    return execute(preferredView::loadAllPartitions, 
secondaryView::loadAllPartitions);
+  public void loadAllPartitions() {
+    execute(
+        () -> {
+          preferredView.loadAllPartitions();
+          return null;
+        },
+        () -> {
+          secondaryView.loadAllPartitions();
+          return null;
+        });
+  }
+
+  @Override
+  public void loadPartitions(List<String> partitionPaths) {
+    execute(
+        () -> {
+          preferredView.loadPartitions(partitionPaths);
+          return null;
+        },
+        () -> {
+          secondaryView.loadPartitions(partitionPaths);
+          return null;
+        });
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 4363a7daf27..61c90c6eb02 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -127,8 +127,10 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
   // POST Requests
   public static final String REFRESH_TABLE = String.format("%s/%s", BASE_URL, 
"refresh/");
   public static final String LOAD_ALL_PARTITIONS_URL = String.format("%s/%s", 
BASE_URL, "loadallpartitions/");
+  public static final String LOAD_PARTITIONS_URL = String.format("%s/%s", 
BASE_URL, "loadpartitions/");
 
   public static final String PARTITION_PARAM = "partition";
+  public static final String PARTITIONS_PARAM = "partitions";
   public static final String BASEPATH_PARAM = "basepath";
   public static final String INSTANT_PARAM = "instant";
   public static final String MAX_INSTANT_PARAM = "maxinstant";
@@ -526,11 +528,21 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
   }
 
   @Override
-  public Void loadAllPartitions() {
+  public void loadAllPartitions() {
     Map<String, String> paramsMap = getParams();
     try {
       executeRequest(LOAD_ALL_PARTITIONS_URL, paramsMap, 
BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
-      return null;
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
+  @Override
+  public void loadPartitions(List<String> partitionPaths) {
+    try {
+      Map<String, String> paramsMap = getParams();
+      paramsMap.put(PARTITIONS_PARAM, 
OBJECT_MAPPER.writeValueAsString(partitionPaths));
+      executeRequest(LOAD_PARTITIONS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, 
RequestMethod.POST);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 1bcd1de61bc..87b3db142e6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -246,5 +246,11 @@ public interface TableFileSystemView {
   /**
    * Load all partition and file slices into view
    */
-  Void loadAllPartitions();
+  void loadAllPartitions();
+
+  /**
+   * Load all partition and file slices into view for the provided partition 
paths
+   * @param partitionPaths List of partition paths to load
+   */
+  void loadPartitions(List<String> partitionPaths);
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index 1ab824724aa..165a565da22 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -633,6 +633,61 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
     assertEquals(deltaFile3, logFiles.get(0).getFileName(), "Log File Order 
check");
   }
 
+  @Test
+  void testLoadPartitions_unPartitioned() throws Exception {
+    String partitionPath = "";
+    Paths.get(basePath, partitionPath).toFile().mkdirs();
+    String fileId = UUID.randomUUID().toString();
+
+    String instantTime1 = "1";
+    String fileName1 =
+        FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, 
instantTime1, 0, TEST_WRITE_TOKEN);
+
+    Paths.get(basePath, partitionPath, fileName1).toFile().createNewFile();
+    HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+    HoodieInstant instant1 = new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, instantTime1);
+
+    saveAsComplete(commitTimeline, instant1, Option.empty());
+    refreshFsView();
+
+    // Assert that no base files are returned without the partitions being 
loaded
+    assertEquals(0, 
fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
+    // Assert that load does not fail for un-partitioned tables
+    fsView.loadPartitions(Collections.singletonList(partitionPath));
+    // Assert that base files are returned after the empty-string partition is 
loaded
+    assertEquals(1, 
fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
+  }
+
+  @Test
+  void testLoadPartitions_partitioned() throws Exception {
+    String partitionPath1 = "2016/05/01";
+    String partitionPath2 = "2016/05/02";
+    Paths.get(basePath, partitionPath1).toFile().mkdirs();
+    Paths.get(basePath, partitionPath2).toFile().mkdirs();
+    String fileId1 = UUID.randomUUID().toString();
+    String fileId2 = UUID.randomUUID().toString();
+    String instantTime1 = "1";
+    String fileName1 =
+        FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, 
instantTime1, 0, TEST_WRITE_TOKEN);
+    String fileName2 =
+        FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, 
instantTime1, 0, TEST_WRITE_TOKEN);
+
+    Paths.get(basePath, partitionPath1, fileName1).toFile().createNewFile();
+    Paths.get(basePath, partitionPath2, fileName2).toFile().createNewFile();
+    HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+    HoodieInstant instant1 = new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, instantTime1);
+
+    saveAsComplete(commitTimeline, instant1, Option.empty());
+    refreshFsView();
+
+    // Assert that no base files are returned without the partitions being 
loaded
+    assertEquals(0, 
fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
+    // Only load a single partition path
+    fsView.loadPartitions(Collections.singletonList(partitionPath1));
+    // Assert that base file is returned for partitionPath1 only
+    assertEquals(1, 
fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
+  }
+
   /**
    * Returns all file-slices including uncommitted ones.
    *
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
index b297d320c7a..1e2b8e0c35e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
@@ -53,6 +53,9 @@ import java.util.stream.Stream;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -698,6 +701,27 @@ public class TestPriorityBasedFileSystemView {
     });
   }
 
+  @Test
+  public void testLoadPartitions() {
+    String partitionPath = "/table2";
+
+    fsView.loadPartitions(Collections.singletonList(partitionPath));
+    verify(primary, 
times(1)).loadPartitions(Collections.singletonList(partitionPath));
+    verify(secondary, never()).loadPartitions(any());
+
+    resetMocks();
+    doThrow(new 
RuntimeException()).when(primary).loadPartitions(Collections.singletonList(partitionPath));
+    fsView.loadPartitions(Collections.singletonList(partitionPath));
+    verify(primary, 
times(1)).loadPartitions(Collections.singletonList(partitionPath));
+    verify(secondary, 
times(1)).loadPartitions(Collections.singletonList(partitionPath));
+
+    resetMocks();
+    doThrow(new 
RuntimeException()).when(secondary).loadPartitions(Collections.singletonList(partitionPath));
+    assertThrows(RuntimeException.class, () -> {
+      fsView.loadPartitions(Collections.singletonList(partitionPath));
+    });
+  }
+
   @Test
   public void testGetPreferredView() {
     assertEquals(primary, fsView.getPreferredView());
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index c72491341fe..f17c5624084 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.timeline.service.handlers.BaseFileHandler;
 import org.apache.hudi.timeline.service.handlers.FileSliceHandler;
 import org.apache.hudi.timeline.service.handlers.InstantStateHandler;
@@ -44,6 +45,7 @@ import org.apache.hudi.timeline.service.handlers.MarkerHandler;
 import org.apache.hudi.timeline.service.handlers.TimelineHandler;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
 import io.javalin.Javalin;
@@ -72,6 +74,7 @@ public class RequestHandler {
 
   private static final ObjectMapper OBJECT_MAPPER = new 
ObjectMapper().registerModule(new AfterburnerModule());
   private static final Logger LOG = 
LoggerFactory.getLogger(RequestHandler.class);
+  private static final TypeReference<List<String>> LIST_TYPE_REFERENCE = new 
TypeReference<List<String>>() {};
 
   private final TimelineService.Config timelineServiceConfig;
   private final FileSystemViewManager viewManager;
@@ -444,6 +447,19 @@ public class RequestHandler {
       writeValueAsString(ctx, success);
     }, false));
 
+    app.post(RemoteHoodieTableFileSystemView.LOAD_PARTITIONS_URL, new 
ViewHandler(ctx -> {
+      metricsRegistry.add("LOAD_PARTITIONS", 1);
+      String basePath = 
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, 
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"));
+      try {
+        List<String> partitionPaths = 
OBJECT_MAPPER.readValue(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITIONS_PARAM,
 String.class)
+            .getOrThrow(e -> new HoodieException("Partitions param is 
invalid")), LIST_TYPE_REFERENCE);
+        boolean success = sliceHandler.loadPartitions(basePath, 
partitionPaths);
+        writeValueAsString(ctx, success);
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to parse request parameter", e);
+      }
+    }, false));
+
     app.post(RemoteHoodieTableFileSystemView.LOAD_ALL_PARTITIONS_URL, new 
ViewHandler(ctx -> {
       metricsRegistry.add("LOAD_ALL_PARTITIONS", 1);
       boolean success = sliceHandler
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
index 4a4226724f8..391145c5cf8 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
@@ -163,4 +163,9 @@ public class FileSliceHandler extends Handler {
     viewManager.getFileSystemView(basePath).loadAllPartitions();
     return true;
   }
+
+  public boolean loadPartitions(String basePath, List<String> partitionPaths) {
+    viewManager.getFileSystemView(basePath).loadPartitions(partitionPaths);
+    return true;
+  }
 }

Reply via email to