This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 28f67ff3561 [HUDI-7551] Avoid loading all partitions in CleanPlanner
when MDT is enabled (#10928)
28f67ff3561 is described below
commit 28f67ff3561e04d9ee26d21930e5f928813e623a
Author: Tim Brown <[email protected]>
AuthorDate: Wed Mar 27 19:30:25 2024 -0500
[HUDI-7551] Avoid loading all partitions in CleanPlanner when MDT is
enabled (#10928)
---
.../action/clean/CleanPlanActionExecutor.java | 6 +++
.../hudi/table/action/clean/CleanPlanner.java | 13 ++---
.../table/view/AbstractTableFileSystemView.java | 13 ++++-
.../table/view/PriorityBasedFileSystemView.java | 25 +++++++++-
.../view/RemoteHoodieTableFileSystemView.java | 16 ++++++-
.../common/table/view/TableFileSystemView.java | 8 +++-
.../table/view/TestHoodieTableFileSystemView.java | 55 ++++++++++++++++++++++
.../view/TestPriorityBasedFileSystemView.java | 24 ++++++++++
.../hudi/timeline/service/RequestHandler.java | 16 +++++++
.../service/handlers/FileSliceHandler.java | 5 ++
10 files changed, 164 insertions(+), 17 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 723a95bb218..77c96b47f05 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -48,6 +48,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static
org.apache.hudi.client.utils.MetadataTableUtils.shouldUseBatchLookup;
import static org.apache.hudi.common.util.MapUtils.nonEmpty;
import static
org.apache.hudi.table.action.clean.CleanPlanner.SAVEPOINTED_TIMESTAMPS;
@@ -122,10 +123,15 @@ public class CleanPlanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I
Map<String, List<HoodieCleanFileInfo>> cleanOps = new HashMap<>();
List<String> partitionsToDelete = new ArrayList<>();
+ boolean shouldUseBatchLookup =
shouldUseBatchLookup(table.getMetaClient().getTableConfig(), config);
for (int i = 0; i < partitionsToClean.size(); i += cleanerParallelism) {
// Handles at most 'cleanerParallelism' number of partitions once at a
time to avoid overlarge memory pressure to the timeline server
// (remote or local embedded), thus to reduce the risk of an OOM
exception.
List<String> subPartitionsToClean = partitionsToClean.subList(i,
Math.min(i + cleanerParallelism, partitionsToClean.size()));
+ if (shouldUseBatchLookup) {
+ LOG.info("Load partitions and files into file system view in
advance. Paths: {}", subPartitionsToClean);
+ table.getHoodieView().loadPartitions(subPartitionsToClean);
+ }
Map<String, Pair<Boolean, List<CleanFileInfo>>>
cleanOpsWithPartitionMeta = context
.map(subPartitionsToClean, partitionPathToClean ->
Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean,
earliestInstant)), cleanerParallelism)
.stream()
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 21d12333d87..48ec8f9baa1 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -64,8 +64,6 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static
org.apache.hudi.client.utils.MetadataTableUtils.shouldUseBatchLookup;
-
/**
* Cleaner is responsible for garbage collecting older files in a given
partition path. Such that
* <p>
@@ -108,14 +106,9 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
.map(entry -> Pair.of(new
HoodieFileGroupId(entry.getValue().getPartitionPath(),
entry.getValue().getFileId()), entry.getValue()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
- // load all partitions in advance if necessary.
- if (shouldUseBatchLookup(hoodieTable.getMetaClient().getTableConfig(),
config)) {
- LOG.info("Load all partitions and files into file system view in
advance.");
- fileSystemView.loadAllPartitions();
- }
- // collect savepointed timestamps to be assist with incremental cleaning.
For non-partitioned and metadata table, we may not need this.
- this.savepointedTimestamps = hoodieTable.isMetadataTable() ?
Collections.EMPTY_LIST : (hoodieTable.isPartitioned() ?
hoodieTable.getSavepointTimestamps().stream().collect(Collectors.toList())
- : Collections.EMPTY_LIST);
+ // collect savepointed timestamps to assist with incremental cleaning. For
non-partitioned and metadata table, we may not need this.
+ this.savepointedTimestamps = hoodieTable.isMetadataTable() ?
Collections.emptyList() : (hoodieTable.isPartitioned() ? new
ArrayList<>(hoodieTable.getSavepointTimestamps())
+ : Collections.emptyList());
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 76750171fb5..cdac0eeeb20 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -801,11 +801,20 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
}
@Override
- public Void loadAllPartitions() {
+ public void loadAllPartitions() {
try {
readLock.lock();
ensureAllPartitionsLoadedCorrectly();
- return null;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void loadPartitions(List<String> partitionPaths) {
+ try {
+ readLock.lock();
+ ensurePartitionsLoadedCorrectly(partitionPaths);
} finally {
readLock.unlock();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index 56d7c7cc25c..1e4b1852d1b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -168,8 +168,29 @@ public class PriorityBasedFileSystemView implements
SyncableFileSystemView, Seri
}
@Override
- public Void loadAllPartitions() {
- return execute(preferredView::loadAllPartitions,
secondaryView::loadAllPartitions);
+ public void loadAllPartitions() {
+ execute(
+ () -> {
+ preferredView.loadAllPartitions();
+ return null;
+ },
+ () -> {
+ secondaryView.loadAllPartitions();
+ return null;
+ });
+ }
+
+ @Override
+ public void loadPartitions(List<String> partitionPaths) {
+ execute(
+ () -> {
+ preferredView.loadPartitions(partitionPaths);
+ return null;
+ },
+ () -> {
+ secondaryView.loadPartitions(partitionPaths);
+ return null;
+ });
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 4363a7daf27..61c90c6eb02 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -127,8 +127,10 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
// POST Requests
public static final String REFRESH_TABLE = String.format("%s/%s", BASE_URL,
"refresh/");
public static final String LOAD_ALL_PARTITIONS_URL = String.format("%s/%s",
BASE_URL, "loadallpartitions/");
+ public static final String LOAD_PARTITIONS_URL = String.format("%s/%s",
BASE_URL, "loadpartitions/");
public static final String PARTITION_PARAM = "partition";
+ public static final String PARTITIONS_PARAM = "partitions";
public static final String BASEPATH_PARAM = "basepath";
public static final String INSTANT_PARAM = "instant";
public static final String MAX_INSTANT_PARAM = "maxinstant";
@@ -526,11 +528,21 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
}
@Override
- public Void loadAllPartitions() {
+ public void loadAllPartitions() {
Map<String, String> paramsMap = getParams();
try {
executeRequest(LOAD_ALL_PARTITIONS_URL, paramsMap,
BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
- return null;
+ } catch (IOException e) {
+ throw new HoodieRemoteException(e);
+ }
+ }
+
+ @Override
+ public void loadPartitions(List<String> partitionPaths) {
+ try {
+ Map<String, String> paramsMap = getParams();
+ paramsMap.put(PARTITIONS_PARAM,
OBJECT_MAPPER.writeValueAsString(partitionPaths));
+ executeRequest(LOAD_PARTITIONS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE,
RequestMethod.POST);
} catch (IOException e) {
throw new HoodieRemoteException(e);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 1bcd1de61bc..87b3db142e6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -246,5 +246,11 @@ public interface TableFileSystemView {
/**
* Load all partition and file slices into view
*/
- Void loadAllPartitions();
+ void loadAllPartitions();
+
+ /**
+ * Load the partitions and file slices for only the provided partition paths into the view
+ * @param partitionPaths List of partition paths to load
+ */
+ void loadPartitions(List<String> partitionPaths);
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index 1ab824724aa..165a565da22 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -633,6 +633,61 @@ public class TestHoodieTableFileSystemView extends
HoodieCommonTestHarness {
assertEquals(deltaFile3, logFiles.get(0).getFileName(), "Log File Order
check");
}
+ @Test
+ void testLoadPartitions_unPartitioned() throws Exception {
+ String partitionPath = "";
+ Paths.get(basePath, partitionPath).toFile().mkdirs();
+ String fileId = UUID.randomUUID().toString();
+
+ String instantTime1 = "1";
+ String fileName1 =
+ FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
instantTime1, 0, TEST_WRITE_TOKEN);
+
+ Paths.get(basePath, partitionPath, fileName1).toFile().createNewFile();
+ HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+ HoodieInstant instant1 = new HoodieInstant(true,
HoodieTimeline.COMMIT_ACTION, instantTime1);
+
+ saveAsComplete(commitTimeline, instant1, Option.empty());
+ refreshFsView();
+
+ // Assert that no file slices are returned before the partition is loaded
+ assertEquals(0,
fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
+ // Assert that load does not fail for un-partitioned tables
+ fsView.loadPartitions(Collections.singletonList(partitionPath));
+ // Assert that the log-file slice is returned once the empty-string partition is loaded
+ assertEquals(1,
fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
+ }
+
+ @Test
+ void testLoadPartitions_partitioned() throws Exception {
+ String partitionPath1 = "2016/05/01";
+ String partitionPath2 = "2016/05/02";
+ Paths.get(basePath, partitionPath1).toFile().mkdirs();
+ Paths.get(basePath, partitionPath2).toFile().mkdirs();
+ String fileId1 = UUID.randomUUID().toString();
+ String fileId2 = UUID.randomUUID().toString();
+ String instantTime1 = "1";
+ String fileName1 =
+ FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION,
instantTime1, 0, TEST_WRITE_TOKEN);
+ String fileName2 =
+ FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION,
instantTime1, 0, TEST_WRITE_TOKEN);
+
+ Paths.get(basePath, partitionPath1, fileName1).toFile().createNewFile();
+ Paths.get(basePath, partitionPath2, fileName2).toFile().createNewFile();
+ HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+ HoodieInstant instant1 = new HoodieInstant(true,
HoodieTimeline.COMMIT_ACTION, instantTime1);
+
+ saveAsComplete(commitTimeline, instant1, Option.empty());
+ refreshFsView();
+
+ // Assert that no file slices are returned before any partition is loaded
+ assertEquals(0,
fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
+ // Only load a single partition path
+ fsView.loadPartitions(Collections.singletonList(partitionPath1));
+ // Assert that exactly one file slice is returned, since only partitionPath1 was loaded
+ assertEquals(1,
fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
+ }
+
/**
* Returns all file-slices including uncommitted ones.
*
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
index b297d320c7a..1e2b8e0c35e 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
@@ -53,6 +53,9 @@ import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -698,6 +701,27 @@ public class TestPriorityBasedFileSystemView {
});
}
+ @Test
+ public void testLoadPartitions() {
+ String partitionPath = "/table2";
+
+ fsView.loadPartitions(Collections.singletonList(partitionPath));
+ verify(primary,
times(1)).loadPartitions(Collections.singletonList(partitionPath));
+ verify(secondary, never()).loadPartitions(any());
+
+ resetMocks();
+ doThrow(new
RuntimeException()).when(primary).loadPartitions(Collections.singletonList(partitionPath));
+ fsView.loadPartitions(Collections.singletonList(partitionPath));
+ verify(primary,
times(1)).loadPartitions(Collections.singletonList(partitionPath));
+ verify(secondary,
times(1)).loadPartitions(Collections.singletonList(partitionPath));
+
+ resetMocks();
+ doThrow(new
RuntimeException()).when(secondary).loadPartitions(Collections.singletonList(partitionPath));
+ assertThrows(RuntimeException.class, () -> {
+ fsView.loadPartitions(Collections.singletonList(partitionPath));
+ });
+ }
+
@Test
public void testGetPreferredView() {
assertEquals(primary, fsView.getPreferredView());
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index c72491341fe..f17c5624084 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -37,6 +37,7 @@ import
org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.timeline.service.handlers.BaseFileHandler;
import org.apache.hudi.timeline.service.handlers.FileSliceHandler;
import org.apache.hudi.timeline.service.handlers.InstantStateHandler;
@@ -44,6 +45,7 @@ import
org.apache.hudi.timeline.service.handlers.MarkerHandler;
import org.apache.hudi.timeline.service.handlers.TimelineHandler;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import io.javalin.Javalin;
@@ -72,6 +74,7 @@ public class RequestHandler {
private static final ObjectMapper OBJECT_MAPPER = new
ObjectMapper().registerModule(new AfterburnerModule());
private static final Logger LOG =
LoggerFactory.getLogger(RequestHandler.class);
+ private static final TypeReference<List<String>> LIST_TYPE_REFERENCE = new
TypeReference<List<String>>() {};
private final TimelineService.Config timelineServiceConfig;
private final FileSystemViewManager viewManager;
@@ -444,6 +447,19 @@ public class RequestHandler {
writeValueAsString(ctx, success);
}, false));
+ app.post(RemoteHoodieTableFileSystemView.LOAD_PARTITIONS_URL, new
ViewHandler(ctx -> {
+ metricsRegistry.add("LOAD_PARTITIONS", 1);
+ String basePath =
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"));
+ try {
+ List<String> partitionPaths =
OBJECT_MAPPER.readValue(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITIONS_PARAM,
String.class)
+ .getOrThrow(e -> new HoodieException("Partitions param is
invalid")), LIST_TYPE_REFERENCE);
+ boolean success = sliceHandler.loadPartitions(basePath,
partitionPaths);
+ writeValueAsString(ctx, success);
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to parse request parameter", e);
+ }
+ }, false));
+
app.post(RemoteHoodieTableFileSystemView.LOAD_ALL_PARTITIONS_URL, new
ViewHandler(ctx -> {
metricsRegistry.add("LOAD_ALL_PARTITIONS", 1);
boolean success = sliceHandler
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
index 4a4226724f8..391145c5cf8 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
@@ -163,4 +163,9 @@ public class FileSliceHandler extends Handler {
viewManager.getFileSystemView(basePath).loadAllPartitions();
return true;
}
+
+ public boolean loadPartitions(String basePath, List<String> partitionPaths) {
+ viewManager.getFileSystemView(basePath).loadPartitions(partitionPaths);
+ return true;
+ }
}