morrySnow commented on code in PR #63850:
URL: https://github.com/apache/doris/pull/63850#discussion_r3360996655


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java:
##########
@@ -1017,7 +1018,7 @@ private void dropTableInternal(Database db, Table table, 
boolean isView, boolean
             Env.getCurrentEnv().getMtmvService().dropTable(table);
         }
         if (table instanceof BaseTableStream) {
-            
Env.getCurrentEnv().getTableStreamManager().removeTableStream((BaseTableStream) 
table);
+            
Env.getCurrentEnv().getTableStreamManager().removeStaleStream((BaseTableStream) 
table);

Review Comment:
   unprotectDropTable 内已经执行过一次了,为什么这里还要再执行一次?



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java:
##########
@@ -4010,6 +4001,9 @@ public void createTableStream(CreateStreamCommand 
command) throws DdlException {
                 } catch (AnalysisException e) {
                     throw new DdlException(e.getMessage(), e);
                 }
+                if (baseTable instanceof OlapTable) {
+                    checkBaseTableAvailableForStreamType((OlapTable) 
baseTable, newStream.getConsumeType());

Review Comment:
   这应该也代理给 TableIf 自己的实现



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/BaseTableStream.java:
##########
@@ -109,15 +123,19 @@ public void setProperties(Map<String, String> properties) 
throws org.apache.dori
         showInitialRows = PropertyAnalyzer.analyzeBooleanProp(properties,
                 PropertyAnalyzer.PROPERTIES_STREAM_SHOW_INITIAL_ROWS,
                 false);
-        streamConsumeType = PropertyAnalyzer.analyzeStreamType(properties);
+        streamScanType = PropertyAnalyzer.analyzeStreamType(properties);
     }
 
     public String getTableStreamType() {
         return "BASE_STREAM";
     }
 
-    public String getConsumeType() {
-        return streamConsumeType.name();
+    public String getConsumeTypeString() {

Review Comment:
   getScanTypeString



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java:
##########
@@ -4025,4 +4019,30 @@ public void createTableStream(CreateStreamCommand 
command) throws DdlException {
             LOG.info("successfully create stream[{}]", streamName);
         }
     }
+
+    void checkBaseTableAvailable(TableIf tableIf) throws DdlException {
+        if (!BaseTableStream.isTableTypeSupported(tableIf)) {

Review Comment:
   这里有个问题,我们应该给 TableIf 加个接口,然后通过那个接口判断是否支持 
stream。同时,是否合法的判断也代理给一个接口。这样比较不容易写错代码。
   当前的实现,就疏忽了 Mtmv 也应该支持 stream 的情况。



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/BinlogConfig.java:
##########
@@ -203,6 +203,10 @@ public boolean isEnableForStreaming() {
         return enable && binlogFormat == BinlogFormat.ROW;

Review Comment:
   直接调用 isRowFormat ?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java:
##########
@@ -201,12 +213,57 @@ public void 
unprotectedUpdateStreamUpdate(AbstractTableStreamUpdate update, Long
         for (Map.Entry<Long, Long> entry : next.entrySet()) {
             if (historicalPartitionOffset.containsKey(entry.getKey())) {
                 historicalPartitionOffset.remove(entry.getKey());
-                partitionOffset.put(entry.getKey(), entry.getValue());
+                if (historicalPartitionTSO == null) {
+                    partitionOffset.put(entry.getKey(), entry.getValue());
+                } else {
+                    partitionOffset.put(entry.getKey(), 
historicalPartitionTSO.get(entry.getKey()));
+                    historicalPartitionTSO.remove(entry.getKey());
+                }
             } else {
-                // todo(TsukiokaKogane): update partition offset with tso
                 partitionOffset.put(entry.getKey(), entry.getValue());
             }
             partitionConsumptionTime.put(entry.getKey(), ts);
         }
     }
+
+    Set<Long> unprotectedCollectStalePartitionOffsetIds(Set<Long> 
validPartitionIds) {
+        Preconditions.checkState(isWriteLockHeldByCurrentThread());
+        Set<Long> stalePartitionIds = new HashSet<>();
+        for (Long partitionId : partitionOffset.keySet()) {
+            if (!validPartitionIds.contains(partitionId)) {
+                stalePartitionIds.add(partitionId);
+            }
+        }
+        for (Long partitionId : partitionConsumptionTime.keySet()) {
+            if (!validPartitionIds.contains(partitionId)) {
+                stalePartitionIds.add(partitionId);
+            }
+        }
+        for (Long partitionId : historicalPartitionOffset.keySet()) {
+            if (!validPartitionIds.contains(partitionId)) {
+                stalePartitionIds.add(partitionId);
+            }
+        }
+        if (historicalPartitionTSO != null) {
+            for (Long partitionId : historicalPartitionTSO.keySet()) {
+                if (!validPartitionIds.contains(partitionId)) {
+                    stalePartitionIds.add(partitionId);
+                }
+            }
+        }
+        return stalePartitionIds;
+    }
+
+    int unprotectedPrunePartitionOffsets(Set<Long> partitionIds) {
+        Preconditions.checkState(isWriteLockHeldByCurrentThread());

Review Comment:
   增加 error message



##########
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:
##########
@@ -620,6 +624,105 @@ private void traceInstance() {
         }
     }
 
+    private void waitForTimeBasedReadTransactionsVisible() throws Exception {

Review Comment:
   当前默认启用的是 NereidsCoordinator,所以应该同时修改 NereidsCoordinator



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java:
##########
@@ -97,7 +128,129 @@ public Set<Long> getTableStreamIds(DatabaseIf db) {
         return result;
     }
 
-    public void fillTableStreamValuesMetadataResult(List<TRow> dataBatch) {
+    public void cleanupStalePartitionOffsets() {
+        List<Long> staleDbIds = new ArrayList<>();
+        List<Pair<Long, Long>> staleStreamIds = new ArrayList<>();
+        List<PruneTableStreamPartitionOffsetInfo.Entry> pruneEntries = new 
ArrayList<>();
+        for (Map.Entry<Long, Set<Long>> entry : copyDbStreamMap().entrySet()) {
+            Optional<Database> db = 
Env.getCurrentInternalCatalog().getDb(entry.getKey());
+            if (!db.isPresent()) {
+                staleDbIds.add(entry.getKey());
+                continue;
+            }
+            for (Long tableId : entry.getValue()) {
+                Optional<Table> table = db.get().getTable(tableId);
+                if (!table.isPresent()) {
+                    staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+                    continue;
+                }
+                if (!(table.get() instanceof OlapTableStream)) {
+                    staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+                    continue;
+                }
+                cleanupStalePartitionOffsets((OlapTableStream) 
table.get()).ifPresent(pruneEntries::add);
+            }
+        }
+        removeStaleDbAndStream(staleDbIds, staleStreamIds);
+        if (!pruneEntries.isEmpty()) {
+            
Env.getCurrentEnv().getEditLog().logPruneTableStreamPartitionOffsets(

Review Comment:
   在写 log 时,由于没有对 db、table 加锁,在这条日志之前可能被插入 db、表删除的操作日志,进而导致在 follower 上出现内存泄漏。



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundRelation.java:
##########
@@ -62,41 +63,54 @@ public class UnboundRelation extends LogicalRelation 
implements Unbound, BlockFu
 
     private final Optional<TableSnapshot> tableSnapshot;
 
+    // Change scan metadata derived from table@incr(...), only used by Nereids 
path.
+    private final Optional<ChangeScanInfo> changeScanInfo;
+
     public UnboundRelation(RelationId id, List<String> nameParts) {
         this(id, nameParts, Optional.empty(), Optional.empty(),
                 ImmutableList.of(), false, ImmutableList.of(),
                 ImmutableList.of(), Optional.empty(), Optional.empty(), null,
-                Optional.empty(), Optional.empty());
+                Optional.empty(), Optional.empty(), Optional.empty());
     }
 
     public UnboundRelation(RelationId id, List<String> nameParts, List<String> 
partNames,
             boolean isTempPart) {
         this(id, nameParts, Optional.empty(), Optional.empty(), partNames, 
isTempPart, ImmutableList.of(),
-                ImmutableList.of(), Optional.empty(), Optional.empty(), null, 
Optional.empty(), Optional.empty());
+                ImmutableList.of(), Optional.empty(), Optional.empty(), null, 
Optional.empty(), Optional.empty(),
+                Optional.empty());
     }
 
     public UnboundRelation(RelationId id, List<String> nameParts, List<String> 
partNames,
             boolean isTempPart, List<Long> tabletIds, List<String> hints, 
Optional<TableSample> tableSample,
             Optional<String> indexName) {
         this(id, nameParts, Optional.empty(), Optional.empty(),
                 partNames, isTempPart, tabletIds, hints, tableSample, 
indexName, null, Optional.empty(),
-                Optional.empty());
+                Optional.empty(), Optional.empty());
     }
 
     public UnboundRelation(RelationId id, List<String> nameParts, List<String> 
partNames,
             boolean isTempPart, List<Long> tabletIds, List<String> hints, 
Optional<TableSample> tableSample,
             Optional<String> indexName, TableScanParams scanParams, 
Optional<TableSnapshot> tableSnapshot) {
+        this(id, nameParts, isTempPart, partNames, tabletIds, hints, 
tableSample, indexName,
+                scanParams, tableSnapshot, Optional.empty());
+    }
+
+    public UnboundRelation(RelationId id, List<String> nameParts, boolean 
isTempPart, List<String> partNames,
+            List<Long> tabletIds, List<String> hints, Optional<TableSample> 
tableSample, Optional<String> indexName,

Review Comment:
   这个新的构造函数,应当保持和其他构造函数在共有参数上的顺序一致性。也就是说 boolean isTempPart, List<String> 
partNames 应当调整顺序。



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java:
##########
@@ -97,7 +128,129 @@ public Set<Long> getTableStreamIds(DatabaseIf db) {
         return result;
     }
 
-    public void fillTableStreamValuesMetadataResult(List<TRow> dataBatch) {
+    public void cleanupStalePartitionOffsets() {
+        List<Long> staleDbIds = new ArrayList<>();
+        List<Pair<Long, Long>> staleStreamIds = new ArrayList<>();
+        List<PruneTableStreamPartitionOffsetInfo.Entry> pruneEntries = new 
ArrayList<>();
+        for (Map.Entry<Long, Set<Long>> entry : copyDbStreamMap().entrySet()) {
+            Optional<Database> db = 
Env.getCurrentInternalCatalog().getDb(entry.getKey());
+            if (!db.isPresent()) {
+                staleDbIds.add(entry.getKey());
+                continue;
+            }
+            for (Long tableId : entry.getValue()) {
+                Optional<Table> table = db.get().getTable(tableId);
+                if (!table.isPresent()) {
+                    staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+                    continue;
+                }
+                if (!(table.get() instanceof OlapTableStream)) {
+                    staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+                    continue;
+                }
+                cleanupStalePartitionOffsets((OlapTableStream) 
table.get()).ifPresent(pruneEntries::add);
+            }
+        }
+        removeStaleDbAndStream(staleDbIds, staleStreamIds);

Review Comment:
   这个不需要回放吗?如果不回放,master 和 follower 会保持一致吗?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableStreamScan.java:
##########
@@ -144,14 +159,16 @@ public List<Slot> computeOutput() {
                 }
             }
         }
-        if (!isIncrementalScan) {
-            // inject virtual stream hidden columns
-            SlotReference seqColRef = (SlotReference) new Alias(new 
BigIntLiteral(-1L), Column.STREAM_SEQ_COL).toSlot();
+        if (!changeScanInfo.isPresent()) {
+            // Only stream queries expose stream virtual columns.
+            SlotReference seqColRef = (SlotReference) new Alias(new 
BigIntLiteral(-1L), Column.STREAM_SEQ_COL)
+                    .toSlot().withNullable(true);

Review Comment:
   这里先创建一个 alias 然后就转成 slot 的目的是什么?直接创建 SlotReference 不可以吗?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java:
##########
@@ -243,6 +253,13 @@ private LogicalPlan makeOlapScan(TableIf table, 
UnboundRelation unboundRelation,
                     CollectionUtils.isEmpty(partIds) ? ((OlapTable) 
table).getPartitionIds() : partIds, indexId,
                     preAggStatus, CollectionUtils.isEmpty(partIds) ? 
ImmutableList.of() : partIds,
                     unboundRelation.getHints(), 
unboundRelation.getTableSample(), ImmutableList.of());
+            } else if (isChangeRead(unboundRelation)) {

Review Comment:
   如果同时指定 index id 和 @incr,会静默地吞掉 @incr,是否应该直接报错?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableStreamScan.java:
##########
@@ -179,28 +197,29 @@ public LogicalOlapTableStreamScan 
withCachedOutput(List<Slot> outputSlots) {
         return AbstractPlan.copyWithSameId(this, () ->
                 new LogicalOlapTableStreamScan(relationId, (Table) table, 
qualifier,
                         groupExpression, Optional.empty(),
-                        selectedPartitionIds, partitionPruned, 
selectedTabletIds,
+                        selectedPartitionIds, partitionPruned, 
hasPartitionPredicate, selectedTabletIds,
                         selectedIndexId, indexSelected, preAggStatus, 
manuallySpecifiedPartitions, hints,
                         cacheSlotWithSlotName, Optional.of(outputSlots), 
tableSample, directMvScan, colToSubPathsMap,
                         manuallySpecifiedTabletIds, operativeSlots, 
virtualColumns, scoreOrderKeys, scoreLimit,
-                        scoreRangeInfo, annOrderKeys, annLimit, tableAlias, 
isIncrementalScan));
+                        scoreRangeInfo, annOrderKeys, annLimit, tableAlias, 
partitionPrunablePredicates,
+                        changeScanInfo, isNormalized, isIncrementalScan));
     }
 
     @Override
     public LogicalOlapTableStreamScan withOperativeSlots(Collection<Slot> 
operativeSlots) {
         return AbstractPlan.copyWithSameId(this, () ->
                 new LogicalOlapTableStreamScan(relationId, (Table) table, 
qualifier,
                         groupExpression, Optional.of(getLogicalProperties()),
-                        selectedPartitionIds, partitionPruned, 
selectedTabletIds,
+                        selectedPartitionIds, partitionPruned, 
hasPartitionPredicate, selectedTabletIds,
                         selectedIndexId, indexSelected, preAggStatus, 
manuallySpecifiedPartitions,
                         hints, cacheSlotWithSlotName, cachedOutput, 
tableSample, directMvScan, colToSubPathsMap,
                         manuallySpecifiedTabletIds, operativeSlots, 
virtualColumns, scoreOrderKeys, scoreLimit,
-                        scoreRangeInfo, annOrderKeys, annLimit, tableAlias, 
isIncrementalScan));
+                        scoreRangeInfo, annOrderKeys, annLimit, tableAlias, 
partitionPrunablePredicates,
+                        changeScanInfo, isNormalized, isIncrementalScan));
     }
 
     @Override
     public List<Slot> getOutputByIndex(long indexId) {

Review Comment:
   stream 上也能创建同步物化视图?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableBinlogScan.java:
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.nereids.jobs.JobContext;
+import org.apache.doris.nereids.trees.ChangeScanInfo;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableStreamScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Normalize CHANGES semantic binlog scans without touching real stream scan 
behavior.
+ */
+public class NormalizeOlapTableBinlogScan implements CustomRewriter {

Review Comment:
   为什么不能使用 pattern match 的 rewriter?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java:
##########
@@ -895,12 +897,19 @@ private static List<RewriteJob> getWholeTreeRewriteJobs(
                 ImmutableSet.of(LogicalCTEAnchor.class),
                 () -> {
                     List<RewriteJob> rewriteJobs = 
Lists.newArrayListWithExpectedSize(300);
-                    rewriteJobs.add(
-                            topic("normalize olap table stream scan",
-                                    
custom(RuleType.NORMALIZE_OlAP_TABLE_STREAM_SCAN,
-                                            NormalizeOlapTableStreamScan::new)
-                            )
-                    );
+                    if (Config.enable_table_stream) {

Review Comment:
   这里不应该受 enable_table_stream 的控制。如果说这个开关需要影响是否能够查询 stream 表,以及是否能够查询 `@incr`,
那么应该在 bindRelation 时处理。



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java:
##########


Review Comment:
   这里不能直接check。否则一旦返回的基表类型不对,依赖此接口的所有功能都会失败。这里应返回 null



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -48,22 +60,52 @@
  * 2. add delete sign column if unique base table
  */
 public class NormalizeOlapTableStreamScan implements CustomRewriter {
+    private static final long ROW_BINLOG_APPEND = 0;
+    private static final long ROW_BINLOG_DELETE = 1;
+    private static final long ROW_BINLOG_UPDATE_BEFORE = 2;
+    private static final long ROW_BINLOG_UPDATE_AFTER = 3;
 
     @Override
     public Plan rewriteRoot(Plan plan, JobContext jobContext) {
         return plan.accept(OlapTableStreamScanReplacer.INSTANCE, null);
     }
 
+    private static Expression buildChangeTypeExpr(Slot opSlot) {

Review Comment:
   同样可以在 bind relation 时直接生成,可以参考:preAggForRandomDistribution 以及 
checkAndAddDeleteSignFilter



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneOlapScanTablet.java:
##########
@@ -79,6 +79,11 @@ public Rule build() {
                 for (Long id : olapScan.getSelectedPartitionIds()) {
                     Partition partition = table.getPartition(id);
                     MaterializedIndex index = 
partition.getIndex(olapScan.getSelectedIndexId());
+                    if (index == null && table.needRowBinlog()
+                            && olapScan.getSelectedIndexId() == 
table.getBaseIndexMeta().getRowBinlogIndexId()) {
+                        // if row binlog index is selected, then use base index
+                        index = table.getBaseIndex();
+                    }

Review Comment:
   这是用来直查物化视图的,所以,我觉得这里直接报错就可以了



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java:
##########
@@ -243,6 +253,13 @@ private LogicalPlan makeOlapScan(TableIf table, 
UnboundRelation unboundRelation,
                     CollectionUtils.isEmpty(partIds) ? ((OlapTable) 
table).getPartitionIds() : partIds, indexId,
                     preAggStatus, CollectionUtils.isEmpty(partIds) ? 
ImmutableList.of() : partIds,
                     unboundRelation.getHints(), 
unboundRelation.getTableSample(), ImmutableList.of());
+            } else if (isChangeRead(unboundRelation)) {
+                OlapTable olapTable = (OlapTable) table;
+                RowBinlogTableWrapper wrapper = new 
RowBinlogTableWrapper(olapTable);
+                ChangeScanInfo changeScanInfo = buildChangeScanInfo(olapTable, 
unboundRelation.getScanParams());
+                unboundRelation = 
unboundRelation.withChangeScanInfo(changeScanInfo);
+                Preconditions.checkState(changeScanInfo != null);
+                scan = makeOlapTableStreamScan(wrapper, unboundRelation, 
qualifier);

Review Comment:
   一个额外的问题,查询 stream 和查询 binlog 本质应该是相同的,为什么在查询的时候是两条不同的路径?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -987,6 +1005,26 @@ private PlanFragment 
computePhysicalOlapScan(PhysicalOlapScan olapScan, PlanTran
         return planFragment;
     }
 
+    private void applyChangeScanInfo(OlapScanNode olapScanNode, ChangeScanInfo 
changeScanInfo) {

Review Comment:
   change scan info 和 stream 这两个信息是否可以抽象成一个,这样链路就是统一的了



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java:
##########


Review Comment:
   addTableStream 放在 createTableWithoutLock 中更合适。这样就不用在这里和回放的路径上分别做这个动作



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java:
##########
@@ -97,7 +128,129 @@ public Set<Long> getTableStreamIds(DatabaseIf db) {
         return result;
     }
 
-    public void fillTableStreamValuesMetadataResult(List<TRow> dataBatch) {
+    public void cleanupStalePartitionOffsets() {
+        List<Long> staleDbIds = new ArrayList<>();
+        List<Pair<Long, Long>> staleStreamIds = new ArrayList<>();
+        List<PruneTableStreamPartitionOffsetInfo.Entry> pruneEntries = new 
ArrayList<>();
+        for (Map.Entry<Long, Set<Long>> entry : copyDbStreamMap().entrySet()) {
+            Optional<Database> db = 
Env.getCurrentInternalCatalog().getDb(entry.getKey());
+            if (!db.isPresent()) {
+                staleDbIds.add(entry.getKey());
+                continue;
+            }
+            for (Long tableId : entry.getValue()) {
+                Optional<Table> table = db.get().getTable(tableId);
+                if (!table.isPresent()) {
+                    staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+                    continue;
+                }
+                if (!(table.get() instanceof OlapTableStream)) {
+                    staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+                    continue;
+                }
+                cleanupStalePartitionOffsets((OlapTableStream) 
table.get()).ifPresent(pruneEntries::add);
+            }
+        }
+        removeStaleDbAndStream(staleDbIds, staleStreamIds);
+        if (!pruneEntries.isEmpty()) {
+            
Env.getCurrentEnv().getEditLog().logPruneTableStreamPartitionOffsets(
+                    new PruneTableStreamPartitionOffsetInfo(pruneEntries));
+        }
+    }
+
+    private Optional<PruneTableStreamPartitionOffsetInfo.Entry> 
cleanupStalePartitionOffsets(OlapTableStream stream) {
+        if (stream.isDisabled() || stream.isStale()) {
+            return Optional.empty();
+        }
+        OlapTable baseTable = stream.getBaseTableNullable();
+        if (baseTable == null) {
+            return Optional.empty();
+        }
+        Set<Long> validPartitionIds;
+        if (!baseTable.tryReadLock(Table.TRY_LOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("skip cleaning stream {} because base table {} read 
lock is busy",
+                        stream.getName(), baseTable.getName());
+            }
+            return Optional.empty();
+        }
+        try {
+            if (baseTable.isDropped) {
+                return Optional.empty();
+            }
+            validPartitionIds = new HashSet<>(baseTable.getPartitionIds());
+        } finally {
+            baseTable.readUnlock();
+        }
+        if (!stream.tryWriteLockIfExist(Table.TRY_LOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("skip cleaning stream {} because stream write lock 
is busy", stream.getName());
+            }
+            return Optional.empty();
+        }
+        try {
+            if (stream.isDisabled() || stream.isStale()) {
+                return Optional.empty();
+            }
+            Set<Long> stalePartitionIds = 
stream.unprotectedCollectStalePartitionOffsetIds(validPartitionIds);
+            if (stalePartitionIds.isEmpty()) {
+                return Optional.empty();
+            }
+            int removedPartitionCount = 
stream.unprotectedPrunePartitionOffsets(stalePartitionIds);
+            if (removedPartitionCount > 0) {

Review Comment:
   这里的逻辑比较奇怪,为什么不直接用 stalePartitionIds.size()。
   如果这里是考虑到可能 unprotectedPrunePartitionOffsets 不会处理 stalePartitionIds 中的全部 
partition,那么下面记录 PruneTableStreamPartitionOffsetInfo 时,则不应使用 
stalePartitionIds,否则可能出现回放不一致问题?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java:
##########
@@ -97,7 +128,129 @@ public Set<Long> getTableStreamIds(DatabaseIf db) {
         return result;
     }
 
-    public void fillTableStreamValuesMetadataResult(List<TRow> dataBatch) {
+    public void cleanupStalePartitionOffsets() {
+        List<Long> staleDbIds = new ArrayList<>();
+        List<Pair<Long, Long>> staleStreamIds = new ArrayList<>();
+        List<PruneTableStreamPartitionOffsetInfo.Entry> pruneEntries = new 
ArrayList<>();
+        for (Map.Entry<Long, Set<Long>> entry : copyDbStreamMap().entrySet()) {
+            Optional<Database> db = 
Env.getCurrentInternalCatalog().getDb(entry.getKey());
+            if (!db.isPresent()) {
+                staleDbIds.add(entry.getKey());
+                continue;
+            }
+            for (Long tableId : entry.getValue()) {
+                Optional<Table> table = db.get().getTable(tableId);
+                if (!table.isPresent()) {
+                    staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+                    continue;
+                }
+                if (!(table.get() instanceof OlapTableStream)) {
+                    staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+                    continue;
+                }
+                cleanupStalePartitionOffsets((OlapTableStream) 
table.get()).ifPresent(pruneEntries::add);
+            }
+        }
+        removeStaleDbAndStream(staleDbIds, staleStreamIds);
+        if (!pruneEntries.isEmpty()) {
+            
Env.getCurrentEnv().getEditLog().logPruneTableStreamPartitionOffsets(
+                    new PruneTableStreamPartitionOffsetInfo(pruneEntries));
+        }
+    }
+
+    private Optional<PruneTableStreamPartitionOffsetInfo.Entry> 
cleanupStalePartitionOffsets(OlapTableStream stream) {
+        if (stream.isDisabled() || stream.isStale()) {
+            return Optional.empty();
+        }
+        OlapTable baseTable = stream.getBaseTableNullable();
+        if (baseTable == null) {
+            return Optional.empty();
+        }
+        Set<Long> validPartitionIds;
+        if (!baseTable.tryReadLock(Table.TRY_LOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("skip cleaning stream {} because base table {} read 
lock is busy",
+                        stream.getName(), baseTable.getName());
+            }
+            return Optional.empty();
+        }
+        try {
+            if (baseTable.isDropped) {
+                return Optional.empty();
+            }
+            validPartitionIds = new HashSet<>(baseTable.getPartitionIds());

Review Comment:
   这里是否需要考虑临时分区?当临时分区写入时,是否会生成 binlog?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java:
##########
@@ -201,12 +213,57 @@ public void 
unprotectedUpdateStreamUpdate(AbstractTableStreamUpdate update, Long
         for (Map.Entry<Long, Long> entry : next.entrySet()) {
             if (historicalPartitionOffset.containsKey(entry.getKey())) {
                 historicalPartitionOffset.remove(entry.getKey());
-                partitionOffset.put(entry.getKey(), entry.getValue());
+                if (historicalPartitionTSO == null) {
+                    partitionOffset.put(entry.getKey(), entry.getValue());
+                } else {
+                    partitionOffset.put(entry.getKey(), 
historicalPartitionTSO.get(entry.getKey()));
+                    historicalPartitionTSO.remove(entry.getKey());
+                }
             } else {
-                // todo(TsukiokaKogane): update partition offset with tso
                 partitionOffset.put(entry.getKey(), entry.getValue());
             }
             partitionConsumptionTime.put(entry.getKey(), ts);
         }
     }
+
+    Set<Long> unprotectedCollectStalePartitionOffsetIds(Set<Long> 
validPartitionIds) {
+        Preconditions.checkState(isWriteLockHeldByCurrentThread());

Review Comment:
   增加 error message



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableStreamScan.java:
##########
@@ -105,29 +118,31 @@ public LogicalOlapTableStreamScan 
withManuallySpecifiedTabletIds(List<Long> manu
         return AbstractPlan.copyWithSameId(this, () ->
                 new LogicalOlapTableStreamScan(relationId, (Table) table, 
qualifier,
                         Optional.empty(), Optional.of(getLogicalProperties()),
-                        selectedPartitionIds, partitionPruned, 
selectedTabletIds,
+                        selectedPartitionIds, partitionPruned, 
hasPartitionPredicate, selectedTabletIds,
                         selectedIndexId, indexSelected, preAggStatus, 
manuallySpecifiedPartitions,
                         hints, cacheSlotWithSlotName, cachedOutput, 
tableSample, directMvScan,
                         colToSubPathsMap, manuallySpecifiedTabletIds, 
operativeSlots, virtualColumns,
                         scoreOrderKeys, scoreLimit, scoreRangeInfo, 
annOrderKeys, annLimit, tableAlias,
-                        isIncrementalScan));
+                        partitionPrunablePredicates, changeScanInfo, 
isNormalized, isIncrementalScan));
     }
 
     @Override
     public List<Slot> computeOutput() {
         if (cachedOutput.isPresent()) {
             return cachedOutput.get();
         }
-        // we need to create slots vectorized for stream scan, no need for 
invisible column
-        // todo(TsukiokaKogane): support compute binlog-based schema
-        List<Column> baseSchema = table.getBaseSchema(false);
+        List<Column> baseSchema = 
table.getBaseSchema(changeScanInfo.isPresent());
         List<SlotReference> slotFromColumn = createSlotsVectorized(baseSchema);
 
         ImmutableList.Builder<Slot> slots = ImmutableList.builder();
         IdGenerator<ExprId> exprIdGenerator = 
StatementScopeIdGenerator.getExprIdGenerator();
         for (int i = 0; i < baseSchema.size(); i++) {
+            // skip binlog before column
             final int index = i;
             Column col = baseSchema.get(i);
+            if (col.getName().startsWith(Column.BINLOG_BEFORE_PREFIX)) {

Review Comment:
   这里的 BINLOG_BEFORE_PREFIX 需要更改命名,所有的内置列都应以 `__DORIS_` 开头



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java:
##########
@@ -243,6 +253,13 @@ private LogicalPlan makeOlapScan(TableIf table, 
UnboundRelation unboundRelation,
                     CollectionUtils.isEmpty(partIds) ? ((OlapTable) 
table).getPartitionIds() : partIds, indexId,
                     preAggStatus, CollectionUtils.isEmpty(partIds) ? 
ImmutableList.of() : partIds,
                     unboundRelation.getHints(), 
unboundRelation.getTableSample(), ImmutableList.of());
+            } else if (isChangeRead(unboundRelation)) {
+                OlapTable olapTable = (OlapTable) table;
+                RowBinlogTableWrapper wrapper = new 
RowBinlogTableWrapper(olapTable);

Review Comment:
   OlapTableWrapper 这个类是不是应该改成抽象类?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RowBinlogTableWrapper.java:
##########
@@ -25,6 +27,7 @@
 public class RowBinlogTableWrapper extends OlapTableWrapper {
 
     private final MaterializedIndexMeta rowBinlogMeta;
+    private OlapTableStreamWrapper parent;

Review Comment:
   这个字段应该声明为 `final`。



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RowBinlogTableWrapper.java:
##########
@@ -33,6 +36,14 @@ public RowBinlogTableWrapper(OlapTable originTable) {
         this.setBaseIndexId(rowBinlogMeta.getIndexId());
     }
 
+    public RowBinlogTableWrapper(OlapTable originTable, OlapTableStreamWrapper 
parent) {
+        super(originTable, originTable.getName(), 
originTable.getRowBinlogMeta().getSchema(), KeysType.DUP_KEYS);

Review Comment:
   如果 binlog 直接放入到 indexIdToMeta,对同步物化视图改写是否会产生影响?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java:
##########
@@ -243,6 +253,13 @@ private LogicalPlan makeOlapScan(TableIf table, 
UnboundRelation unboundRelation,
                     CollectionUtils.isEmpty(partIds) ? ((OlapTable) 
table).getPartitionIds() : partIds, indexId,
                     preAggStatus, CollectionUtils.isEmpty(partIds) ? 
ImmutableList.of() : partIds,
                     unboundRelation.getHints(), 
unboundRelation.getTableSample(), ImmutableList.of());
+            } else if (isChangeRead(unboundRelation)) {
+                OlapTable olapTable = (OlapTable) table;
+                RowBinlogTableWrapper wrapper = new 
RowBinlogTableWrapper(olapTable);
+                ChangeScanInfo changeScanInfo = buildChangeScanInfo(olapTable, 
unboundRelation.getScanParams());
+                unboundRelation = 
unboundRelation.withChangeScanInfo(changeScanInfo);
+                Preconditions.checkState(changeScanInfo != null);

Review Comment:
   移除这个 check,或者为它增加 error message。



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableBinlogScan.java:
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.nereids.jobs.JobContext;
+import org.apache.doris.nereids.trees.ChangeScanInfo;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableStreamScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Normalize CHANGES semantic binlog scans without touching real stream scan 
behavior.
+ */
+public class NormalizeOlapTableBinlogScan implements CustomRewriter {
+    private static final long ROW_BINLOG_APPEND = 0L;
+
+    @Override
+    public Plan rewriteRoot(Plan plan, JobContext jobContext) {
+        return plan.accept(BinlogScanReplacer.INSTANCE, null);
+    }
+
+    private static class BinlogScanReplacer extends DefaultPlanRewriter<Void> {
+        private static final BinlogScanReplacer INSTANCE = new 
BinlogScanReplacer();
+
+        @Override
+        public Plan visitLogicalOlapTableStreamScan(LogicalOlapTableStreamScan 
scan, Void context) {
+            if (scan.isNormalized() || !scan.getChangeScanInfo().isPresent()) {
+                return scan;
+            }
+            ChangeScanInfo.InformationKind informationKind = 
scan.getChangeScanInfo().get().getInformationKind();

Review Comment:
   为什么不直接在 bind relation 时处理?如果在 bind relation 时处理,看起来也就不需要 `LogicalOlapTableStreamScan` 这个类了。



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -888,12 +896,12 @@ private PlanFragment 
computePhysicalOlapScan(PhysicalOlapScan olapScan, PlanTran
         }
 
         // generate base index tuple because this fragment partitioned expr 
relay on slots of based index
-        if (olapScan.getSelectedIndexId() != 
olapScan.getTable().getBaseIndexId()) {
+        if (!olapScan.isIncrementalScan() && olapScan.getSelectedIndexId() != 
olapScan.getTable().getBaseIndexId()) {

Review Comment:
   这里应该在 olap scan 上抽象出一个函数,用来表明这是在查询同步物化视图。



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -872,6 +876,10 @@ public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan 
olapScan, PlanTransla
     private PlanFragment computePhysicalOlapScan(PhysicalOlapScan olapScan, 
PlanTranslatorContext context) {
         List<Slot> slots = olapScan.getOutput();
         OlapTable olapTable = olapScan.getTable();
+        if (olapScan.isIncrementalScan() && 
!olapScan.getChangeScanInfo().isPresent()) {

Review Comment:
   如果这里最终转为了 RowBinlogTableWrapper,为什么不一开始就生成 RowBinlogTableWrapper?
   这里的 `if (olapScan.isIncrementalScan() && !olapScan.getChangeScanInfo().isPresent())` 为什么就能保证 `olapTable` 就是 `OlapTableStreamWrapper`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to