This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 6a5a2f96e6a branch-3.1: [fix](hudi) fix hudi incr query and support
"latest" #55003 (#55232)
6a5a2f96e6a is described below
commit 6a5a2f96e6afd8724ad5264a747035abef0625aa
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sun Aug 24 15:18:17 2025 -0700
branch-3.1: [fix](hudi) fix hudi incr query and support "latest" #55003
(#55232)
bp #55003
---------
Co-authored-by: Socrates <[email protected]>
Co-authored-by: Socrates <[email protected]>
---
.../hudi/source/COWIncrementalRelation.java | 31 ++++++++-------------
.../hudi/source/EmptyIncrementalRelation.java | 5 ----
.../hudi/source/IncrementalRelation.java | 3 +-
.../hudi/source/MORIncrementalRelation.java | 31 +++++++--------------
.../trees/plans/logical/LogicalHudiScan.java | 7 ++---
.../hudi/test_hudi_incremental.out | Bin 5687 -> 7715 bytes
.../hudi/test_hudi_incremental.groovy | 2 ++
7 files changed, 26 insertions(+), 53 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
index 3a72c1bacab..b5989cc786d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
@@ -62,7 +62,6 @@ public class COWIncrementalRelation implements
IncrementalRelation {
private final Collection<String> filteredRegularFullPaths;
private final Collection<String> filteredMetaBootstrapFullPaths;
- private final boolean includeStartTime;
private final String startTs;
private final String endTs;
@@ -90,10 +89,15 @@ public class COWIncrementalRelation implements
IncrementalRelation {
if (EARLIEST_TIME.equals(startInstantTime)) {
startInstantTime = "000";
}
- String endInstantTime =
optParams.getOrDefault("hoodie.datasource.read.end.instanttime",
- hollowCommitHandling ==
HollowCommitHandling.USE_TRANSITION_TIME
- ?
commitTimeline.lastInstant().get().getCompletionTime()
- : commitTimeline.lastInstant().get().requestedTime());
+
+ String latestTime = hollowCommitHandling ==
HollowCommitHandling.USE_TRANSITION_TIME
+ ? commitTimeline.lastInstant().get().getCompletionTime()
+ : commitTimeline.lastInstant().get().requestedTime();
+ String endInstantTime =
optParams.getOrDefault("hoodie.datasource.read.end.instanttime", latestTime);
+ if (LATEST_TIME.equals(endInstantTime)) {
+ endInstantTime = latestTime;
+ }
+
startInstantArchived =
commitTimeline.isBeforeTimelineStarts(startInstantTime);
endInstantArchived =
commitTimeline.isBeforeTimelineStarts(endInstantTime);
@@ -162,14 +166,8 @@ public class COWIncrementalRelation implements
IncrementalRelation {
fs = new Path(basePath.toUri().getPath()).getFileSystem(configuration);
fullTableScan = shouldFullTableScan();
- includeStartTime = !fullTableScan;
- if (fullTableScan || commitsToReturn.isEmpty()) {
- startTs = startInstantTime;
- endTs = endInstantTime;
- } else {
- startTs = commitsToReturn.get(0).requestedTime();
- endTs = commitsToReturn.get(commitsToReturn.size() -
1).requestedTime();
- }
+ startTs = startInstantTime;
+ endTs = endInstantTime;
}
private boolean shouldFullTableScan() throws HoodieException, IOException {
@@ -235,10 +233,8 @@ public class COWIncrementalRelation implements
IncrementalRelation {
@Override
public Map<String, String> getHoodieParams() {
- optParams.put("hoodie.datasource.read.incr.operation", "true");
optParams.put("hoodie.datasource.read.begin.instanttime", startTs);
optParams.put("hoodie.datasource.read.end.instanttime", endTs);
- optParams.put("hoodie.datasource.read.incr.includeStartTime",
includeStartTime ? "true" : "false");
return optParams;
}
@@ -247,11 +243,6 @@ public class COWIncrementalRelation implements
IncrementalRelation {
return fullTableScan;
}
- @Override
- public boolean isIncludeStartTime() {
- return includeStartTime;
- }
-
@Override
public String getStartTs() {
return startTs;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/EmptyIncrementalRelation.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/EmptyIncrementalRelation.java
index c483bc46cfc..666db958eee 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/EmptyIncrementalRelation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/EmptyIncrementalRelation.java
@@ -59,11 +59,6 @@ public class EmptyIncrementalRelation implements
IncrementalRelation {
return false;
}
- @Override
- public boolean isIncludeStartTime() {
- return true;
- }
-
@Override
public String getStartTs() {
return EMPTY_TS;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/IncrementalRelation.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/IncrementalRelation.java
index 4a707064fb6..cf7f47c722e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/IncrementalRelation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/IncrementalRelation.java
@@ -27,6 +27,7 @@ import java.util.Map;
public interface IncrementalRelation {
public static String EARLIEST_TIME = "earliest";
+ public static String LATEST_TIME = "latest";
List<FileSlice> collectFileSlices() throws HoodieException;
@@ -36,8 +37,6 @@ public interface IncrementalRelation {
boolean fallbackFullTableScan();
- boolean isIncludeStartTime();
-
String getStartTs();
String getEndTs();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
index 69ca39e9ad6..5b16186e10d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
@@ -49,7 +49,7 @@ public class MORIncrementalRelation implements
IncrementalRelation {
private final HoodieTimeline timeline;
private final HollowCommitHandling hollowCommitHandling;
private String startTimestamp;
- private final String endTimestamp;
+ private String endTimestamp;
private final boolean startInstantArchived;
private final boolean endInstantArchived;
private final List<HoodieInstant> includedCommits;
@@ -57,7 +57,6 @@ public class MORIncrementalRelation implements
IncrementalRelation {
private final List<StoragePathInfo> affectedFilesInCommits;
private final boolean fullTableScan;
private final String globPattern;
- private final boolean includeStartTime;
private final String startTs;
private final String endTs;
@@ -85,10 +84,14 @@ public class MORIncrementalRelation implements
IncrementalRelation {
if (EARLIEST_TIME.equals(startTimestamp)) {
startTimestamp = "000";
}
- endTimestamp =
optParams.getOrDefault("hoodie.datasource.read.end.instanttime",
- hollowCommitHandling ==
HollowCommitHandling.USE_TRANSITION_TIME
+
+ String latestTime = hollowCommitHandling ==
HollowCommitHandling.USE_TRANSITION_TIME
? timeline.lastInstant().get().getCompletionTime()
- : timeline.lastInstant().get().requestedTime());
+ : timeline.lastInstant().get().requestedTime();
+ endTimestamp =
optParams.getOrDefault("hoodie.datasource.read.end.instanttime", latestTime);
+ if (LATEST_TIME.equals(endTimestamp)) { // "latest" keyword -> last instant on the timeline
+ endTimestamp = latestTime;
+ }
startInstantArchived = timeline.isBeforeTimelineStarts(startTimestamp);
endInstantArchived = timeline.isBeforeTimelineStarts(endTimestamp);
@@ -103,23 +106,14 @@ public class MORIncrementalRelation implements
IncrementalRelation {
}
globPattern =
optParams.getOrDefault("hoodie.datasource.read.incr.path.glob", "");
- if (startInstantArchived) {
- includeStartTime = false;
- startTs = startTimestamp;
- } else {
- includeStartTime = true;
- startTs = includedCommits.isEmpty() ? startTimestamp :
includedCommits.get(0).requestedTime();
- }
- endTs = endInstantArchived || includedCommits.isEmpty() ? endTimestamp
- : includedCommits.get(includedCommits.size() -
1).requestedTime();
+ startTs = startTimestamp;
+ endTs = endTimestamp;
}
@Override
public Map<String, String> getHoodieParams() {
- optParams.put("hoodie.datasource.read.incr.operation", "true");
optParams.put("hoodie.datasource.read.begin.instanttime", startTs);
optParams.put("hoodie.datasource.read.end.instanttime", endTs);
- optParams.put("hoodie.datasource.read.incr.includeStartTime",
includeStartTime ? "true" : "false");
return optParams;
}
@@ -165,11 +159,6 @@ public class MORIncrementalRelation implements
IncrementalRelation {
return fullTableScan;
}
- @Override
- public boolean isIncludeStartTime() {
- return includeStartTime;
- }
-
@Override
public String getStartTs() {
return startTs;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
index e0235e38661..f7ff307f4d8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
@@ -33,7 +33,6 @@ import org.apache.doris.nereids.trees.TableSample;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.GreaterThan;
-import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
import org.apache.doris.nereids.trees.expressions.LessThanEqual;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
@@ -95,7 +94,7 @@ public class LogicalHudiScan extends LogicalFileScan {
/**
* replace incremental params as AND expression
* incr('beginTime'='20240308110257169', 'endTime'='20240308110677278') =>
- * _hoodie_commit_time >= 20240308110257169 and _hoodie_commit_time <=
'20240308110677278'
+ * _hoodie_commit_time > '20240308110257169' and _hoodie_commit_time <=
'20240308110677278'
*/
public Set<Expression> generateIncrementalExpression(List<Slot> slots) {
if (!incrementalRelation.isPresent()) {
@@ -114,9 +113,7 @@ public class LogicalHudiScan extends LogicalFileScan {
StringLiteral upperValue = new
StringLiteral(incrementalRelation.get().getEndTs());
StringLiteral lowerValue = new
StringLiteral(incrementalRelation.get().getStartTs());
ComparisonPredicate less = new LessThanEqual(timeField, upperValue);
- ComparisonPredicate great =
incrementalRelation.get().isIncludeStartTime()
- ? new GreaterThanEqual(timeField, lowerValue)
- : new GreaterThan(timeField, lowerValue);
+ ComparisonPredicate great = new GreaterThan(timeField, lowerValue);
return ImmutableSet.of(great, less);
}
diff --git
a/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out
b/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out
index 50644f34961..df7583fe9ca 100644
Binary files
a/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out and
b/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out differ
diff --git
a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
index b962936dca1..62a34fc04e3 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
@@ -39,6 +39,8 @@ suite("test_hudi_incremental",
"p2,external,hudi,external_remote,external_remote
timestamps.eachWithIndex { timestamp, index ->
def query_name = "qt_incremental_${index + 1}_end"
"${query_name}" """ select count(user_id) from
${table_name}@incr('beginTime' = '${timestamp}'); """
+ query_name = "qt_incremental_${index + 1}_latest"
+ "${query_name}" """ select count(user_id) from
${table_name}@incr('beginTime' = '${timestamp}', 'endTime' = 'latest'); """
query_name = "qt_incremental_earliest_${index + 1}"
"${query_name}" """ select count(user_id) from
${table_name}@incr('beginTime' = 'earliest', 'endTime' = '${timestamp}'); """
if (index > 0) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]