This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fe43e6f85d [HUDI-5253] HoodieMergeOnReadTableInputFormat could have
duplicate records issue if it contains delta files while still splittable
(#7264)
fe43e6f85d is described below
commit fe43e6f85d6d98db43b4fdc144654a07db032a28
Author: RexAn <[email protected]>
AuthorDate: Tue Nov 29 20:51:07 2022 +0800
[HUDI-5253] HoodieMergeOnReadTableInputFormat could have duplicate records
issue if it contains delta files while still splittable (#7264)
---
.../functional/TestHoodieClientOnMergeOnReadStorage.java | 1 -
.../org/apache/hudi/hadoop/realtime/HoodieRealtimePath.java | 2 +-
.../realtime/TestHoodieMergeOnReadTableInputFormat.java | 13 +++++++++++++
3 files changed, 14 insertions(+), 2 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index fd2245d34f..1def851949 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -168,7 +168,6 @@ public class TestHoodieClientOnMergeOnReadStorage extends
HoodieClientTestBase {
client.compact(compactionTimeStamp.get());
prevCommitTime = compactionTimeStamp.get();
- //TODO: Below commits are creating duplicates when all the tests are run
together. but individually they are passing.
for (int i = 0; i < 2; i++) {
// Upsert
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimePath.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimePath.java
index 5ba62113a8..37b59a9627 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimePath.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimePath.java
@@ -89,7 +89,7 @@ public class HoodieRealtimePath extends Path {
}
public boolean isSplitable() {
- return !toString().contains(".log") && !includeBootstrapFilePath();
+ return !toString().contains(".log") && deltaLogFiles.isEmpty() &&
!includeBootstrapFilePath();
}
public PathWithBootstrapFileStatus getPathWithBootstrapFileStatus() {
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadTableInputFormat.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadTableInputFormat.java
index d44f5fbf63..6a5404762a 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadTableInputFormat.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadTableInputFormat.java
@@ -19,6 +19,7 @@
package org.apache.hudi.hadoop.realtime;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.PathWithBootstrapFileStatus;
@@ -65,4 +66,16 @@ public class TestHoodieMergeOnReadTableInputFormat {
rtPath.setPathWithBootstrapFileStatus(path);
assertFalse(new HoodieMergeOnReadTableInputFormat().isSplitable(fs,
rtPath), "Path for bootstrap should not be splitable.");
}
+
+ @Test
+ void pathNotSplitableIfContainsDeltaFiles() throws IOException {
+ URI basePath = Files.createTempFile(tempDir, "target", ".parquet").toUri();
+ HoodieRealtimePath rtPath = new HoodieRealtimePath(new Path("foo"), "bar",
basePath.toString(), Collections.emptyList(), "000", false, Option.empty());
+ assertTrue(new HoodieMergeOnReadTableInputFormat().isSplitable(fs,
rtPath), "Path only contains the base file should be splittable");
+
+ URI logPath = Files.createTempFile(tempDir, ".test",
".log.4_1-149-180").toUri();
+ HoodieLogFile logFile = new HoodieLogFile(fs.getFileStatus(new
Path(logPath)));
+ rtPath = new HoodieRealtimePath(new Path("foo"), "bar",
basePath.toString(), Collections.singletonList(logFile), "000", false,
Option.empty());
+ assertFalse(new HoodieMergeOnReadTableInputFormat().isSplitable(fs,
rtPath), "Path contains log files should not be splittable.");
+ }
}