TsukiokaKogane commented on code in PR #64776:
URL: https://github.com/apache/doris/pull/64776#discussion_r3505578957


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -151,42 +168,102 @@ private Plan normalize(LogicalOlapTableStreamScan scan) {
                         && ((SlotReference) 
slot).getOriginalColumn().isPresent()
                         && ((SlotReference) slot).getOriginalColumn().get()
                         .equals(Column.STREAM_SEQ_VIRTUAL_COLUMN)) {
-                    project.add(new Alias(slot.getExprId(), new Nullable(new 
BigIntLiteral(-1)),
-                            Column.STREAM_SEQ_COL));
+                    project.add(new Alias(slot.getExprId(), tsoSlot, 
Column.STREAM_SEQ_COL));
                 }
             }
             historyPlan = new LogicalProject<>(project, plan);
         }
 
         // incremental plan
         if (!incrementalPartitionIds.isEmpty()) {
-            List<Slot> scanSlots = new ArrayList<>(newSlots);
-            // add slot from binlog
-            Slot opSlot = null;
-            Slot seqSlot = null;
-            for (Column column : ((OlapTableStreamWrapper) 
scan.getTable()).getRowBinlogSchema()) {
-                if (column.getName().equals(Column.BINLOG_TIMESTAMP_COL)) {
-                    seqSlot = 
SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(), scan.getTable(),
-                            column, scan.qualified());
-                    scanSlots.add(seqSlot);
-                } else if 
(column.getName().equals(Column.BINLOG_OPERATION_COL)) {
-                    opSlot = 
SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(), scan.getTable(),
-                            column, scan.qualified());
-                    scanSlots.add(opSlot);
-                }
+            // remap scan from binlog
+            incrementalPlan = makeIncrementalScanFromBinlog(cascadesContext, 
scan, incrementalPartitionIds,
+                    baseTable, 
streamWrapper.getPartitionOffsets(incrementalPartitionIds),
+                    streamWrapper.getStreamScanType(), originSlots, newSlots, 
true);
+        }
+
+        return combineTwoPlan(historyPlan, incrementalPlan, originSlots);
+    }
+
+    private Plan refreshUnionChildOutputExprIds(Plan plan, List<Slot> 
unionOutputs) {
+        Preconditions.checkState(plan.getOutput().size() == 
unionOutputs.size(),
+                "Union child output size %s does not match union output size 
%s",
+                plan.getOutput().size(), unionOutputs.size());
+        List<NamedExpression> project = new 
ArrayList<>(plan.getOutput().size());
+        for (int i = 0; i < plan.getOutput().size(); i++) {
+            project.add(new Alias(plan.getOutput().get(i), 
unionOutputs.get(i).getName()));
+        }
+        return new LogicalProject<>(project, plan);
+    }
+
+    /**
+     * Build a projection list that exposes the columns of {@code wantedSlots} (slots taken from the
+     * original stream scan output) on top of the rewritten child whose output is {@code childOutput}.
+     * Each wanted slot is matched to the child output slot by column name and re-aliased to the
+     * original expr id, so that operators above this rewrite still reference the same expr ids.
+     */
+    private List<NamedExpression> mapToChildOutputSlots(List<Slot> 
wantedSlots, List<Slot> childOutput) {
+        Map<String, Slot> childSlotByName = new HashMap<>();
+        for (Slot slot : childOutput) {
+            childSlotByName.put(slot.getName(), slot);
+        }
+        List<NamedExpression> project = new ArrayList<>(wantedSlots.size());
+        for (Slot wanted : wantedSlots) {
+            Slot match = childSlotByName.get(wanted.getName());
+            Preconditions.checkArgument(match != null,
+                    "column %s not found in child output", wanted.getName());
+            if (match.getExprId().equals(wanted.getExprId())) {
+                project.add(match);
+            } else {
+                project.add(new Alias(wanted.getExprId(), match, 
wanted.getName()));
+            }
+        }
+        return project;
+    }
+
+    private Plan projectToOriginSlots(Plan plan, List<Slot> originSlots) {
+        return new LogicalProject<>(mapToChildOutputSlots(originSlots, 
plan.getOutput()), plan);
+    }
+
+    private Plan makeIncrementalScanFromBinlog(CascadesContext 
cascadesContext, LogicalOlapTableStreamScan scan,
+                                               List<Long> selectedPartitionIds,
+                                               OlapTable baseTable, Map<Long, 
Pair<Long, Long>> offsetMap,
+                                               BaseTableStream.StreamScanType 
streamScanType, List<Slot> originSlots,
+                                               List<Slot> newSlots, boolean 
isIncremental) {
+        // remap scan from binlog
+        RowBinlogTableWrapper table =
+                new RowBinlogTableWrapper(baseTable, offsetMap);
+        Map<String, String> scanParams = new HashMap<>();
+        scanParams.put(OlapScanNode.OLAP_INCREMENT_TYPE, 
streamScanType.toString());
+        LogicalOlapScan newScan = new 
LogicalOlapScan(cascadesContext.getStatementContext().getNextRelationId(),
+                table, scan.qualified(), selectedPartitionIds, 
scan.getSelectedTabletIds(),
+                new ArrayList<>(), scan.getTableSample(), ImmutableList.of())
+                .withTableScanParams(new 
TableScanParams(TableScanParams.INCREMENTAL_READ, scanParams,
+                        Lists.newArrayList()));
+        Plan plan = newScan;
+        List<Slot> binlogOutputSlots = 
newScan.getLogicalProperties().getOutput();
+        // project stream virtual slot from binlog
+        Slot opSlot = null;
+        Slot seqSlot = null;
+        for (int i = 0; i < binlogOutputSlots.size(); i++) {
+            if 
(binlogOutputSlots.get(i).getName().equals(Column.BINLOG_TIMESTAMP_COL)) {
+                seqSlot = binlogOutputSlots.get(i);
+            } else if 
(binlogOutputSlots.get(i).getName().equals(Column.BINLOG_OPERATION_COL)) {
+                opSlot = binlogOutputSlots.get(i);
             }
-            Map<String, String> scanParams = new HashMap<>();
-            scanParams.put(OlapScanNode.OLAP_INCREMENT_TYPE,
-                    ((OlapTableStreamWrapper) 
scan.getTable()).getStreamScanType().toString());
-            Plan plan = scan.withSelectedPartitionIds(incrementalPartitionIds, 
true)
-                    .withCachedOutput(new ArrayList<>(scanSlots))
-                    .withIncrementalScan(true)
-                    .withTableScanParams(new 
TableScanParams(TableScanParams.INCREMENTAL_READ, scanParams,
-                            Lists.newArrayList()))
-                    .withNormalized(true);
-            // replace virtual column with alias slot reference
-            List<NamedExpression> project = newSlots.stream()
-                    
.map(NamedExpression.class::cast).collect(Collectors.toList());
+        }
+        if (streamScanType.equals(BaseTableStream.StreamScanType.APPEND_ONLY)) 
{
+            // filter append-only operation if needed
+            Preconditions.checkArgument(opSlot != null);
+            plan = new LogicalFilter<>(ImmutableSet.of(new EqualTo(opSlot,
+                    new BigIntLiteral(BinlogUtils.ROW_BINLOG_APPEND))), plan);
+        }
+        List<NamedExpression> project = binlogOutputSlots.stream()
+                .map(NamedExpression.class::cast).collect(Collectors.toList());
+        plan = new LogicalProject<>(project, plan);

Review Comment:
   will remove it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to