This is an automated email from the ASF dual-hosted git repository.

wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 895957db860 [HUDI-8632] Fix Incorrect classification of input paths in InputPathHandler and HoodieInputFormatUtils (#12495)
895957db860 is described below

commit 895957db860108b0e2f2a05c99d27375d3965d2e
Author: Nikolay Skovorodin <[email protected]>
AuthorDate: Wed Dec 18 15:34:12 2024 +0700

    [HUDI-8632] Fix Incorrect classification of input paths in InputPathHandler and HoodieInputFormatUtils (#12495)
---
 .../org/apache/hudi/hadoop/InputPathHandler.java   |  4 +-
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  | 47 ++++------------------
 .../apache/hudi/hadoop/TestInputPathHandler.java   |  7 ++++
 3 files changed, 17 insertions(+), 41 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
index 5342f0b3e9c..f0c2e6a1fe2 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
@@ -94,8 +94,10 @@ public class InputPathHandler {
       throws IOException {
     for (Path inputPath : inputPaths) {
       boolean basePathKnown = false;
+      String inputPathStr = inputPath.toString();
       for (HoodieTableMetaClient metaClient : tableMetaClientMap.values()) {
-        if (inputPath.toString().contains(metaClient.getBasePath().toString())) {
+        String basePathStr = metaClient.getBasePath().toString();
+        if (inputPathStr.equals(basePathStr) || inputPathStr.startsWith(basePathStr + "/")) {
           // We already know the base path for this inputPath.
           basePathKnown = true;
           // Check if this is for a snapshot query
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index c3e9a8504d0..aa24bd36fd9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -436,44 +436,6 @@ public class HoodieInputFormatUtils {
     return returns;
   }
 
-  /**
-   * Takes in a list of filesStatus and a list of table metadata. Groups the files status list
-   * based on given table metadata.
-   *
-   * @param fileStatuses
-   * @param fileExtension
-   * @param metaClientList
-   * @return
-   * @throws IOException
-   */
-  public static Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(
-      FileStatus[] fileStatuses, String fileExtension, Collection<HoodieTableMetaClient> metaClientList) {
-    // This assumes the paths for different tables are grouped together
-    Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
-    HoodieTableMetaClient metadata = null;
-    for (FileStatus status : fileStatuses) {
-      Path inputPath = status.getPath();
-      if (!inputPath.getName().endsWith(fileExtension)) {
-        //FIXME(vc): skip non data files for now. This wont be needed once log file name start
-        // with "."
-        continue;
-      }
-      if ((metadata == null) || (!inputPath.toString().contains(metadata.getBasePath().toString()))) {
-        for (HoodieTableMetaClient metaClient : metaClientList) {
-          if (inputPath.toString().contains(metaClient.getBasePath().toString())) {
-            metadata = metaClient;
-            if (!grouped.containsKey(metadata)) {
-              grouped.put(metadata, new ArrayList<>());
-            }
-            break;
-          }
-        }
-      }
-      grouped.get(metadata).add(status);
-    }
-    return grouped;
-  }
-
   public static Map<HoodieTableMetaClient, List<Path>> groupSnapshotPathsByMetaClient(
       Collection<HoodieTableMetaClient> metaClientList,
       List<Path> snapshotPaths
@@ -481,9 +443,14 @@ public class HoodieInputFormatUtils {
     Map<HoodieTableMetaClient, List<Path>> grouped = new HashMap<>();
     metaClientList.forEach(metaClient -> grouped.put(metaClient, new ArrayList<>()));
     for (Path path : snapshotPaths) {
+      String inputPathStr = path.toString();
       // Find meta client associated with the input path
-      metaClientList.stream().filter(metaClient -> path.toString().contains(metaClient.getBasePath().toString()))
-          .forEach(metaClient -> grouped.get(metaClient).add(path));
+      Option<HoodieTableMetaClient> matchedMetaClient = Option.fromJavaOptional(metaClientList.stream()
+          .filter(metaClient -> {
+            String basePathStr = metaClient.getBasePath().toString();
+            return inputPathStr.equals(basePathStr) || inputPathStr.startsWith(basePathStr + "/"); })
+          .findFirst());
+      matchedMetaClient.ifPresent(metaClient -> grouped.get(metaClient).add(path));
     }
     return grouped;
   }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
index e1bdc682932..2a9380b3214 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
@@ -54,6 +54,7 @@ public class TestInputPathHandler {
 
   // snapshot Table
   public static final String ETL_TRIPS_TEST_NAME = "etl_trips";
+  public static final String MODEL_TRIPS_COW_TEST_NAME = "model_trips_cow";
 
   // non Hoodie table
   public static final String TRIPS_STATS_TEST_NAME = "trips_stats";
@@ -77,6 +78,7 @@ public class TestInputPathHandler {
   private static String basePathTable4 = null; // non hoodie Path
   private static String basePathTable5 = null;
   private static String basePathTable6 = null;
+  private static String basePathTable7 = null;
   private static List<String> incrementalTables;
   private static List<Path> incrementalPaths;
   private static List<Path> snapshotPaths;
@@ -123,6 +125,7 @@ public class TestInputPathHandler {
     String tempPath = "/tmp/";
     basePathTable5 = tempPath + EMPTY_SNAPSHOT_TEST_NAME;
     basePathTable6 = tempPath + EMPTY_INCREMENTAL_TEST_NAME;
+    basePathTable7 = parentPath.resolve(MODEL_TRIPS_COW_TEST_NAME).toAbsolutePath().toString();
 
     dfs.mkdirs(new Path(basePathTable1));
     initTableType(dfs.getConf(), basePathTable1, RAW_TRIPS_TEST_NAME, HoodieTableType.MERGE_ON_READ);
@@ -145,6 +148,10 @@ public class TestInputPathHandler {
     initTableType(dfs.getConf(), basePathTable6, EMPTY_INCREMENTAL_TEST_NAME, HoodieTableType.MERGE_ON_READ);
     incrementalPaths.add(new Path(basePathTable6));
 
+    dfs.mkdirs(new Path(basePathTable7));
+    initTableType(dfs.getConf(), basePathTable7, MODEL_TRIPS_COW_TEST_NAME, HoodieTableType.COPY_ON_WRITE);
+    snapshotPaths.addAll(generatePartitions(dfs, basePathTable7));
+
     inputPaths.addAll(incrementalPaths);
     inputPaths.addAll(snapshotPaths);
     inputPaths.addAll(nonHoodiePaths);

Reply via email to