yujun777 commented on code in PR #63850:
URL: https://github.com/apache/doris/pull/63850#discussion_r3361319946
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java:
##########
@@ -169,21 +186,16 @@ public boolean hasData(Partition partition) {
// if all available visible data has been consumed, return false
// todo(TsukiokaKogane): change offset from partition version to commit tso
return (!partitionOffset.containsKey(partition.getId())
- || !partitionOffset.get(partition.getId()).equals(partition.getVisibleVersion()))
+ || !partitionOffset.get(partition.getId()).equals(partition.getTso()))
&& partition.hasData();
}
+ public boolean hasHistoricalData(long partitionId) {
+ return historicalPartitionOffset.containsKey(partitionId);
+ }
+
public Pair<Long, Long> getStreamUpdate(Long partitionId) {
- Long next = null;
- Long prev = null;
- if (historicalPartitionOffset.containsKey(partitionId)) {
- next = historicalPartitionOffset.get(partitionId);
- } else {
- // todo(TsukiokaKogane): update next version with stepping
- next = ((OlapTable) baseTable).getPartition(partitionId).getVisibleVersion();
- }
- prev = partitionOffset.get(partitionId);
- return Pair.of(prev, next);
+ return Pair.of(partitionOffset.get(partitionId), historicalPartitionOffset.get(partitionId));
Review Comment:
For incremental stream partitions this returns `next = null`, so
`OlapScanNode.addScanRangeLocations()` only sends `start_tso` and does not send
an `end_tso`. But `OlapScanNode.getStreamUpdate()` later records `next` as the
partition's current TSO. The scanned range and the recorded offset are
therefore not frozen together: the BE can scan an open-ended range while the
transaction commits an FE-side `next` offset captured at a different moment.
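
To make the race concrete, here is a minimal, self-contained Java sketch of one
possible interleaving. `PartitionModel` and `OpenEndedScanRace` are
hypothetical toy classes, not Doris code, and the sketch assumes a scan covers
`(start_tso, end_tso]` and that the FE captures `next` after the BE has taken
its read snapshot. Under those assumptions, a write committed between the two
moments is counted as consumed but never scanned; with the opposite ordering
the same gap would instead cause rows to be read twice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of one partition (not Doris code): every
// committed write is tagged with a strictly increasing TSO.
final class PartitionModel {
    private final List<Long> committedTsos = new ArrayList<>();
    private long currentTso = 0L;

    long commitWrite() {
        currentTso++;
        committedTsos.add(currentTso);
        return currentTso;
    }

    long currentTso() {
        return currentTso;
    }

    // Open-ended scan (assumed semantics): every write with TSO > startTso
    // that is visible when the scan runs; there is no end_tso bound.
    List<Long> scanFrom(long startTso) {
        List<Long> out = new ArrayList<>();
        for (long tso : committedTsos) {
            if (tso > startTso) {
                out.add(tso);
            }
        }
        return out;
    }
}

final class OpenEndedScanRace {
    // Replays one possible interleaving and returns the TSOs that end up
    // marked as consumed without ever having been scanned.
    static List<Long> skippedTsos() {
        PartitionModel partition = new PartitionModel();
        partition.commitWrite(); // TSO 1
        partition.commitWrite(); // TSO 2

        long prev = 0L; // offset left by the previous consumption
        // The BE scans (prev, +inf) and sees TSOs 1 and 2.
        List<Long> scanned = partition.scanFrom(prev);

        // A concurrent load commits TSO 3 after the scan's snapshot...
        partition.commitWrite();
        // ...and only then does the FE capture next as the current TSO.
        long next = partition.currentTso();

        // Once next is committed as the new offset, everything in (prev, next]
        // counts as consumed, including TSOs the scan never saw.
        List<Long> skipped = new ArrayList<>();
        for (long tso : partition.scanFrom(prev)) {
            if (tso <= next && !scanned.contains(tso)) {
                skipped.add(tso);
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        System.out.println("never scanned but marked consumed: " + skippedTsos());
    }
}
```

Running `main` prints `never scanned but marked consumed: [3]`.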
For stream offset correctness, the scan range and the committed update
should use the same `[prev, next]` snapshot. I think the incremental stream
scan should also capture the partition's current TSO here as `next`, and the
scan range should send it as `end_tso` before the insert transaction records
that same value.
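
A minimal sketch of the suggested fix, again with hypothetical stand-in
classes (`TsoSnapshot`, `StreamScanRange`, `StreamOffsets`) rather than the
real Doris types. `next` is resolved once, the same way the removed
`getStreamUpdate()` body did it (historical offset first, otherwise the
partition's current TSO), and both the scan range's `endTso` and the committed
offset are derived from that single snapshot. In the real code the current TSO
would presumably come from
`((OlapTable) baseTable).getPartition(partitionId).getTso()`; here it is
passed in as a plain parameter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the [prev, next] pair returned by getStreamUpdate().
final class TsoSnapshot {
    final Long prev; // last committed offset; null if never consumed
    final long next; // upper bound, frozen once when the snapshot is taken

    TsoSnapshot(Long prev, long next) {
        this.prev = prev;
        this.next = next;
    }
}

// Hypothetical stand-in for the per-partition stream bounds sent to the BE.
final class StreamScanRange {
    final Long startTso; // null means "from the beginning"
    final long endTso;   // always set, so the BE never scans an open-ended range

    StreamScanRange(Long startTso, long endTso) {
        this.startTso = startTso;
        this.endTso = endTso;
    }
}

final class StreamOffsets {
    private final Map<Long, Long> partitionOffset = new HashMap<>();
    private final Map<Long, Long> historicalPartitionOffset = new HashMap<>();

    void setHistoricalOffset(long partitionId, long tso) {
        historicalPartitionOffset.put(partitionId, tso);
    }

    // Same shape as the pre-change getStreamUpdate(): use the historical
    // offset when one exists, otherwise freeze the current TSO as next.
    TsoSnapshot snapshot(long partitionId, long currentPartitionTso) {
        Long next = historicalPartitionOffset.get(partitionId);
        if (next == null) {
            next = currentPartitionTso;
        }
        return new TsoSnapshot(partitionOffset.get(partitionId), next);
    }

    // The scan range is derived from the snapshot, never from a fresh read.
    static StreamScanRange toScanRange(TsoSnapshot snapshot) {
        return new StreamScanRange(snapshot.prev, snapshot.next);
    }

    // On commit, record exactly the end bound the scan was limited to.
    void commit(long partitionId, StreamScanRange scanned) {
        partitionOffset.put(partitionId, scanned.endTso);
    }

    Long offset(long partitionId) {
        return partitionOffset.get(partitionId);
    }

    public static void main(String[] args) {
        StreamOffsets offsets = new StreamOffsets();
        long partitionId = 10L;

        // The partition's current TSO is 5 when the snapshot is taken.
        StreamScanRange first = toScanRange(offsets.snapshot(partitionId, 5L));
        // Even if the partition's TSO moves on before commit, the committed
        // offset is the endTso the scan was bounded by.
        offsets.commit(partitionId, first);
        System.out.println("start=" + first.startTso + " end=" + first.endTso
                + " committed=" + offsets.offset(partitionId));
    }
}
```

Running `main` prints `start=null end=5 committed=5`: the committed offset
is always the bound the BE actually scanned to, regardless of how far the
partition's TSO has advanced by commit time.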
--
This is an automated message from the Apache Git Service.