TsukiokaKogane commented on code in PR #64776:
URL: https://github.com/apache/doris/pull/64776#discussion_r3505415879


##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java:
##########
@@ -189,37 +183,40 @@ public boolean hasData(Partition partition) {
     }
 
     public boolean hasHistoricalData(long partitionId) {
-        return historicalPartitionOffset.containsKey(partitionId);
+        return historicalPartitionTSO.containsKey(partitionId);
+    }
+
+    public boolean hasConsumedData(long partitionId) {
+        return partitionOffset.containsKey(partitionId);
     }
 
     public Pair<Long, Long> getStreamUpdate(Long partitionId) {
-        return Pair.of(partitionOffset.get(partitionId), historicalPartitionOffset.get(partitionId));
+        // if partition has historical data, return <historical tso, current tso>
+        // otherwise, return <current tso, current tso>
+        Long left = partitionOffset.get(partitionId);
+        if (historicalPartitionTSO.containsKey(partitionId)) {
+            left = historicalPartitionTSO.get(partitionId);
+        }
+        return Pair.of(left, getBaseTableNullable().getPartition(partitionId).getTso());

Review Comment:
   partitionOffset.get(partitionId) returns the consumed offset (the position the stream has already read up to).
   getBaseTableNullable().getPartition(partitionId).getTso() returns the partition's latest offset.
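
A minimal sketch of the distinction, for illustration only. `StreamUpdateSketch` and its `latestPartitionTso` map are hypothetical stand-ins (the latter for `getBaseTableNullable().getPartition(partitionId).getTso()`), and `Map.Entry` is used in place of Doris's `Pair`; only the two map names and the branching come from the diff above.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for OlapTableStream; not the actual Doris class.
final class StreamUpdateSketch {
    // Consumed offset per partition: how far the stream has already read.
    final Map<Long, Long> partitionOffset = new HashMap<>();
    // Historical TSO per partition; an entry exists only if the partition has historical data.
    final Map<Long, Long> historicalPartitionTSO = new HashMap<>();
    // Assumption: stands in for getBaseTableNullable().getPartition(id).getTso().
    final Map<Long, Long> latestPartitionTso = new HashMap<>();

    // Returns <left, right>: left is the historical TSO if present, otherwise the
    // consumed offset; right is always the partition's latest TSO.
    Map.Entry<Long, Long> getStreamUpdate(long partitionId) {
        Long left = partitionOffset.get(partitionId);
        if (historicalPartitionTSO.containsKey(partitionId)) {
            left = historicalPartitionTSO.get(partitionId);
        }
        return new AbstractMap.SimpleImmutableEntry<>(left, latestPartitionTso.get(partitionId));
    }

    public static void main(String[] args) {
        StreamUpdateSketch stream = new StreamUpdateSketch();
        stream.partitionOffset.put(1L, 100L);    // consumed up to 100
        stream.latestPartitionTso.put(1L, 150L); // partition has advanced to 150
        Map.Entry<Long, Long> update = stream.getStreamUpdate(1L);
        System.out.println(update.getKey() + " -> " + update.getValue());
    }
}
```

Under these assumptions, a partition without historical data yields the pair of consumed offset and latest TSO, so the two sides differ whenever the partition has advanced past what the stream has consumed.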



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

