TsukiokaKogane commented on code in PR #64776:
URL: https://github.com/apache/doris/pull/64776#discussion_r3505578957
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -151,42 +168,102 @@ private Plan normalize(LogicalOlapTableStreamScan scan) {
&& ((SlotReference) slot).getOriginalColumn().isPresent()
&& ((SlotReference) slot).getOriginalColumn().get()
.equals(Column.STREAM_SEQ_VIRTUAL_COLUMN)) {
- project.add(new Alias(slot.getExprId(), new Nullable(new BigIntLiteral(-1)),
- Column.STREAM_SEQ_COL));
+ project.add(new Alias(slot.getExprId(), tsoSlot, Column.STREAM_SEQ_COL));
}
}
historyPlan = new LogicalProject<>(project, plan);
}
// incremental plan
if (!incrementalPartitionIds.isEmpty()) {
- List<Slot> scanSlots = new ArrayList<>(newSlots);
- // add slot from binlog
- Slot opSlot = null;
- Slot seqSlot = null;
- for (Column column : ((OlapTableStreamWrapper) scan.getTable()).getRowBinlogSchema()) {
- if (column.getName().equals(Column.BINLOG_TIMESTAMP_COL)) {
- seqSlot = SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(), scan.getTable(),
- column, scan.qualified());
- scanSlots.add(seqSlot);
- } else if (column.getName().equals(Column.BINLOG_OPERATION_COL)) {
- opSlot = SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(), scan.getTable(),
- column, scan.qualified());
- scanSlots.add(opSlot);
- }
+ // remap scan from binlog
+ incrementalPlan = makeIncrementalScanFromBinlog(cascadesContext, scan, incrementalPartitionIds,
+ baseTable, streamWrapper.getPartitionOffsets(incrementalPartitionIds),
+ streamWrapper.getStreamScanType(), originSlots, newSlots, true);
+ }
+
+ return combineTwoPlan(historyPlan, incrementalPlan, originSlots);
+ }
+
+ private Plan refreshUnionChildOutputExprIds(Plan plan, List<Slot> unionOutputs) {
+ Preconditions.checkState(plan.getOutput().size() == unionOutputs.size(),
+ "Union child output size %s does not match union output size %s",
+ plan.getOutput().size(), unionOutputs.size());
+ List<NamedExpression> project = new ArrayList<>(plan.getOutput().size());
+ for (int i = 0; i < plan.getOutput().size(); i++) {
+ project.add(new Alias(plan.getOutput().get(i), unionOutputs.get(i).getName()));
+ }
+ return new LogicalProject<>(project, plan);
+ }
+
+ /**
+ * Build a projection list that exposes the columns of {@code wantedSlots} (slots taken from the
+ * original stream scan output) on top of the rewritten child whose output is {@code childOutput}.
+ * Each wanted slot is matched to the child output slot by column name and re-aliased to the
+ * original expr id, so that operators above this rewrite still reference the same expr ids.
+ */
+ private List<NamedExpression> mapToChildOutputSlots(List<Slot> wantedSlots, List<Slot> childOutput) {
+ Map<String, Slot> childSlotByName = new HashMap<>();
+ for (Slot slot : childOutput) {
+ childSlotByName.put(slot.getName(), slot);
+ }
+ List<NamedExpression> project = new ArrayList<>(wantedSlots.size());
+ for (Slot wanted : wantedSlots) {
+ Slot match = childSlotByName.get(wanted.getName());
+ Preconditions.checkArgument(match != null,
+ "column %s not found in child output", wanted.getName());
+ if (match.getExprId().equals(wanted.getExprId())) {
+ project.add(match);
+ } else {
+ project.add(new Alias(wanted.getExprId(), match, wanted.getName()));
+ }
+ }
+ return project;
+ }
+
+ private Plan projectToOriginSlots(Plan plan, List<Slot> originSlots) {
+ return new LogicalProject<>(mapToChildOutputSlots(originSlots, plan.getOutput()), plan);
+ }
+
+ private Plan makeIncrementalScanFromBinlog(CascadesContext cascadesContext, LogicalOlapTableStreamScan scan,
+ List<Long> selectedPartitionIds,
+ OlapTable baseTable, Map<Long, Pair<Long, Long>> offsetMap,
+ BaseTableStream.StreamScanType streamScanType, List<Slot> originSlots,
+ List<Slot> newSlots, boolean isIncremental) {
+ // remap scan from binlog
+ RowBinlogTableWrapper table =
+ new RowBinlogTableWrapper(baseTable, offsetMap);
+ Map<String, String> scanParams = new HashMap<>();
+ scanParams.put(OlapScanNode.OLAP_INCREMENT_TYPE, streamScanType.toString());
+ LogicalOlapScan newScan = new LogicalOlapScan(cascadesContext.getStatementContext().getNextRelationId(),
+ table, scan.qualified(), selectedPartitionIds, scan.getSelectedTabletIds(),
+ new ArrayList<>(), scan.getTableSample(), ImmutableList.of())
+ .withTableScanParams(new TableScanParams(TableScanParams.INCREMENTAL_READ, scanParams,
+ Lists.newArrayList()));
+ Plan plan = newScan;
+ List<Slot> binlogOutputSlots = newScan.getLogicalProperties().getOutput();
+ // project stream virtual slot from binlog
+ Slot opSlot = null;
+ Slot seqSlot = null;
+ for (int i = 0; i < binlogOutputSlots.size(); i++) {
+ if (binlogOutputSlots.get(i).getName().equals(Column.BINLOG_TIMESTAMP_COL)) {
+ seqSlot = binlogOutputSlots.get(i);
+ } else if (binlogOutputSlots.get(i).getName().equals(Column.BINLOG_OPERATION_COL)) {
+ opSlot = binlogOutputSlots.get(i);
}
- Map<String, String> scanParams = new HashMap<>();
- scanParams.put(OlapScanNode.OLAP_INCREMENT_TYPE,
- ((OlapTableStreamWrapper) scan.getTable()).getStreamScanType().toString());
- Plan plan = scan.withSelectedPartitionIds(incrementalPartitionIds, true)
- .withCachedOutput(new ArrayList<>(scanSlots))
- .withIncrementalScan(true)
- .withTableScanParams(new TableScanParams(TableScanParams.INCREMENTAL_READ, scanParams,
- Lists.newArrayList()))
- .withNormalized(true);
- // replace virtual column with alias slot reference
- List<NamedExpression> project = newSlots.stream()
- .map(NamedExpression.class::cast).collect(Collectors.toList());
+ }
+ if (streamScanType.equals(BaseTableStream.StreamScanType.APPEND_ONLY)) {
+ // filter append-only operation if needed
+ Preconditions.checkArgument(opSlot != null);
+ plan = new LogicalFilter<>(ImmutableSet.of(new EqualTo(opSlot,
+ new BigIntLiteral(BinlogUtils.ROW_BINLOG_APPEND))), plan);
+ }
+ List<NamedExpression> project = binlogOutputSlots.stream()
+ .map(NamedExpression.class::cast).collect(Collectors.toList());
+ plan = new LogicalProject<>(project, plan);
Review Comment:
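Will remove it. [Editor's note: presumably refers to the pass-through `LogicalProject` over `binlogOutputSlots` on the last quoted lines, which re-projects the scan's output slots unchanged.]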
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]