This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fe43e6f85d [HUDI-5253] HoodieMergeOnReadTableInputFormat could have duplicate records issue if it contains delta files while still splittable (#7264)
fe43e6f85d is described below

commit fe43e6f85d6d98db43b4fdc144654a07db032a28
Author: RexAn <[email protected]>
AuthorDate: Tue Nov 29 20:51:07 2022 +0800

    [HUDI-5253] HoodieMergeOnReadTableInputFormat could have duplicate records issue if it contains delta files while still splittable (#7264)
---
 .../functional/TestHoodieClientOnMergeOnReadStorage.java    |  1 -
 .../org/apache/hudi/hadoop/realtime/HoodieRealtimePath.java |  2 +-
 .../realtime/TestHoodieMergeOnReadTableInputFormat.java     | 13 +++++++++++++
 3 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index fd2245d34f..1def851949 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -168,7 +168,6 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
     client.compact(compactionTimeStamp.get());
 
     prevCommitTime = compactionTimeStamp.get();
-    //TODO: Below commits are creating duplicates when all the tests are run 
together. but individually they are passing.
     for (int i = 0; i < 2; i++) {
       // Upsert
       newCommitTime = HoodieActiveTimeline.createNewInstantTime();
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimePath.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimePath.java
index 5ba62113a8..37b59a9627 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimePath.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimePath.java
@@ -89,7 +89,7 @@ public class HoodieRealtimePath extends Path {
   }
 
   public boolean isSplitable() {
-    return !toString().contains(".log") && !includeBootstrapFilePath();
+    return !toString().contains(".log") && deltaLogFiles.isEmpty() && 
!includeBootstrapFilePath();
   }
 
   public PathWithBootstrapFileStatus getPathWithBootstrapFileStatus() {
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadTableInputFormat.java
index d44f5fbf63..6a5404762a 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadTableInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadTableInputFormat.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.hadoop.realtime;
 
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.PathWithBootstrapFileStatus;
 
@@ -65,4 +66,16 @@ public class TestHoodieMergeOnReadTableInputFormat {
     rtPath.setPathWithBootstrapFileStatus(path);
     assertFalse(new HoodieMergeOnReadTableInputFormat().isSplitable(fs, 
rtPath), "Path for bootstrap should not be splitable.");
   }
+
+  @Test
+  void pathNotSplitableIfContainsDeltaFiles() throws IOException {
+    URI basePath = Files.createTempFile(tempDir, "target", ".parquet").toUri();
+    HoodieRealtimePath rtPath = new HoodieRealtimePath(new Path("foo"), "bar", 
basePath.toString(), Collections.emptyList(), "000", false, Option.empty());
+    assertTrue(new HoodieMergeOnReadTableInputFormat().isSplitable(fs, 
rtPath), "Path only contains the base file should be splittable");
+
+    URI logPath = Files.createTempFile(tempDir, ".test", 
".log.4_1-149-180").toUri();
+    HoodieLogFile logFile = new HoodieLogFile(fs.getFileStatus(new 
Path(logPath)));
+    rtPath = new HoodieRealtimePath(new Path("foo"), "bar", 
basePath.toString(), Collections.singletonList(logFile), "000", false, 
Option.empty());
+    assertFalse(new HoodieMergeOnReadTableInputFormat().isSplitable(fs, 
rtPath), "Path contains log files should not be splittable.");
+  }
 }

Reply via email to