This is an automated email from the ASF dual-hosted git repository.
wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 895957db860 [HUDI-8632] Fix Incorrect classification of input paths in
InputPathHandler and HoodieInputFormatUtils (#12495)
895957db860 is described below
commit 895957db860108b0e2f2a05c99d27375d3965d2e
Author: Nikolay Skovorodin <[email protected]>
AuthorDate: Wed Dec 18 15:34:12 2024 +0700
[HUDI-8632] Fix Incorrect classification of input paths in InputPathHandler
and HoodieInputFormatUtils (#12495)
---
.../org/apache/hudi/hadoop/InputPathHandler.java | 4 +-
.../hudi/hadoop/utils/HoodieInputFormatUtils.java | 47 ++++------------------
.../apache/hudi/hadoop/TestInputPathHandler.java | 7 ++++
3 files changed, 17 insertions(+), 41 deletions(-)
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
index 5342f0b3e9c..f0c2e6a1fe2 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
@@ -94,8 +94,10 @@ public class InputPathHandler {
throws IOException {
for (Path inputPath : inputPaths) {
boolean basePathKnown = false;
+ String inputPathStr = inputPath.toString();
for (HoodieTableMetaClient metaClient : tableMetaClientMap.values()) {
- if
(inputPath.toString().contains(metaClient.getBasePath().toString())) {
+ String basePathStr = metaClient.getBasePath().toString();
+ if (inputPathStr.equals(basePathStr) ||
inputPathStr.startsWith(basePathStr + "/")) {
// We already know the base path for this inputPath.
basePathKnown = true;
// Check if this is for a snapshot query
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index c3e9a8504d0..aa24bd36fd9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -436,44 +436,6 @@ public class HoodieInputFormatUtils {
return returns;
}
- /**
- * Takes in a list of filesStatus and a list of table metadata. Groups the
files status list
- * based on given table metadata.
- *
- * @param fileStatuses
- * @param fileExtension
- * @param metaClientList
- * @return
- * @throws IOException
- */
- public static Map<HoodieTableMetaClient, List<FileStatus>>
groupFileStatusForSnapshotPaths(
- FileStatus[] fileStatuses, String fileExtension,
Collection<HoodieTableMetaClient> metaClientList) {
- // This assumes the paths for different tables are grouped together
- Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
- HoodieTableMetaClient metadata = null;
- for (FileStatus status : fileStatuses) {
- Path inputPath = status.getPath();
- if (!inputPath.getName().endsWith(fileExtension)) {
- //FIXME(vc): skip non data files for now. This wont be needed once log
file name start
- // with "."
- continue;
- }
- if ((metadata == null) ||
(!inputPath.toString().contains(metadata.getBasePath().toString()))) {
- for (HoodieTableMetaClient metaClient : metaClientList) {
- if
(inputPath.toString().contains(metaClient.getBasePath().toString())) {
- metadata = metaClient;
- if (!grouped.containsKey(metadata)) {
- grouped.put(metadata, new ArrayList<>());
- }
- break;
- }
- }
- }
- grouped.get(metadata).add(status);
- }
- return grouped;
- }
-
public static Map<HoodieTableMetaClient, List<Path>>
groupSnapshotPathsByMetaClient(
Collection<HoodieTableMetaClient> metaClientList,
List<Path> snapshotPaths
@@ -481,9 +443,14 @@ public class HoodieInputFormatUtils {
Map<HoodieTableMetaClient, List<Path>> grouped = new HashMap<>();
metaClientList.forEach(metaClient -> grouped.put(metaClient, new
ArrayList<>()));
for (Path path : snapshotPaths) {
+ String inputPathStr = path.toString();
// Find meta client associated with the input path
- metaClientList.stream().filter(metaClient ->
path.toString().contains(metaClient.getBasePath().toString()))
- .forEach(metaClient -> grouped.get(metaClient).add(path));
+ Option<HoodieTableMetaClient> matchedMetaClient =
Option.fromJavaOptional(metaClientList.stream()
+ .filter(metaClient -> {
+ String basePathStr = metaClient.getBasePath().toString();
+ return inputPathStr.equals(basePathStr) ||
inputPathStr.startsWith(basePathStr + "/"); })
+ .findFirst());
+ matchedMetaClient.ifPresent(metaClient ->
grouped.get(metaClient).add(path));
}
return grouped;
}
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
index e1bdc682932..2a9380b3214 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
@@ -54,6 +54,7 @@ public class TestInputPathHandler {
// snapshot Table
public static final String ETL_TRIPS_TEST_NAME = "etl_trips";
+ public static final String MODEL_TRIPS_COW_TEST_NAME = "model_trips_cow";
// non Hoodie table
public static final String TRIPS_STATS_TEST_NAME = "trips_stats";
@@ -77,6 +78,7 @@ public class TestInputPathHandler {
private static String basePathTable4 = null; // non hoodie Path
private static String basePathTable5 = null;
private static String basePathTable6 = null;
+ private static String basePathTable7 = null;
private static List<String> incrementalTables;
private static List<Path> incrementalPaths;
private static List<Path> snapshotPaths;
@@ -123,6 +125,7 @@ public class TestInputPathHandler {
String tempPath = "/tmp/";
basePathTable5 = tempPath + EMPTY_SNAPSHOT_TEST_NAME;
basePathTable6 = tempPath + EMPTY_INCREMENTAL_TEST_NAME;
+ basePathTable7 =
parentPath.resolve(MODEL_TRIPS_COW_TEST_NAME).toAbsolutePath().toString();
dfs.mkdirs(new Path(basePathTable1));
initTableType(dfs.getConf(), basePathTable1, RAW_TRIPS_TEST_NAME,
HoodieTableType.MERGE_ON_READ);
@@ -145,6 +148,10 @@ public class TestInputPathHandler {
initTableType(dfs.getConf(), basePathTable6, EMPTY_INCREMENTAL_TEST_NAME,
HoodieTableType.MERGE_ON_READ);
incrementalPaths.add(new Path(basePathTable6));
+ dfs.mkdirs(new Path(basePathTable7));
+ initTableType(dfs.getConf(), basePathTable7, MODEL_TRIPS_COW_TEST_NAME,
HoodieTableType.COPY_ON_WRITE);
+ snapshotPaths.addAll(generatePartitions(dfs, basePathTable7));
+
inputPaths.addAll(incrementalPaths);
inputPaths.addAll(snapshotPaths);
inputPaths.addAll(nonHoodiePaths);