TsukiokaKogane commented on code in PR #64776:
URL: https://github.com/apache/doris/pull/64776#discussion_r3510308228
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -63,44 +74,273 @@
* 2. add delete sign column if unique base table
*/
public class NormalizeOlapTableStreamScan extends OneRewriteRuleFactory {
- private static final long ROW_BINLOG_APPEND = 0;
- private static final long ROW_BINLOG_DELETE = 1;
- private static final long ROW_BINLOG_UPDATE_BEFORE = 2;
- private static final long ROW_BINLOG_UPDATE_AFTER = 3;
-
@Override
public Rule build() {
return logicalOlapTableStreamScan()
- .when(scan -> !scan.isNormalized())
- .then(this::normalize)
+ .thenApply(ctx -> normalize(ctx.root, ctx.cascadesContext))
.toRule(RuleType.NORMALIZE_OlAP_TABLE_STREAM_SCAN);
}
private static Expression buildChangeTypeExpr(Slot opSlot) {
return new CaseWhen(ImmutableList.of(
- new WhenClause(new EqualTo(opSlot, new
BigIntLiteral(ROW_BINLOG_APPEND)),
+ new WhenClause(new EqualTo(opSlot, new
BigIntLiteral(BinlogUtils.ROW_BINLOG_APPEND)),
new VarcharLiteral("APPEND")),
- new WhenClause(new EqualTo(opSlot, new
BigIntLiteral(ROW_BINLOG_DELETE)),
+ new WhenClause(new EqualTo(opSlot, new
BigIntLiteral(BinlogUtils.ROW_BINLOG_DELETE)),
new VarcharLiteral("DELETE")),
- new WhenClause(new EqualTo(opSlot, new
BigIntLiteral(ROW_BINLOG_UPDATE_BEFORE)),
+ new WhenClause(new EqualTo(opSlot, new
BigIntLiteral(BinlogUtils.ROW_BINLOG_UPDATE_BEFORE)),
new VarcharLiteral("UPDATE_BEFORE")),
- new WhenClause(new EqualTo(opSlot, new
BigIntLiteral(ROW_BINLOG_UPDATE_AFTER)),
+ new WhenClause(new EqualTo(opSlot, new
BigIntLiteral(BinlogUtils.ROW_BINLOG_UPDATE_AFTER)),
new VarcharLiteral("UPDATE_AFTER"))), new
VarcharLiteral("UNKNOWN"));
}
- private Plan normalize(LogicalOlapTableStreamScan scan) {
- List<Long> selectedPartitionIds = scan.getSelectedPartitionIds();
- if (selectedPartitionIds.isEmpty()) {
+ private Plan normalize(LogicalOlapTableStreamScan scan, CascadesContext
cascadesContext) {
+ if (scan.getSelectedPartitionIds().isEmpty()) {
return scan;
}
- List<Long> historicalPartitionIds =
ImmutableList.copyOf(((OlapTableStreamWrapper) scan.getTable())
- .filterHistoryPartitionIds(selectedPartitionIds));
- List<Long> incrementalPartitionIds =
ImmutableList.copyOf(((OlapTableStreamWrapper) scan.getTable())
- .filterIncrementalPartitionIds(selectedPartitionIds));
+ if (scan.isReset()) {
+ return makeResetOlapFullScan(scan, cascadesContext);
+ }
+ if (scan.isSnapshot()) {
+ return makeSnapshotScan(scan, cascadesContext);
+ }
+ return makeTableStreamScan(scan, cascadesContext);
+ }
+
+ /**
+ * Build a projection list that exposes the columns of {@code wantedSlots}
(slots taken from the
+ * original stream scan output) on top of the rewritten child whose output
is {@code childOutput}.
+ */
+ private List<NamedExpression> mapOriginOutputFromChild(List<Slot>
wantedSlots, List<Slot> childOutput,
+ boolean
useOriginalExprIds) {
+ Map<String, Slot> childSlotByName = new HashMap<>();
+ for (Slot slot : childOutput) {
+ childSlotByName.put(slot.getName(), slot);
+ }
+ List<NamedExpression> project = new ArrayList<>(wantedSlots.size());
+ for (Slot wanted : wantedSlots) {
+ Slot match = childSlotByName.get(wanted.getName());
+ Preconditions.checkArgument(match != null,
+ "column %s not found in child output", wanted.getName());
+ if (useOriginalExprIds) {
+ project.add(new Alias(wanted.getExprId(), match,
wanted.getName()));
+ } else {
+ project.add(new Alias(match, wanted.getName()));
+ }
+ }
+ return project;
+ }
+
+ // project from origin output slots to child slots with new expr ids
+ private Plan projectFromOriginSlots(Plan plan, List<Slot> originSlots) {
+ return new LogicalProject<>(mapOriginOutputFromChild(originSlots,
plan.getOutput(), false), plan);
+ }
+
+ // project to origin output slots with original expr ids
+ private Plan projectToOriginSlots(Plan plan, List<Slot> originSlots) {
+ return new LogicalProject<>(mapOriginOutputFromChild(originSlots,
plan.getOutput(), true), plan);
+ }
+
+ /**
+ * Build an incremental scan reading row-level changes from base table
binlog for the given
+ * partitions and their {@code offsetMap} ((startTso, endTso) per
partition).
+ *
+ * <p>The binlog scan is wrapped by {@link RowBinlogTableWrapper} and
marked as INCREMENTAL_READ.
+ * {@code streamScanType} decides which binlog rows are emitted:
+ * APPEND_ONLY keeps only APPEND rows; MIN_DELTA/DETAIL keep the raw
change rows.
+ *
+ * <p>{@code isIncremental} distinguishes the two callers:
+ * <ul>
+ * <li>true — normal stream consumption: map the binlog op/timestamp
columns into the stream
+ * virtual columns STREAM_CHANGE_TYPE_COL /
STREAM_SEQ_COL.</li>
+ * <li>false — snapshot rebuild: only keep DELETE & UPDATE_BEFORE
rows so they can be added
+ * back to reconstruct the "before" image at the snapshot
point.</li>
+ * </ul>
+ */
+ private Plan makeIncrementalScanFromBinlog(CascadesContext
cascadesContext, LogicalOlapTableStreamScan scan,
+ List<Long> selectedPartitionIds,
+ OlapTable baseTable, Map<Long,
Pair<Long, Long>> offsetMap,
+ BaseTableStream.StreamScanType
streamScanType, List<Slot> originSlots,
+ List<Slot> notVirtualSlots,
boolean isIncremental) {
+ // remap scan from binlog
+ RowBinlogTableWrapper table =
+ new RowBinlogTableWrapper(baseTable, offsetMap);
+ Map<String, String> scanParams = new HashMap<>();
+ scanParams.put(OlapScanNode.OLAP_INCREMENT_TYPE,
streamScanType.toString());
+ LogicalOlapScan newScan = new
LogicalOlapScan(cascadesContext.getStatementContext().getNextRelationId(),
+ table, scan.qualified(), selectedPartitionIds,
scan.getSelectedTabletIds(),
+ new ArrayList<>(), scan.getTableSample(), ImmutableList.of(),
+ Optional.of(new
TableScanParams(TableScanParams.INCREMENTAL_READ, scanParams,
Lists.newArrayList())));
+ Plan plan = newScan;
+ List<Slot> binlogOutputSlots = newScan.getOutput();
+ // project stream virtual slot from binlog
+ Slot opSlot = null;
+ Slot seqSlot = null;
+ for (int i = 0; i < binlogOutputSlots.size(); i++) {
+ if
(binlogOutputSlots.get(i).getName().equals(Column.BINLOG_TIMESTAMP_COL)) {
+ seqSlot = binlogOutputSlots.get(i);
+ } else if
(binlogOutputSlots.get(i).getName().equals(Column.BINLOG_OPERATION_COL)) {
+ opSlot = binlogOutputSlots.get(i);
+ }
+ }
+ if (streamScanType.equals(BaseTableStream.StreamScanType.APPEND_ONLY))
{
+ // filter append-only operation if needed
+ Preconditions.checkArgument(opSlot != null);
+ plan = new LogicalFilter<>(ImmutableSet.of(new EqualTo(opSlot,
+ new BigIntLiteral(BinlogUtils.ROW_BINLOG_APPEND))), plan);
+ }
+ List<NamedExpression> project =
mapOriginOutputFromChild(notVirtualSlots, binlogOutputSlots, false);
+ if (isIncremental) {
+ // replace stream virtual column with alias slot reference
+ for (Slot slot : originSlots) {
+ if (slot instanceof SlotReference
+ && ((SlotReference)
slot).getOriginalColumn().isPresent()
+ && ((SlotReference) slot).getOriginalColumn().get()
+ .equals(Column.STREAM_CHANGE_TYPE_VIRTUAL_COLUMN)) {
+ project.add(new
Alias(StatementScopeIdGenerator.newExprId(), buildChangeTypeExpr(opSlot),
+ Column.STREAM_CHANGE_TYPE_COL));
+ } else if (slot instanceof SlotReference
+ && ((SlotReference)
slot).getOriginalColumn().isPresent()
+ && ((SlotReference) slot).getOriginalColumn().get()
+ .equals(Column.STREAM_SEQ_VIRTUAL_COLUMN)) {
+ project.add(new
Alias(StatementScopeIdGenerator.newExprId(), seqSlot, Column.STREAM_SEQ_COL));
+ }
+ }
+ } else {
+ // only filter delete & update before rows for building before
snapshot image
+ Preconditions.checkArgument(opSlot != null);
+ Expression opFilter = new InPredicate(opSlot, ImmutableList.of(
+ new BigIntLiteral(BinlogUtils.ROW_BINLOG_DELETE),
+ new BigIntLiteral(BinlogUtils.ROW_BINLOG_UPDATE_BEFORE)));
+ plan = new LogicalFilter<>(ImmutableSet.of(opFilter), plan);
+ }
+ return new LogicalProject<>(project, plan);
+ }
+
+ /**
+ * Reset mode: the stream is (re)initialized to a full snapshot of the
base table, so we simply
+ * do a full olap scan over the base table (with full schema) and project
back to the origin
+ * stream output slots. No binlog / virtual columns are involved.
+ */
+ private Plan makeResetOlapFullScan(LogicalOlapTableStreamScan scan,
CascadesContext cascadesContext) {
+ // make olap scan on base table
+ OlapTableStreamWrapper streamWrapper = scan.getTable();
+ OlapTable baseTable = streamWrapper.getBaseTable();
+ Plan plan = makeOlapScanOnBaseTable(scan, cascadesContext, baseTable,
scan.getSelectedPartitionIds());
+ List<Slot> originSlots = scan.getLogicalProperties().getOutput();
+ return projectToOriginSlots(plan, originSlots);
+ }
+
+ /**
+ * Snapshot mode: read a consistent snapshot of the stream at its
consumption point.
+ *
+ * <p>For DUP_KEYS tables the snapshot can be rebuilt directly from the
base table using the
+ * historical partition offsets.
+ *
+ * <p>For unique/agg tables partitions are split into two groups:
+ * <ul>
+ * <li>normal partitions — no new data after the snapshot point, scanned
from base table as-is;</li>
+ * <li>rebuild partitions — have newer data after the snapshot point, so
the snapshot image is
+ * reconstructed by scanning the base table (rows with commit tso
<= consumption tso) and
+ * adding back the DELETE / UPDATE_BEFORE rows from binlog.</li>
+ * </ul>
+ * The two parts are unioned and projected back to the origin output slots.
+ */
+ private Plan makeSnapshotScan(LogicalOlapTableStreamScan scan,
CascadesContext cascadesContext) {
+ List<Long> selectedPartitionIds = scan.getSelectedPartitionIds();
+ OlapTableStreamWrapper streamWrapper = scan.getTable();
+ OlapTable baseTable = streamWrapper.getBaseTable();
+ List<Slot> originSlots = scan.getLogicalProperties().getOutput();
+ selectedPartitionIds =
streamWrapper.filterConsumedPartitionIds(selectedPartitionIds);
+ if (baseTable.getKeysType().equals(KeysType.DUP_KEYS)) {
+ // dup key table can just rebuild from base table
+ Map<Long, Pair<Long, Long>> partitionOffsetMap =
+
streamWrapper.getHistoryPartitionOffsets(selectedPartitionIds);
+ OlapTableWrapper table =
+ new OlapTableWrapper(baseTable, partitionOffsetMap);
+ return projectToOriginSlots(makeOlapScanOnBaseTable(scan,
cascadesContext, table, selectedPartitionIds),
+ originSlots);
+ }
+ // normal partition has no new data after snapshot, scan base table
directly
+ List<Long> normalPartitionIds =
streamWrapper.filterNormalSnapshotPartitionIds(selectedPartitionIds);
+ Set<Long> normalPartitionIdSet =
ImmutableSet.copyOf(normalPartitionIds);
+ // rebuild partition has new data after snapshot, need to rebuild
+ List<Long> rebuildPartitionIds =
+ selectedPartitionIds.stream()
+ .filter(id ->
!normalPartitionIdSet.contains(id)).collect(ImmutableList.toImmutableList());
+ Plan normalPlan = null;
+ Plan rebuildPlan = null;
+ if (!normalPartitionIds.isEmpty()) {
+ normalPlan = makeOlapScanOnBaseTable(scan, cascadesContext,
baseTable, normalPartitionIds);
+ }
+ if (!rebuildPartitionIds.isEmpty()) {
+ // base table scan part
+ // build base table offset
+ // for row commit tso <= consumption tso we scan from base table
+ Map<Long, Pair<Long, Long>> partitionOffsetMap =
+
streamWrapper.getHistoryPartitionOffsets(rebuildPartitionIds);
+ OlapTableWrapper table =
+ new OlapTableWrapper(baseTable, partitionOffsetMap);
+ Plan basePartPlan = makeOlapScanOnBaseTable(scan, cascadesContext,
table, rebuildPartitionIds);
+ // we rebuild by add back updated & deleted rows from binlog
+ Plan binlogPartPlan =
makeIncrementalScanFromBinlog(cascadesContext, scan, rebuildPartitionIds,
+ baseTable,
streamWrapper.getPartitionOffsets(rebuildPartitionIds),
+ BaseTableStream.StreamScanType.MIN_DELTA, originSlots,
originSlots, false);
+ rebuildPlan = combineTwoPlan(basePartPlan, binlogPartPlan,
originSlots);
+ }
+ return combineTwoPlan(normalPlan, rebuildPlan, originSlots);
Review Comment:
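Got it — I missed that one of the intermediate results (the rebuild plan) is also combined. (Original: get 我漏了有一个中间结果也是combine的)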
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]