TsukiokaKogane commented on code in PR #64776:
URL: https://github.com/apache/doris/pull/64776#discussion_r3510308228


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -63,44 +74,273 @@
  * 2. add delete sign column if unique base table
  */
 public class NormalizeOlapTableStreamScan extends OneRewriteRuleFactory {
-    private static final long ROW_BINLOG_APPEND = 0;
-    private static final long ROW_BINLOG_DELETE = 1;
-    private static final long ROW_BINLOG_UPDATE_BEFORE = 2;
-    private static final long ROW_BINLOG_UPDATE_AFTER = 3;
-
     @Override
     public Rule build() {
         return logicalOlapTableStreamScan()
-                .when(scan -> !scan.isNormalized())
-                .then(this::normalize)
+                .thenApply(ctx -> normalize(ctx.root, ctx.cascadesContext))
                 .toRule(RuleType.NORMALIZE_OlAP_TABLE_STREAM_SCAN);
     }
 
     private static Expression buildChangeTypeExpr(Slot opSlot) {
         return new CaseWhen(ImmutableList.of(
-                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(ROW_BINLOG_APPEND)),
+                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(BinlogUtils.ROW_BINLOG_APPEND)),
                         new VarcharLiteral("APPEND")),
-                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(ROW_BINLOG_DELETE)),
+                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(BinlogUtils.ROW_BINLOG_DELETE)),
                         new VarcharLiteral("DELETE")),
-                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(ROW_BINLOG_UPDATE_BEFORE)),
+                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(BinlogUtils.ROW_BINLOG_UPDATE_BEFORE)),
                         new VarcharLiteral("UPDATE_BEFORE")),
-                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(ROW_BINLOG_UPDATE_AFTER)),
+                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(BinlogUtils.ROW_BINLOG_UPDATE_AFTER)),
                         new VarcharLiteral("UPDATE_AFTER"))), new 
VarcharLiteral("UNKNOWN"));
     }
 
-    private Plan normalize(LogicalOlapTableStreamScan scan) {
-        List<Long> selectedPartitionIds = scan.getSelectedPartitionIds();
-        if (selectedPartitionIds.isEmpty()) {
+    private Plan normalize(LogicalOlapTableStreamScan scan, CascadesContext 
cascadesContext) {
+        if (scan.getSelectedPartitionIds().isEmpty()) {
             return scan;
         }
-        List<Long> historicalPartitionIds = 
ImmutableList.copyOf(((OlapTableStreamWrapper) scan.getTable())
-                .filterHistoryPartitionIds(selectedPartitionIds));
-        List<Long> incrementalPartitionIds = 
ImmutableList.copyOf(((OlapTableStreamWrapper) scan.getTable())
-                .filterIncrementalPartitionIds(selectedPartitionIds));
+        if (scan.isReset()) {
+            return makeResetOlapFullScan(scan, cascadesContext);
+        }
+        if (scan.isSnapshot()) {
+            return makeSnapshotScan(scan, cascadesContext);
+        }
+        return makeTableStreamScan(scan, cascadesContext);
+    }
+
+    /**
+     * Build a projection list that exposes the columns of {@code wantedSlots} 
(slots taken from the
+     * original stream scan output) on top of the rewritten child whose output 
is {@code childOutput}.
+     */
+    private List<NamedExpression> mapOriginOutputFromChild(List<Slot> 
wantedSlots, List<Slot> childOutput,
+                                                           boolean 
useOriginalExprIds) {
+        Map<String, Slot> childSlotByName = new HashMap<>();
+        for (Slot slot : childOutput) {
+            childSlotByName.put(slot.getName(), slot);
+        }
+        List<NamedExpression> project = new ArrayList<>(wantedSlots.size());
+        for (Slot wanted : wantedSlots) {
+            Slot match = childSlotByName.get(wanted.getName());
+            Preconditions.checkArgument(match != null,
+                    "column %s not found in child output", wanted.getName());
+            if (useOriginalExprIds) {
+                project.add(new Alias(wanted.getExprId(), match, 
wanted.getName()));
+            } else {
+                project.add(new Alias(match, wanted.getName()));
+            }
+        }
+        return project;
+    }
+
+    // project from origin output slots to child slots with new expr ids
+    private Plan projectFromOriginSlots(Plan plan, List<Slot> originSlots) {
+        return new LogicalProject<>(mapOriginOutputFromChild(originSlots, 
plan.getOutput(), false), plan);
+    }
+
+    // project to origin output slots with original expr ids
+    private Plan projectToOriginSlots(Plan plan, List<Slot> originSlots) {
+        return new LogicalProject<>(mapOriginOutputFromChild(originSlots, 
plan.getOutput(), true), plan);
+    }
+
+    /**
+     * Build an incremental scan reading row-level changes from base table 
binlog for the given
+     * partitions and their {@code offsetMap} ((startTso, endTso) per 
partition).
+     *
+     * <p>The binlog scan is wrapped by {@link RowBinlogTableWrapper} and 
marked as INCREMENTAL_READ.
+     * {@code streamScanType} decides which binlog rows are emitted:
+     * APPEND_ONLY keeps only APPEND rows; MIN_DELTA/DETAIL keep the raw 
change rows.
+     *
+     * <p>{@code isIncremental} distinguishes the two callers:
+     * <ul>
+     *   <li>true  — normal stream consumption: map the binlog op/timestamp 
columns into the stream
+     *               virtual columns STREAM_CHANGE_TYPE_COL / 
STREAM_SEQ_COL.</li>
+     *   <li>false — snapshot rebuild: only keep DELETE &amp; UPDATE_BEFORE 
rows so they can be added
+     *               back to reconstruct the "before" image at the snapshot 
point.</li>
+     * </ul>
+     */
+    private Plan makeIncrementalScanFromBinlog(CascadesContext 
cascadesContext, LogicalOlapTableStreamScan scan,
+                                               List<Long> selectedPartitionIds,
+                                               OlapTable baseTable, Map<Long, 
Pair<Long, Long>> offsetMap,
+                                               BaseTableStream.StreamScanType 
streamScanType, List<Slot> originSlots,
+                                               List<Slot> notVirtualSlots, 
boolean isIncremental) {
+        // remap scan from binlog
+        RowBinlogTableWrapper table =
+                new RowBinlogTableWrapper(baseTable, offsetMap);
+        Map<String, String> scanParams = new HashMap<>();
+        scanParams.put(OlapScanNode.OLAP_INCREMENT_TYPE, 
streamScanType.toString());
+        LogicalOlapScan newScan = new 
LogicalOlapScan(cascadesContext.getStatementContext().getNextRelationId(),
+                table, scan.qualified(), selectedPartitionIds, 
scan.getSelectedTabletIds(),
+                new ArrayList<>(), scan.getTableSample(), ImmutableList.of(),
+                Optional.of(new 
TableScanParams(TableScanParams.INCREMENTAL_READ, scanParams, 
Lists.newArrayList())));
+        Plan plan = newScan;
+        List<Slot> binlogOutputSlots = newScan.getOutput();
+        // project stream virtual slot from binlog
+        Slot opSlot = null;
+        Slot seqSlot = null;
+        for (int i = 0; i < binlogOutputSlots.size(); i++) {
+            if 
(binlogOutputSlots.get(i).getName().equals(Column.BINLOG_TIMESTAMP_COL)) {
+                seqSlot = binlogOutputSlots.get(i);
+            } else if 
(binlogOutputSlots.get(i).getName().equals(Column.BINLOG_OPERATION_COL)) {
+                opSlot = binlogOutputSlots.get(i);
+            }
+        }
+        if (streamScanType.equals(BaseTableStream.StreamScanType.APPEND_ONLY)) 
{
+            // filter append-only operation if needed
+            Preconditions.checkArgument(opSlot != null);
+            plan = new LogicalFilter<>(ImmutableSet.of(new EqualTo(opSlot,
+                    new BigIntLiteral(BinlogUtils.ROW_BINLOG_APPEND))), plan);
+        }
+        List<NamedExpression> project = 
mapOriginOutputFromChild(notVirtualSlots, binlogOutputSlots, false);
+        if (isIncremental) {
+            // replace stream virtual column with alias slot reference
+            for (Slot slot : originSlots) {
+                if (slot instanceof SlotReference
+                        && ((SlotReference) 
slot).getOriginalColumn().isPresent()
+                        && ((SlotReference) slot).getOriginalColumn().get()
+                        .equals(Column.STREAM_CHANGE_TYPE_VIRTUAL_COLUMN)) {
+                    project.add(new 
Alias(StatementScopeIdGenerator.newExprId(), buildChangeTypeExpr(opSlot),
+                            Column.STREAM_CHANGE_TYPE_COL));
+                } else if (slot instanceof SlotReference
+                        && ((SlotReference) 
slot).getOriginalColumn().isPresent()
+                        && ((SlotReference) slot).getOriginalColumn().get()
+                        .equals(Column.STREAM_SEQ_VIRTUAL_COLUMN)) {
+                    project.add(new 
Alias(StatementScopeIdGenerator.newExprId(), seqSlot, Column.STREAM_SEQ_COL));
+                }
+            }
+        } else {
+            // only filter delete & update before rows for building before 
snapshot image
+            Preconditions.checkArgument(opSlot != null);
+            Expression opFilter = new InPredicate(opSlot, ImmutableList.of(
+                    new BigIntLiteral(BinlogUtils.ROW_BINLOG_DELETE),
+                    new BigIntLiteral(BinlogUtils.ROW_BINLOG_UPDATE_BEFORE)));
+            plan = new LogicalFilter<>(ImmutableSet.of(opFilter), plan);
+        }
+        return new LogicalProject<>(project, plan);
+    }
+
+    /**
+     * Reset mode: the stream is (re)initialized to a full snapshot of the 
base table, so we simply
+     * do a full olap scan over the base table (with full schema) and project 
back to the origin
+     * stream output slots. No binlog / virtual columns are involved.
+     */
+    private Plan makeResetOlapFullScan(LogicalOlapTableStreamScan scan, 
CascadesContext cascadesContext) {
+        // make olap scan on base table
+        OlapTableStreamWrapper streamWrapper = scan.getTable();
+        OlapTable baseTable = streamWrapper.getBaseTable();
+        Plan plan = makeOlapScanOnBaseTable(scan, cascadesContext, baseTable, 
scan.getSelectedPartitionIds());
+        List<Slot> originSlots = scan.getLogicalProperties().getOutput();
+        return projectToOriginSlots(plan, originSlots);
+    }
+
+    /**
+     * Snapshot mode: read a consistent snapshot of the stream at its 
consumption point.
+     *
+     * <p>For DUP_KEYS tables the snapshot can be rebuilt directly from the 
base table using the
+     * historical partition offsets.
+     *
+     * <p>For unique/agg tables partitions are split into two groups:
+     * <ul>
+     *   <li>normal partitions — no new data after the snapshot point, scanned 
from base table as-is;</li>
+     *   <li>rebuild partitions — have newer data after the snapshot point, so 
the snapshot image is
+     *       reconstructed by scanning the base table (rows with commit tso 
&lt;= consumption tso) and
+     *       adding back the DELETE / UPDATE_BEFORE rows from binlog.</li>
+     * </ul>
+     * The two parts are unioned and projected back to the origin output slots.
+     */
+    private Plan makeSnapshotScan(LogicalOlapTableStreamScan scan, 
CascadesContext cascadesContext) {
+        List<Long> selectedPartitionIds = scan.getSelectedPartitionIds();
+        OlapTableStreamWrapper streamWrapper = scan.getTable();
+        OlapTable baseTable = streamWrapper.getBaseTable();
+        List<Slot> originSlots = scan.getLogicalProperties().getOutput();
+        selectedPartitionIds = 
streamWrapper.filterConsumedPartitionIds(selectedPartitionIds);
+        if (baseTable.getKeysType().equals(KeysType.DUP_KEYS)) {
+            // dup key table can just rebuild from base table
+            Map<Long, Pair<Long, Long>> partitionOffsetMap =
+                    
streamWrapper.getHistoryPartitionOffsets(selectedPartitionIds);
+            OlapTableWrapper table =
+                    new OlapTableWrapper(baseTable, partitionOffsetMap);
+            return projectToOriginSlots(makeOlapScanOnBaseTable(scan, 
cascadesContext, table, selectedPartitionIds),
+                    originSlots);
+        }
+        // normal partition has no new data after snapshot, scan base table 
directly
+        List<Long> normalPartitionIds = 
streamWrapper.filterNormalSnapshotPartitionIds(selectedPartitionIds);
+        Set<Long> normalPartitionIdSet = 
ImmutableSet.copyOf(normalPartitionIds);
+        // rebuild partition has new data after snapshot, need to rebuild
+        List<Long> rebuildPartitionIds =
+                selectedPartitionIds.stream()
+                        .filter(id -> 
!normalPartitionIdSet.contains(id)).collect(ImmutableList.toImmutableList());
+        Plan normalPlan = null;
+        Plan rebuildPlan = null;
+        if (!normalPartitionIds.isEmpty()) {
+            normalPlan = makeOlapScanOnBaseTable(scan, cascadesContext, 
baseTable, normalPartitionIds);
+        }
+        if (!rebuildPartitionIds.isEmpty()) {
+            // base table scan part
+            // build base table offset
+            // for row commit tso <= consumption tso we scan from base table
+            Map<Long, Pair<Long, Long>> partitionOffsetMap =
+                    
streamWrapper.getHistoryPartitionOffsets(rebuildPartitionIds);
+            OlapTableWrapper table =
+                    new OlapTableWrapper(baseTable, partitionOffsetMap);
+            Plan basePartPlan = makeOlapScanOnBaseTable(scan, cascadesContext, 
table, rebuildPartitionIds);
+            // we rebuild by add back updated & deleted rows from binlog
+            Plan binlogPartPlan = 
makeIncrementalScanFromBinlog(cascadesContext, scan, rebuildPartitionIds,
+                    baseTable, 
streamWrapper.getPartitionOffsets(rebuildPartitionIds),
+                    BaseTableStream.StreamScanType.MIN_DELTA, originSlots, 
originSlots, false);
+            rebuildPlan = combineTwoPlan(basePartPlan, binlogPartPlan, 
originSlots);
+        }
+        return combineTwoPlan(normalPlan, rebuildPlan, originSlots);

Review Comment:
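   Got it — 我漏了有一个中间结果也是combine的 (I missed that one of the intermediate results is also combined: `rebuildPlan` is itself produced by `combineTwoPlan` before the final `combineTwoPlan` call).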



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to