This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 6a5a2f96e6a branch-3.1: [fix](hudi) fix hudi incr query and support 
"latest" #55003 (#55232)
6a5a2f96e6a is described below

commit 6a5a2f96e6afd8724ad5264a747035abef0625aa
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sun Aug 24 15:18:17 2025 -0700

    branch-3.1: [fix](hudi) fix hudi incr query and support "latest" #55003 
(#55232)
    
    bp #55003
    
    ---------
    
    Co-authored-by: Socrates <[email protected]>
    Co-authored-by: Socrates <[email protected]>
---
 .../hudi/source/COWIncrementalRelation.java        |  31 ++++++++-------------
 .../hudi/source/EmptyIncrementalRelation.java      |   5 ----
 .../hudi/source/IncrementalRelation.java           |   3 +-
 .../hudi/source/MORIncrementalRelation.java        |  31 +++++++--------------
 .../trees/plans/logical/LogicalHudiScan.java       |   7 ++---
 .../hudi/test_hudi_incremental.out                 | Bin 5687 -> 7715 bytes
 .../hudi/test_hudi_incremental.groovy              |   2 ++
 7 files changed, 26 insertions(+), 53 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
index 3a72c1bacab..b5989cc786d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
@@ -62,7 +62,6 @@ public class COWIncrementalRelation implements 
IncrementalRelation {
     private final Collection<String> filteredRegularFullPaths;
     private final Collection<String> filteredMetaBootstrapFullPaths;
 
-    private final boolean includeStartTime;
     private final String startTs;
     private final String endTs;
 
@@ -90,10 +89,15 @@ public class COWIncrementalRelation implements 
IncrementalRelation {
         if (EARLIEST_TIME.equals(startInstantTime)) {
             startInstantTime = "000";
         }
-        String endInstantTime = 
optParams.getOrDefault("hoodie.datasource.read.end.instanttime",
-                hollowCommitHandling == 
HollowCommitHandling.USE_TRANSITION_TIME
-                        ? 
commitTimeline.lastInstant().get().getCompletionTime()
-                        : commitTimeline.lastInstant().get().requestedTime());
+
+        String latestTime = hollowCommitHandling == 
HollowCommitHandling.USE_TRANSITION_TIME
+                ? commitTimeline.lastInstant().get().getCompletionTime()
+                : commitTimeline.lastInstant().get().requestedTime();
+        String endInstantTime = 
optParams.getOrDefault("hoodie.datasource.read.end.instanttime", latestTime);
+        if (LATEST_TIME.equals(endInstantTime)) {
+            endInstantTime = latestTime;
+        }
+
         startInstantArchived = 
commitTimeline.isBeforeTimelineStarts(startInstantTime);
         endInstantArchived = 
commitTimeline.isBeforeTimelineStarts(endInstantTime);
 
@@ -162,14 +166,8 @@ public class COWIncrementalRelation implements 
IncrementalRelation {
 
         fs = new Path(basePath.toUri().getPath()).getFileSystem(configuration);
         fullTableScan = shouldFullTableScan();
-        includeStartTime = !fullTableScan;
-        if (fullTableScan || commitsToReturn.isEmpty()) {
-            startTs = startInstantTime;
-            endTs = endInstantTime;
-        } else {
-            startTs = commitsToReturn.get(0).requestedTime();
-            endTs = commitsToReturn.get(commitsToReturn.size() - 
1).requestedTime();
-        }
+        startTs = startInstantTime;
+        endTs = endInstantTime;
     }
 
     private boolean shouldFullTableScan() throws HoodieException, IOException {
@@ -235,10 +233,8 @@ public class COWIncrementalRelation implements 
IncrementalRelation {
 
     @Override
     public Map<String, String> getHoodieParams() {
-        optParams.put("hoodie.datasource.read.incr.operation", "true");
         optParams.put("hoodie.datasource.read.begin.instanttime", startTs);
         optParams.put("hoodie.datasource.read.end.instanttime", endTs);
-        optParams.put("hoodie.datasource.read.incr.includeStartTime", 
includeStartTime ? "true" : "false");
         return optParams;
     }
 
@@ -247,11 +243,6 @@ public class COWIncrementalRelation implements 
IncrementalRelation {
         return fullTableScan;
     }
 
-    @Override
-    public boolean isIncludeStartTime() {
-        return includeStartTime;
-    }
-
     @Override
     public String getStartTs() {
         return startTs;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/EmptyIncrementalRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/EmptyIncrementalRelation.java
index c483bc46cfc..666db958eee 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/EmptyIncrementalRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/EmptyIncrementalRelation.java
@@ -59,11 +59,6 @@ public class EmptyIncrementalRelation implements 
IncrementalRelation {
         return false;
     }
 
-    @Override
-    public boolean isIncludeStartTime() {
-        return true;
-    }
-
     @Override
     public String getStartTs() {
         return EMPTY_TS;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/IncrementalRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/IncrementalRelation.java
index 4a707064fb6..cf7f47c722e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/IncrementalRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/IncrementalRelation.java
@@ -27,6 +27,7 @@ import java.util.Map;
 
 public interface IncrementalRelation {
     public static String EARLIEST_TIME = "earliest";
+    public static String LATEST_TIME = "latest";
 
     List<FileSlice> collectFileSlices() throws HoodieException;
 
@@ -36,8 +37,6 @@ public interface IncrementalRelation {
 
     boolean fallbackFullTableScan();
 
-    boolean isIncludeStartTime();
-
     String getStartTs();
 
     String getEndTs();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
index 69ca39e9ad6..5b16186e10d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
@@ -49,7 +49,7 @@ public class MORIncrementalRelation implements 
IncrementalRelation {
     private final HoodieTimeline timeline;
     private final HollowCommitHandling hollowCommitHandling;
     private String startTimestamp;
-    private final String endTimestamp;
+    private String endTimestamp;
     private final boolean startInstantArchived;
     private final boolean endInstantArchived;
     private final List<HoodieInstant> includedCommits;
@@ -57,7 +57,6 @@ public class MORIncrementalRelation implements 
IncrementalRelation {
     private final List<StoragePathInfo> affectedFilesInCommits;
     private final boolean fullTableScan;
     private final String globPattern;
-    private final boolean includeStartTime;
     private final String startTs;
     private final String endTs;
 
@@ -85,10 +84,14 @@ public class MORIncrementalRelation implements 
IncrementalRelation {
         if (EARLIEST_TIME.equals(startTimestamp)) {
             startTimestamp = "000";
         }
-        endTimestamp = 
optParams.getOrDefault("hoodie.datasource.read.end.instanttime",
-                hollowCommitHandling == 
HollowCommitHandling.USE_TRANSITION_TIME
+
+        String latestTime = hollowCommitHandling == 
HollowCommitHandling.USE_TRANSITION_TIME
                         ? timeline.lastInstant().get().getCompletionTime()
-                        : timeline.lastInstant().get().requestedTime());
+                : timeline.lastInstant().get().requestedTime();
+        endTimestamp = 
optParams.getOrDefault("hoodie.datasource.read.end.instanttime", latestTime);
+        if (LATEST_TIME.equals(endTimestamp)) { // check the requested end time, not the resolved default
+            endTimestamp = latestTime;
+        }
 
         startInstantArchived = timeline.isBeforeTimelineStarts(startTimestamp);
         endInstantArchived = timeline.isBeforeTimelineStarts(endTimestamp);
@@ -103,23 +106,14 @@ public class MORIncrementalRelation implements 
IncrementalRelation {
         }
         globPattern = 
optParams.getOrDefault("hoodie.datasource.read.incr.path.glob", "");
 
-        if (startInstantArchived) {
-            includeStartTime = false;
-            startTs = startTimestamp;
-        } else {
-            includeStartTime = true;
-            startTs = includedCommits.isEmpty() ? startTimestamp : 
includedCommits.get(0).requestedTime();
-        }
-        endTs = endInstantArchived || includedCommits.isEmpty() ? endTimestamp
-                : includedCommits.get(includedCommits.size() - 
1).requestedTime();
+        startTs = startTimestamp;
+        endTs = endTimestamp;
     }
 
     @Override
     public Map<String, String> getHoodieParams() {
-        optParams.put("hoodie.datasource.read.incr.operation", "true");
         optParams.put("hoodie.datasource.read.begin.instanttime", startTs);
         optParams.put("hoodie.datasource.read.end.instanttime", endTs);
-        optParams.put("hoodie.datasource.read.incr.includeStartTime", 
includeStartTime ? "true" : "false");
         return optParams;
     }
 
@@ -165,11 +159,6 @@ public class MORIncrementalRelation implements 
IncrementalRelation {
         return fullTableScan;
     }
 
-    @Override
-    public boolean isIncludeStartTime() {
-        return includeStartTime;
-    }
-
     @Override
     public String getStartTs() {
         return startTs;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
index e0235e38661..f7ff307f4d8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
@@ -33,7 +33,6 @@ import org.apache.doris.nereids.trees.TableSample;
 import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.GreaterThan;
-import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
 import org.apache.doris.nereids.trees.expressions.LessThanEqual;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
@@ -95,7 +94,7 @@ public class LogicalHudiScan extends LogicalFileScan {
     /**
      * replace incremental params as AND expression
      * incr('beginTime'='20240308110257169', 'endTime'='20240308110677278') =>
-     * _hoodie_commit_time >= 20240308110257169 and _hoodie_commit_time <= 
'20240308110677278'
+     * _hoodie_commit_time > 20240308110257169 and _hoodie_commit_time <= 
'20240308110677278'
      */
     public Set<Expression> generateIncrementalExpression(List<Slot> slots) {
         if (!incrementalRelation.isPresent()) {
@@ -114,9 +113,7 @@ public class LogicalHudiScan extends LogicalFileScan {
         StringLiteral upperValue = new 
StringLiteral(incrementalRelation.get().getEndTs());
         StringLiteral lowerValue = new 
StringLiteral(incrementalRelation.get().getStartTs());
         ComparisonPredicate less = new LessThanEqual(timeField, upperValue);
-        ComparisonPredicate great = 
incrementalRelation.get().isIncludeStartTime()
-                ? new GreaterThanEqual(timeField, lowerValue)
-                : new GreaterThan(timeField, lowerValue);
+        ComparisonPredicate great = new GreaterThan(timeField, lowerValue);
         return ImmutableSet.of(great, less);
     }
 
diff --git 
a/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out 
b/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out
index 50644f34961..df7583fe9ca 100644
Binary files 
a/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out and 
b/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out differ
diff --git 
a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy 
b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
index b962936dca1..62a34fc04e3 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
@@ -39,6 +39,8 @@ suite("test_hudi_incremental", 
"p2,external,hudi,external_remote,external_remote
         timestamps.eachWithIndex { timestamp, index ->
             def query_name = "qt_incremental_${index + 1}_end"
             "${query_name}" """ select count(user_id) from 
${table_name}@incr('beginTime' = '${timestamp}'); """
+            query_name = "qt_incremental_${index + 1}_latest"
+            "${query_name}" """ select count(user_id) from 
${table_name}@incr('beginTime' = '${timestamp}', 'endTime' = 'latest'); """
             query_name = "qt_incremental_earliest_${index + 1}"
             "${query_name}" """ select count(user_id) from 
${table_name}@incr('beginTime' = 'earliest', 'endTime' = '${timestamp}'); """
             if (index > 0) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to