morrySnow commented on code in PR #64776:
URL: https://github.com/apache/doris/pull/64776#discussion_r3497297050
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java:
##########
@@ -189,37 +183,40 @@ public boolean hasData(Partition partition) {
}
public boolean hasHistoricalData(long partitionId) {
- return historicalPartitionOffset.containsKey(partitionId);
+ return historicalPartitionTSO.containsKey(partitionId);
+ }
+
+ public boolean hasConsumedData(long partitionId) {
+ return partitionOffset.containsKey(partitionId);
}
public Pair<Long, Long> getStreamUpdate(Long partitionId) {
- return Pair.of(partitionOffset.get(partitionId), historicalPartitionOffset.get(partitionId));
+ // if partition has historical data, return <historical tso, current tso>
+ // otherwise, return <current tso, current tso>
+ Long left = partitionOffset.get(partitionId);
+ if (historicalPartitionTSO.containsKey(partitionId)) {
+ left = historicalPartitionTSO.get(partitionId);
+ }
+ return Pair.of(left, getBaseTableNullable().getPartition(partitionId).getTso());
Review Comment:
What is the difference between `getBaseTableNullable().getPartition(partitionId).getTso()` and `partitionOffset.get(partitionId)`? Going by the comment, both are the `current tso`.
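One possible reading, sketched under stated assumptions: `partitionOffset` holds the TSO up to which the stream has already consumed a partition, while `Partition.getTso()` is the partition's latest committed TSO. If so, the returned pair is a (start, end) range and only the right-hand value is the "current" TSO. `StreamUpdateRange` and its maps are hypothetical stand-ins, not Doris code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the range returned by getStreamUpdate; not the Doris implementation.
final class StreamUpdateRange {
    // Assumed: TSO up to which the stream has already consumed each partition.
    private final Map<Long, Long> consumedTso = new HashMap<>();
    // Assumed: snapshot TSO for partitions that still have historical data to read.
    private final Map<Long, Long> historicalTso = new HashMap<>();
    // Assumed: latest committed TSO of each partition in the base table.
    private final Map<Long, Long> latestPartitionTso = new HashMap<>();

    void setConsumed(long partitionId, long tso) { consumedTso.put(partitionId, tso); }
    void setHistorical(long partitionId, long tso) { historicalTso.put(partitionId, tso); }
    void setLatest(long partitionId, long tso) { latestPartitionTso.put(partitionId, tso); }

    /** Returns {lower, upper}: lower is the historical TSO if present, else the consumed TSO. */
    long[] getStreamUpdate(long partitionId) {
        Long lower = historicalTso.getOrDefault(partitionId, consumedTso.get(partitionId));
        Long upper = latestPartitionTso.get(partitionId);
        if (lower == null || upper == null) {
            throw new IllegalStateException("no TSO recorded for partition " + partitionId);
        }
        return new long[] {lower, upper};
    }
}
```

If that reading holds, a code comment such as `<historical or consumed tso, latest partition tso>` would answer the question directly in the source.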
##########
fe/fe-core/src/main/java/org/apache/doris/qe/TimeBasedChangeVisibleWaiter.java:
##########
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.analysis.TableScanParams;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.nereids.analyzer.UnboundRelation;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+import org.apache.doris.tso.TSOTimestamp;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TimeBasedChangeVisibleWaiter {
Review Comment:
Please add a class comment that explains what this class does and how it works.
##########
fe/fe-core/src/main/java/org/apache/doris/qe/TimeBasedChangeVisibleWaiter.java:
##########
@@ -0,0 +1,155 @@
+public class TimeBasedChangeVisibleWaiter {
+ private final ConnectContext context;
+ private final Plan plan;
Review Comment:
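Don't keep holding on to `plan`: a plan can use a lot of memory and should be released as early as possible. You could compute `collectDbToTableEndTSO` up front here and then release the plan.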
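A minimal sketch of the suggestion, assuming the waiter only needs a per-database map from table name to end TSO. `FakePlan`, `TableRef` and `ChangeVisibleWaiterSketch` are hypothetical stand-ins for the Nereids plan and the real waiter; the point is that the constructor derives the summary and stores no reference to the plan. Keeping the larger TSO when a table is referenced twice is also an assumption of this sketch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reference from the plan to a table and the TSO the query must wait for.
final class TableRef {
    final String db;
    final String table;
    final long endTso;

    TableRef(String db, String table, long endTso) {
        this.db = db;
        this.table = table;
        this.endTso = endTso;
    }
}

// Hypothetical stand-in for the (large) plan object.
final class FakePlan {
    final List<TableRef> referencedTables;

    FakePlan(List<TableRef> referencedTables) {
        this.referencedTables = referencedTables;
    }
}

// Sketch: summarize the plan in the constructor and keep only the summary.
final class ChangeVisibleWaiterSketch {
    private final Map<String, Map<String, Long>> dbToTableEndTso; // db -> (table -> end TSO)

    ChangeVisibleWaiterSketch(FakePlan plan) {
        this.dbToTableEndTso = collectDbToTableEndTso(plan);
        // No field references `plan`, so it can be garbage-collected once the caller drops it.
    }

    private static Map<String, Map<String, Long>> collectDbToTableEndTso(FakePlan plan) {
        Map<String, Map<String, Long>> result = new HashMap<>();
        for (TableRef ref : plan.referencedTables) {
            // Keep the largest TSO if the same table appears more than once (assumption).
            result.computeIfAbsent(ref.db, k -> new HashMap<>()).merge(ref.table, ref.endTso, Long::max);
        }
        return result;
    }

    Map<String, Map<String, Long>> getDbToTableEndTso() {
        return Collections.unmodifiableMap(dbToTableEndTso);
    }
}
```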
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java:
##########
@@ -445,6 +451,9 @@ protected void collectAndLockTable(boolean showPlanProcess) {
preloadResult.getCandidateTableCount());
}
}
+ if (waitForChangeVisible) {
+ waitForTimeBasedChangeVisibleBeforeLock();
+ }
Review Comment:
Add a timer so we can measure how long this step takes.
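A minimal timing sketch, assuming elapsed time is accumulated per phase name. `PhaseTimer` and the phase names are hypothetical; in the planner the value would presumably be reported through the existing query profile rather than a map, and a step that throws checked exceptions would need a different functional interface than `Runnable`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical per-phase wall-clock timer.
final class PhaseTimer {
    private final Map<String, Long> elapsedMs = new LinkedHashMap<>();

    /** Runs the step and records its elapsed milliseconds, even if the step throws. */
    void time(String phase, Runnable step) {
        long start = System.nanoTime();
        try {
            step.run();
        } finally {
            long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            elapsedMs.merge(phase, ms, Long::sum);
        }
    }

    long elapsedMs(String phase) {
        return elapsedMs.getOrDefault(phase, 0L);
    }

    boolean recorded(String phase) {
        return elapsedMs.containsKey(phase);
    }
}
```

Recording in `finally` means a wait that fails or times out still shows up in the timing, which is usually the case worth investigating.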
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -113,32 +125,37 @@ private Plan normalize(LogicalOlapTableStreamScan scan) {
// history plan
if (!historicalPartitionIds.isEmpty()) {
- List<Slot> scanSlots = new ArrayList<>(newSlots);
- // add delete sign column if unique base table
+ // for not consume history partition we just scan base table
+ LogicalOlapScan baseScan = new LogicalOlapScan(
+ cascadesContext.getStatementContext().getNextRelationId(), baseTable, scan.qualified(),
+ historicalPartitionIds, scan.getSelectedTabletIds(), new ArrayList<>(), scan.getTableSample(),
+ ImmutableList.of());
+
+ List<Slot> baseOutputSlots = baseScan.getLogicalProperties().getOutput();
+ Plan plan = baseScan;
Slot deleteSlot = null;
- for (Column column : scan.getTable().getBaseSchema(true)) {
- if (column.getName().equals(Column.DELETE_SIGN)) {
- deleteSlot = SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(), scan.getTable(),
- column, scan.qualified());
- scanSlots.add(deleteSlot);
+ Slot tsoSlot = null;
+ for (Slot slot : baseOutputSlots) {
+ if (slot.getName().equals(Column.DELETE_SIGN)) {
+ deleteSlot = slot;
+ }
+ if (slot.getName().equals(Column.COMMIT_TSO_COL)) {
+ tsoSlot = slot;
+ }
+ if (deleteSlot != null && tsoSlot != null) {
break;
}
}
- LogicalOlapTableStreamScan newScan = scan.withSelectedPartitionIds(historicalPartitionIds, true)
- .withCachedOutput(new ArrayList<>(scanSlots))
- .withNormalized(true);
- Plan plan = newScan;
if (deleteSlot != null) {
Expression conjunct = new EqualTo(deleteSlot, new TinyIntLiteral((byte) 0));
if (!scan.getTable().getEnableUniqueKeyMergeOnWrite()) {
- newScan = newScan.withPreAggStatus(PreAggStatus.off(
+ plan = baseScan.withPreAggStatus(PreAggStatus.off(
Column.DELETE_SIGN + " is used as conjuncts."));
}
- plan = new LogicalFilter<>(ImmutableSet.of(conjunct), newScan);
+ plan = new LogicalFilter<>(ImmutableSet.of(conjunct), plan);
}
- // replace virtual column with constant projection
- List<NamedExpression> project = newSlots.stream()
- .map(NamedExpression.class::cast).collect(Collectors.toList());
+ Preconditions.checkArgument(tsoSlot != null);
Review Comment:
add error message
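Guava's `Preconditions.checkArgument(boolean, String, Object...)` takes a message template, as `addCommitTsoFilter` in this PR already does. The sketch below uses a small stand-in with the same shape so it runs without Guava; `TsoSlotLookup` is hypothetical, and the column name is taken from the `__DORIS_COMMIT_TSO_COL__` Javadoc in this PR, assumed to match `Column.COMMIT_TSO_COL`.

```java
import java.util.List;

// Stand-in with the same shape as Guava's Preconditions.checkArgument(boolean, String, Object...),
// so this sketch runs without Guava. Guava itself only substitutes %s placeholders.
final class Checks {
    static void checkArgument(boolean expression, String template, Object... args) {
        if (!expression) {
            throw new IllegalArgumentException(String.format(template, args));
        }
    }
}

// Hypothetical lookup mirroring the loop in the diff: find the commit-TSO slot by column name.
final class TsoSlotLookup {
    // Assumed value of Column.COMMIT_TSO_COL, taken from the Javadoc in this PR.
    static final String COMMIT_TSO_COL = "__DORIS_COMMIT_TSO_COL__";

    static String findTsoSlot(List<String> outputNames, String tableName) {
        String tsoSlot = null;
        for (String name : outputNames) {
            if (name.equals(COMMIT_TSO_COL)) {
                tsoSlot = name;
                break;
            }
        }
        // The message names both the missing column and the table, like addCommitTsoFilter does.
        Checks.checkArgument(tsoSlot != null, "%s not found on table %s", COMMIT_TSO_COL, tableName);
        return tsoSlot;
    }
}
```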
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java:
##########
@@ -450,6 +483,140 @@ public static LogicalPlan checkAndAddDeleteSignFilter(LogicalOlapScan scan, Conn
return scan;
}
+ /**
+ * Build the time-travel (FOR VERSION/TIME AS OF) plan for an olap scan.
+ * dup: Filter(__DORIS_COMMIT_TSO_COL__ <= targetTso).
+ * mow: base(survived rows, tso<=t1) UNION ALL binlog(before-image of UPDATE_BEFORE/DELETE).
+ */
+ private LogicalPlan buildTimeTravelPlan(LogicalOlapScan scan, OlapTable olapTable,
+ TableSnapshot snapshot, UnboundRelation unboundRelation, List<String> qualifier,
+ List<Long> partIds, List<Long> tabletIds, CascadesContext cascadesContext) {
+ validateTimeTravel(olapTable);
+ long targetTso = resolveSnapshotTso(snapshot);
+ if (olapTable.getKeysType() == KeysType.DUP_KEYS) {
+ return addCommitTsoFilter(scan, scan, targetTso, olapTable);
+ }
+ return buildMowTimeTravelUnion(scan, olapTable, targetTso, unboundRelation,
+ qualifier, partIds, tabletIds, cascadesContext);
+ }
+
+ /**
+ * Validate that the table supports time travel: row binlog enabled, dup or mow key type,
+ * and (for mow) historical value recorded in binlog for the union right branch.
+ */
+ private void validateTimeTravel(OlapTable olapTable) {
+ if (!olapTable.enableTso()) {
+ throw new AnalysisException("FOR VERSION/TIME AS OF requires row binlog "
+ + "(PROPERTIES('binlog.enable'='true','binlog.format'='ROW')) on table "
+ + olapTable.getQualifiedName());
+ }
+ KeysType keysType = olapTable.getKeysType();
+ if (keysType != KeysType.DUP_KEYS && !olapTable.isUniqKeyMergeOnWrite()) {
+ throw new AnalysisException("FOR VERSION/TIME AS OF is only supported on duplicate "
+ + "or unique merge-on-write tables. Table " + olapTable.getQualifiedName()
+ + " is " + keysType + ".");
+ }
+ if (olapTable.isUniqKeyMergeOnWrite()
+ && !olapTable.getBinlogConfig().getNeedHistoricalValue()) {
+ throw new AnalysisException("FOR VERSION/TIME AS OF on merge-on-write table requires "
+ + "binlog.need_historical_value=true. Table " + olapTable.getQualifiedName());
+ }
+ }
+
+ /**
+ * Add Filter(__DORIS_COMMIT_TSO_COL__ <= targetTso) on top of {@code child}. The tso slot is
+ * resolved from {@code scan} output; {@code child} must pass through the scan output slots.
+ */
+ private LogicalPlan addCommitTsoFilter(LogicalPlan child, LogicalOlapScan scan,
+ long targetTso, OlapTable olapTable) {
+ Slot tsoSlot = null;
+ for (Slot slot : scan.getOutput()) {
+ if (slot.getName().equals(Column.COMMIT_TSO_COL)) {
+ tsoSlot = slot;
+ break;
+ }
+ }
+ Preconditions.checkArgument(tsoSlot != null,
+ "%s not found on table %s", Column.COMMIT_TSO_COL, olapTable.getQualifiedName());
+ Expression conjunct = new LessThanEqual(tsoSlot, new BigIntLiteral(targetTso));
+ return new LogicalFilter<>(ImmutableSet.of(conjunct), child);
+ }
+
+ /**
+ * Resolve a TableSnapshot to a target commit tso (inclusive upper bound).
+ * VERSION: the literal is the tso itself. TIME: wall-clock string -> ms -> tso upper bound.
+ */
+ private long resolveSnapshotTso(TableSnapshot snapshot) {
+ if (snapshot.getType() == TableSnapshot.VersionType.VERSION) {
+ try {
+ return Long.parseLong(snapshot.getValue().trim());
+ } catch (NumberFormatException e) {
+ throw new AnalysisException(
+ "Invalid version in FOR VERSION AS OF: " + snapshot.getValue());
+ }
+ }
+ long ms = OlapScanNode.parseChangeTimestamp(snapshot.getValue());
+ return TSOTimestamp.composeFullTimestamp(ms);
+ }
+
+ /**
+ * mow time-travel: A|t1 = base(survived rows, tso<=t1) UNION ALL binlog(before-image of
+ * UPDATE_BEFORE/DELETE since t1). The binlog right branch reuses the @incr (MIN_DELTA) machinery;
+ * BE splits each change into rows where UPDATE_BEFORE/DELETE rows already carry the before value.
+ */
+ private LogicalPlan buildMowTimeTravelUnion(LogicalOlapScan baseScan, OlapTable olapTable,
+ long targetTso, UnboundRelation unboundRelation, List<String> qualifier,
+ List<Long> partIds, List<Long> tabletIds, CascadesContext cascadesContext) {
+ // union baseline = base visible columns (key + value); hidden cols are filtered out.
+ List<Slot> visibleOutput = baseScan.getOutput().stream()
+ .filter(slot -> !(slot instanceof SlotReference)
+ || ((SlotReference) slot).isVisible())
+ .collect(Collectors.toList());
+ List<NamedExpression> visibleProjects = visibleOutput.stream()
+ .map(NamedExpression.class::cast).collect(Collectors.toList());
+
+ // left: base survived rows at t1 = delete_sign=0 AND commit_tso<=t1, projected to visible.
+ LogicalPlan left = checkAndAddDeleteSignFilter(baseScan, ConnectContext.get(), olapTable, true);
+ left = addCommitTsoFilter(left, baseScan, targetTso, olapTable);
+ left = new LogicalProject<>(visibleProjects, left);
+
+ // right: binlog MIN_DELTA over tso>t1, keep UPDATE_BEFORE/DELETE rows (before image),
+ // projected to the same visible schema. BE splits each change so UPDATE_BEFORE/DELETE rows
+ // already carry the pre-change value in the (same-named) value columns.
+ RowBinlogTableWrapper binlogTable = new RowBinlogTableWrapper(olapTable);
+ RelationId binlogRelationId = cascadesContext.getStatementContext().getNextRelationId();
+ LogicalOlapScan binlogScan = CollectionUtils.isEmpty(partIds)
+ ? new LogicalOlapScan(binlogRelationId, binlogTable, qualifier, tabletIds,
+ unboundRelation.getHints(), unboundRelation.getTableSample(), ImmutableList.of())
+ : new LogicalOlapScan(binlogRelationId, binlogTable, qualifier, partIds, tabletIds,
+ unboundRelation.getHints(), unboundRelation.getTableSample(), ImmutableList.of());
+ Map<String, String> incrParams = new HashMap<>();
+ incrParams.put(OlapScanNode.OLAP_INCREMENT_TYPE, StreamScanType.MIN_DELTA.name());
+ incrParams.put(OlapScanNode.OLAP_START_TIMESTAMP, String.valueOf(targetTso));
+ binlogScan = binlogScan.withTableScanParams(
+ new TableScanParams(TableScanParams.INCREMENTAL_READ, incrParams, Lists.newArrayList()));
Review Comment:
Can this be created in one step to reduce object creation?
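One reading of this comment: the scan is built with `new LogicalOlapScan(...)` and then immediately copied by `withTableScanParams(...)`, so two plan nodes are allocated where one would do. The sketch uses a hypothetical immutable `ScanNode` (not the real `LogicalOlapScan`, whose constructors are not shown in this diff) and counts the allocations of both approaches. Whether `LogicalOlapScan` has, or should gain, a constructor that accepts `TableScanParams` is an assumption.

```java
import java.util.List;
import java.util.Map;

// Hypothetical immutable plan node standing in for LogicalOlapScan: each withXxx() call allocates a copy.
final class ScanNode {
    static int allocations = 0; // demo-only allocation counter

    final String table;
    final List<Long> partitionIds;
    final Map<String, String> scanParams;

    ScanNode(String table, List<Long> partitionIds, Map<String, String> scanParams) {
        this.table = table;
        this.partitionIds = List.copyOf(partitionIds);
        this.scanParams = Map.copyOf(scanParams);
        allocations++;
    }

    ScanNode withScanParams(Map<String, String> params) {
        return new ScanNode(table, partitionIds, params);
    }

    // Two allocations: a node without params, then a copy that carries them.
    static ScanNode buildInTwoSteps(String table, List<Long> parts, Map<String, String> params) {
        return new ScanNode(table, parts, Map.of()).withScanParams(params);
    }

    // One allocation: pass the params to the constructor directly.
    static ScanNode buildOnce(String table, List<Long> parts, Map<String, String> params) {
        return new ScanNode(table, parts, params);
    }
}
```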
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java:
##########
@@ -553,8 +716,9 @@ public static LogicalPlan addAppendOnlyFilter(LogicalPlan scan) {
return new LogicalFilter<>(conjuncts, scan);
}
- private boolean isScanAppendOnlyTableStream(OlapTableStreamWrapper stream) {
- return stream.getStreamScanType().equals(BaseTableStream.StreamScanType.APPEND_ONLY);
+ private boolean isScanAppendOnlyTableStream(LogicalOlapScan scan) {
+ return ((OlapTableStreamWrapper) scan.getTable()).getStreamScanType()
+ .equals(BaseTableStream.StreamScanType.APPEND_ONLY);
Review Comment:
If a cast is needed here, change the parameter itself to be of type `OlapTableStreamWrapper`.
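A sketch of the suggested signature change, using hypothetical stub types in place of `TableIf`, `OlapTableStreamWrapper` and `LogicalOlapScan`. With the narrower parameter type the compiler enforces the requirement at the call site, instead of a `ClassCastException` surfacing inside the helper.

```java
// Hypothetical minimal types standing in for TableIf, OlapTableStreamWrapper and LogicalOlapScan.
interface TableStub { }

enum StreamScanKind { APPEND_ONLY, MIN_DELTA }

final class StreamTableStub implements TableStub {
    private final StreamScanKind scanKind;

    StreamTableStub(StreamScanKind scanKind) {
        this.scanKind = scanKind;
    }

    StreamScanKind getStreamScanType() {
        return scanKind;
    }
}

final class ScanStub {
    private final TableStub table;

    ScanStub(TableStub table) {
        this.table = table;
    }

    TableStub getTable() {
        return table;
    }
}

final class AppendOnlyCheck {
    // Before: takes any scan and casts inside; a non-stream table fails here with ClassCastException.
    static boolean isAppendOnlyByCast(ScanStub scan) {
        return ((StreamTableStub) scan.getTable()).getStreamScanType() == StreamScanKind.APPEND_ONLY;
    }

    // After: the parameter type states the requirement, so the compiler checks it at the call site.
    static boolean isAppendOnly(StreamTableStub stream) {
        return stream.getStreamScanType() == StreamScanKind.APPEND_ONLY;
    }
}
```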
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -201,21 +278,120 @@ private Plan normalize(LogicalOlapTableStreamScan scan) {
project.add(new Alias(slot.getExprId(), seqSlot, Column.STREAM_SEQ_COL));
}
}
- incrementalPlan = new LogicalProject<>(project, plan);
+ } else {
+ // only filter delete & update before rows for building before snapshot image
+ Preconditions.checkArgument(opSlot != null);
+ Expression opFilter = new InPredicate(opSlot, ImmutableList.of(
+ new BigIntLiteral(BinlogUtils.ROW_BINLOG_DELETE),
+ new BigIntLiteral(BinlogUtils.ROW_BINLOG_UPDATE_BEFORE)));
+ plan = new LogicalFilter<>(ImmutableSet.of(opFilter), plan);
+ }
+ return new LogicalProject<>(project, plan);
+ }
+
+ private Plan makeResetOlapFullScan(LogicalOlapTableStreamScan scan, CascadesContext cascadesContext) {
+ // make olap scan on base table
+ OlapTableStreamWrapper streamWrapper = (OlapTableStreamWrapper) scan.getTable();
+ OlapTable baseTable = streamWrapper.getBaseTable();
+ Plan plan = makeOlapScanOnBaseTable(scan, cascadesContext, baseTable, scan.getSelectedPartitionIds());
+ List<Slot> originSlots = scan.getLogicalProperties().getOutput();
+ return projectToOriginSlots(plan, originSlots);
+ }
+
+ private Plan makeSnapshotScan(LogicalOlapTableStreamScan scan, CascadesContext cascadesContext) {
+ List<Long> selectedPartitionIds = scan.getSelectedPartitionIds();
+ OlapTableStreamWrapper streamWrapper = (OlapTableStreamWrapper) scan.getTable();
Review Comment:
nit: Override `getTable` in `LogicalOlapTableStreamScan` and put the return-type check and cast inside it.
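Java allows an overriding method to narrow its return type (a covariant return), which is one way to implement this nit. The sketch uses hypothetical `GenericScan`/`StreamScan` classes; it assumes `OlapTableStreamWrapper` is a subtype of whatever `LogicalOlapScan.getTable()` currently returns, otherwise the override would not compile.

```java
// Hypothetical minimal hierarchy: a generic scan exposes TableLike; the stream scan narrows it.
interface TableLike {
    String name();
}

final class StreamWrapper implements TableLike {
    private final String baseTableName;

    StreamWrapper(String baseTableName) {
        this.baseTableName = baseTableName;
    }

    public String name() {
        return baseTableName + "_stream";
    }

    String getBaseTableName() {
        return baseTableName;
    }
}

class GenericScan {
    private final TableLike table;

    GenericScan(TableLike table) {
        this.table = table;
    }

    TableLike getTable() {
        return table;
    }
}

final class StreamScan extends GenericScan {
    StreamScan(StreamWrapper table) {
        super(table);
    }

    // Covariant return type: the check and cast live here once, so callers need no casts.
    @Override
    StreamWrapper getTable() {
        TableLike table = super.getTable();
        if (!(table instanceof StreamWrapper)) {
            throw new IllegalStateException("stream scan must wrap a StreamWrapper, got " + table.name());
        }
        return (StreamWrapper) table;
    }
}
```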
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -201,21 +278,120 @@ private Plan normalize(LogicalOlapTableStreamScan scan) {
+ private Plan makeSnapshotScan(LogicalOlapTableStreamScan scan, CascadesContext cascadesContext) {
+ List<Long> selectedPartitionIds = scan.getSelectedPartitionIds();
+ OlapTableStreamWrapper streamWrapper = (OlapTableStreamWrapper) scan.getTable();
+ OlapTable baseTable = streamWrapper.getBaseTable();
+ List<Slot> originSlots = scan.getLogicalProperties().getOutput();
Review Comment:
Call `getOutput()` directly instead of `getLogicalProperties().getOutput()`.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -201,21 +278,120 @@ private Plan normalize(LogicalOlapTableStreamScan scan) {
+ private Plan makeSnapshotScan(LogicalOlapTableStreamScan scan, CascadesContext cascadesContext) {
Review Comment:
Add a comment explaining the structure of the plan.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -151,42 +168,102 @@ private Plan normalize(LogicalOlapTableStreamScan scan) {
&& ((SlotReference) slot).getOriginalColumn().isPresent()
&& ((SlotReference) slot).getOriginalColumn().get()
.equals(Column.STREAM_SEQ_VIRTUAL_COLUMN)) {
- project.add(new Alias(slot.getExprId(), new Nullable(new BigIntLiteral(-1)),
- Column.STREAM_SEQ_COL));
+ project.add(new Alias(slot.getExprId(), tsoSlot, Column.STREAM_SEQ_COL));
}
}
historyPlan = new LogicalProject<>(project, plan);
}
// incremental plan
if (!incrementalPartitionIds.isEmpty()) {
- List<Slot> scanSlots = new ArrayList<>(newSlots);
- // add slot from binlog
- Slot opSlot = null;
- Slot seqSlot = null;
- for (Column column : ((OlapTableStreamWrapper) scan.getTable()).getRowBinlogSchema()) {
- if (column.getName().equals(Column.BINLOG_TIMESTAMP_COL)) {
- seqSlot = SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(), scan.getTable(),
- column, scan.qualified());
- scanSlots.add(seqSlot);
- } else if (column.getName().equals(Column.BINLOG_OPERATION_COL)) {
- opSlot = SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(), scan.getTable(),
- column, scan.qualified());
- scanSlots.add(opSlot);
- }
+ // remap scan from binlog
+ incrementalPlan = makeIncrementalScanFromBinlog(cascadesContext, scan, incrementalPartitionIds,
+ baseTable, streamWrapper.getPartitionOffsets(incrementalPartitionIds),
+ streamWrapper.getStreamScanType(), originSlots, newSlots, true);
+ }
+
+ return combineTwoPlan(historyPlan, incrementalPlan, originSlots);
+ }
+
+ private Plan refreshUnionChildOutputExprIds(Plan plan, List<Slot> unionOutputs) {
+ Preconditions.checkState(plan.getOutput().size() == unionOutputs.size(),
+ "Union child output size %s does not match union output size %s",
+ plan.getOutput().size(), unionOutputs.size());
+ List<NamedExpression> project = new ArrayList<>(plan.getOutput().size());
+ for (int i = 0; i < plan.getOutput().size(); i++) {
+ project.add(new Alias(plan.getOutput().get(i), unionOutputs.get(i).getName()));
+ }
+ return new LogicalProject<>(project, plan);
+ }
+
+ /**
+ * Build a projection list that exposes the columns of {@code wantedSlots} (slots taken from the
+ * original stream scan output) on top of the rewritten child whose output is {@code childOutput}.
+ * Each wanted slot is matched to the child output slot by column name and re-aliased to the
+ * original expr id, so that operators above this rewrite still reference the same expr ids.
+ */
+ private List<NamedExpression> mapToChildOutputSlots(List<Slot> wantedSlots, List<Slot> childOutput) {
+ Map<String, Slot> childSlotByName = new HashMap<>();
+ for (Slot slot : childOutput) {
+ childSlotByName.put(slot.getName(), slot);
+ }
+ List<NamedExpression> project = new ArrayList<>(wantedSlots.size());
+ for (Slot wanted : wantedSlots) {
+ Slot match = childSlotByName.get(wanted.getName());
+ Preconditions.checkArgument(match != null,
+ "column %s not found in child output", wanted.getName());
+ if (match.getExprId().equals(wanted.getExprId())) {
+ project.add(match);
+ } else {
+ project.add(new Alias(wanted.getExprId(), match, wanted.getName()));
+ }
+ }
+ return project;
+ }
+
+ private Plan projectToOriginSlots(Plan plan, List<Slot> originSlots) {
+ return new LogicalProject<>(mapToChildOutputSlots(originSlots, plan.getOutput()), plan);
+ }
+
+ private Plan makeIncrementalScanFromBinlog(CascadesContext cascadesContext, LogicalOlapTableStreamScan scan,
+ List<Long> selectedPartitionIds,
+ OlapTable baseTable, Map<Long, Pair<Long, Long>> offsetMap,
+ BaseTableStream.StreamScanType streamScanType, List<Slot> originSlots,
+ List<Slot> newSlots, boolean isIncremental) {
+ // remap scan from binlog
+ RowBinlogTableWrapper table =
+ new RowBinlogTableWrapper(baseTable, offsetMap);
+ Map<String, String> scanParams = new HashMap<>();
+ scanParams.put(OlapScanNode.OLAP_INCREMENT_TYPE, streamScanType.toString());
+ LogicalOlapScan newScan = new LogicalOlapScan(cascadesContext.getStatementContext().getNextRelationId(),
+ table, scan.qualified(), selectedPartitionIds, scan.getSelectedTabletIds(),
+ new ArrayList<>(), scan.getTableSample(), ImmutableList.of())
+ .withTableScanParams(new TableScanParams(TableScanParams.INCREMENTAL_READ, scanParams,
+ Lists.newArrayList()));
+ Plan plan = newScan;
+ List<Slot> binlogOutputSlots = newScan.getLogicalProperties().getOutput();
+ // project stream virtual slot from binlog
+ Slot opSlot = null;
+ Slot seqSlot = null;
+ for (int i = 0; i < binlogOutputSlots.size(); i++) {
+ if (binlogOutputSlots.get(i).getName().equals(Column.BINLOG_TIMESTAMP_COL)) {
+ seqSlot = binlogOutputSlots.get(i);
+ } else if (binlogOutputSlots.get(i).getName().equals(Column.BINLOG_OPERATION_COL)) {
+ opSlot = binlogOutputSlots.get(i);
}
- Map<String, String> scanParams = new HashMap<>();
- scanParams.put(OlapScanNode.OLAP_INCREMENT_TYPE,
- ((OlapTableStreamWrapper) scan.getTable()).getStreamScanType().toString());
- Plan plan = scan.withSelectedPartitionIds(incrementalPartitionIds, true)
- .withCachedOutput(new ArrayList<>(scanSlots))
- .withIncrementalScan(true)
- .withTableScanParams(new TableScanParams(TableScanParams.INCREMENTAL_READ, scanParams,
- Lists.newArrayList()))
- .withNormalized(true);
- // replace virtual column with alias slot reference
- List<NamedExpression> project = newSlots.stream()
- .map(NamedExpression.class::cast).collect(Collectors.toList());
+ }
+ if (streamScanType.equals(BaseTableStream.StreamScanType.APPEND_ONLY)) {
+ // filter append-only operation if needed
+ Preconditions.checkArgument(opSlot != null);
+ plan = new LogicalFilter<>(ImmutableSet.of(new EqualTo(opSlot,
+ new BigIntLiteral(BinlogUtils.ROW_BINLOG_APPEND))), plan);
+ }
+ List<NamedExpression> project = binlogOutputSlots.stream()
+ .map(NamedExpression.class::cast).collect(Collectors.toList());
+ plan = new LogicalProject<>(project, plan);
Review Comment:
Is this project redundant?
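In the quoted code the project list is built from `binlogOutputSlots`, which is the scan's own output in the same order, and the optional filter beneath it does not change that output, so the project looks like an identity. A hypothetical helper that only adds a project when it changes the column list illustrates the check; `ProjectionSketch` is not Doris code, and later rewrite rules may already remove such projects.

```java
import java.util.List;

// Hypothetical plan-node model used to illustrate an identity projection.
final class ProjectionSketch {
    static final class Node {
        final String kind;
        final List<String> output;
        final Node child;

        Node(String kind, List<String> output, Node child) {
            this.kind = kind;
            this.output = List.copyOf(output);
            this.child = child;
        }
    }

    /** Adds a project only if it changes the columns or their order; otherwise returns the child as-is. */
    static Node projectIfNeeded(List<String> projections, Node child) {
        if (projections.equals(child.output)) {
            return child; // identity: same columns in the same order
        }
        return new Node("Project", projections, child);
    }
}
```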
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -63,43 +72,46 @@
* 2. add delete sign column if unique base table
*/
public class NormalizeOlapTableStreamScan extends OneRewriteRuleFactory {
- private static final long ROW_BINLOG_APPEND = 0;
- private static final long ROW_BINLOG_DELETE = 1;
- private static final long ROW_BINLOG_UPDATE_BEFORE = 2;
- private static final long ROW_BINLOG_UPDATE_AFTER = 3;
-
@Override
public Rule build() {
return logicalOlapTableStreamScan()
- .when(scan -> !scan.isNormalized())
- .then(this::normalize)
+ .thenApply(ctx -> normalize(ctx.root, ctx.cascadesContext))
.toRule(RuleType.NORMALIZE_OlAP_TABLE_STREAM_SCAN);
}
private static Expression buildChangeTypeExpr(Slot opSlot) {
return new CaseWhen(ImmutableList.of(
- new WhenClause(new EqualTo(opSlot, new BigIntLiteral(ROW_BINLOG_APPEND)),
+ new WhenClause(new EqualTo(opSlot, new BigIntLiteral(BinlogUtils.ROW_BINLOG_APPEND)),
new VarcharLiteral("APPEND")),
- new WhenClause(new EqualTo(opSlot, new BigIntLiteral(ROW_BINLOG_DELETE)),
+ new WhenClause(new EqualTo(opSlot, new BigIntLiteral(BinlogUtils.ROW_BINLOG_DELETE)),
new VarcharLiteral("DELETE")),
- new WhenClause(new EqualTo(opSlot, new BigIntLiteral(ROW_BINLOG_UPDATE_BEFORE)),
+ new WhenClause(new EqualTo(opSlot, new BigIntLiteral(BinlogUtils.ROW_BINLOG_UPDATE_BEFORE)),
new VarcharLiteral("UPDATE_BEFORE")),
- new WhenClause(new EqualTo(opSlot, new BigIntLiteral(ROW_BINLOG_UPDATE_AFTER)),
+ new WhenClause(new EqualTo(opSlot, new BigIntLiteral(BinlogUtils.ROW_BINLOG_UPDATE_AFTER)),
new VarcharLiteral("UPDATE_AFTER"))), new VarcharLiteral("UNKNOWN"));
}
- private Plan normalize(LogicalOlapTableStreamScan scan) {
+ private Plan normalize(LogicalOlapTableStreamScan scan, CascadesContext cascadesContext) {
List<Long> selectedPartitionIds = scan.getSelectedPartitionIds();
if (selectedPartitionIds.isEmpty()) {
return scan;
}
- List<Long> historicalPartitionIds = ImmutableList.copyOf(((OlapTableStreamWrapper) scan.getTable())
- .filterHistoryPartitionIds(selectedPartitionIds));
- List<Long> incrementalPartitionIds = ImmutableList.copyOf(((OlapTableStreamWrapper) scan.getTable())
- .filterIncrementalPartitionIds(selectedPartitionIds));
+ if (scan.isReset()) {
+ return makeResetOlapFullScan(scan, cascadesContext);
+ }
+ if (scan.isSnapshot()) {
+ return makeSnapshotScan(scan, cascadesContext);
+ }
+ OlapTableStreamWrapper streamWrapper = (OlapTableStreamWrapper) scan.getTable();
Review Comment:
It would be better to extract the code from here down into another `makeXXX` function.
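A sketch of the suggested shape, where `normalize` only dispatches and each case lives in its own method. Plans are modeled as strings, and `makeHistoryAndIncrementalScan` is a hypothetical name for the extracted tail; `makeResetOlapFullScan` and `makeSnapshotScan` mirror the methods already in this PR.

```java
import java.util.List;

// Sketch of the suggested structure: normalize() only dispatches; each branch is its own make* method.
final class NormalizeSketch {
    static String normalize(List<Long> selectedPartitionIds, boolean isReset, boolean isSnapshot) {
        if (selectedPartitionIds.isEmpty()) {
            return "scan"; // nothing selected: keep the original scan
        }
        if (isReset) {
            return makeResetOlapFullScan(selectedPartitionIds);
        }
        if (isSnapshot) {
            return makeSnapshotScan(selectedPartitionIds);
        }
        return makeHistoryAndIncrementalScan(selectedPartitionIds);
    }

    private static String makeResetOlapFullScan(List<Long> ids) {
        return "reset" + ids;
    }

    private static String makeSnapshotScan(List<Long> ids) {
        return "snapshot" + ids;
    }

    // Hypothetical name for the history + incremental code that currently follows the dispatch.
    private static String makeHistoryAndIncrementalScan(List<Long> ids) {
        return "history+incremental" + ids;
    }
}
```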
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -201,21 +278,120 @@ private Plan normalize(LogicalOlapTableStreamScan scan) {
+ private Plan makeSnapshotScan(LogicalOlapTableStreamScan scan, CascadesContext cascadesContext) {
+ List<Long> selectedPartitionIds = scan.getSelectedPartitionIds();
+ OlapTableStreamWrapper streamWrapper = (OlapTableStreamWrapper) scan.getTable();
+ OlapTable baseTable = streamWrapper.getBaseTable();
+ List<Slot> originSlots = scan.getLogicalProperties().getOutput();
+ selectedPartitionIds = streamWrapper.filterConsumedPartitionIds(selectedPartitionIds);
+ // normal partition has no new data after snapshot, scan base table directly
+ List<Long> normalPartitionIds = streamWrapper.filterNormalSnapshotPartitionIds(selectedPartitionIds);
+ Set<Long> normalPartitionIdSet = ImmutableSet.copyOf(normalPartitionIds);
+ // rebuild partition has new data after snapshot, need to rebuild
+ List<Long> rebuildPartitionIds =
+ ImmutableList.copyOf(selectedPartitionIds.stream()
+ .filter(id -> !normalPartitionIdSet.contains(id)).collect(Collectors.toList()));
+ if (baseTable.getKeysType().equals(KeysType.DUP_KEYS)) {
+ // dup key table can just rebuild from base table
+ Map<Long, Pair<Long, Long>> partitionOffsetMap =
+ streamWrapper.getHistoryPartitionOffsets(selectedPartitionIds);
+ OlapTableWrapper table =
+ new OlapTableWrapper(baseTable, partitionOffsetMap);
+ return projectToOriginSlots(makeOlapScanOnBaseTable(scan, cascadesContext, table, selectedPartitionIds),
+ originSlots);
+ }
+ Plan normalPlan = null;
+ Plan rebuildPlan = null;
+ if (!normalPartitionIds.isEmpty()) {
+ normalPlan = makeOlapScanOnBaseTable(scan, cascadesContext, baseTable, normalPartitionIds);
+ // project filter invisible slots to match rebuild Plan, and keep the original
+ // stream scan expr ids so that parent operators still reference them
+ normalPlan = projectToOriginSlots(normalPlan, originSlots);
Review Comment:
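Do the `projectToOriginSlots` step once, at the very end of the method; it reads more smoothly. Right now it is done separately in two branches, and if `combineTwoPlan` needs a union, this project has to be removed again.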
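A sketch of projecting once at the end: the branches return unprojected plans, `combine` optionally unions them, and the mapping to the origin slots happens in a single call. All names are hypothetical and plans are modeled as strings; in the real rule the union children may still need their outputs aligned, which this sketch does not model.

```java
import java.util.List;

// Sketch: branches return unprojected plans; the final origin-slot projection happens in one place.
final class ProjectOnceSketch {
    static int projectCount = 0; // demo-only counter

    static String projectToOrigin(String child) {
        projectCount++;
        return "Project(" + child + ")";
    }

    // Union only when both branches exist; otherwise pass the single branch through.
    static String combine(String left, String right) {
        if (left == null) {
            return right;
        }
        if (right == null) {
            return left;
        }
        return "Union(" + left + ", " + right + ")";
    }

    static String makeSnapshotPlan(List<Long> normalIds, List<Long> rebuildIds) {
        if (normalIds.isEmpty() && rebuildIds.isEmpty()) {
            throw new IllegalArgumentException("no partitions to scan");
        }
        String normal = normalIds.isEmpty() ? null : "Scan" + normalIds;
        String rebuild = rebuildIds.isEmpty() ? null : "Rebuild" + rebuildIds;
        return projectToOrigin(combine(normal, rebuild)); // single projection at the end
    }
}
```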
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -201,21 +278,120 @@ private Plan normalize(LogicalOlapTableStreamScan scan) {
+ private Plan makeSnapshotScan(LogicalOlapTableStreamScan scan, CascadesContext cascadesContext) {
+ List<Long> selectedPartitionIds = scan.getSelectedPartitionIds();
+ OlapTableStreamWrapper streamWrapper = (OlapTableStreamWrapper) scan.getTable();
+ OlapTable baseTable = streamWrapper.getBaseTable();
+ List<Slot> originSlots = scan.getLogicalProperties().getOutput();
+ selectedPartitionIds = streamWrapper.filterConsumedPartitionIds(selectedPartitionIds);
+ // normal partition has no new data after snapshot, scan base table directly
+ List<Long> normalPartitionIds = streamWrapper.filterNormalSnapshotPartitionIds(selectedPartitionIds);
+ Set<Long> normalPartitionIdSet = ImmutableSet.copyOf(normalPartitionIds);
+ // rebuild partition has new data after snapshot, need to rebuild
+ List<Long> rebuildPartitionIds =
+ ImmutableList.copyOf(selectedPartitionIds.stream()
+ .filter(id -> !normalPartitionIdSet.contains(id)).collect(Collectors.toList()));
+ if (baseTable.getKeysType().equals(KeysType.DUP_KEYS)) {
+ // dup key table can just rebuild from base table
+ Map<Long, Pair<Long, Long>> partitionOffsetMap =
+ streamWrapper.getHistoryPartitionOffsets(selectedPartitionIds);
+ OlapTableWrapper table =
+ new OlapTableWrapper(baseTable, partitionOffsetMap);
+ return projectToOriginSlots(makeOlapScanOnBaseTable(scan, cascadesContext, table, selectedPartitionIds),
+ originSlots);
+ }
Review Comment:
This check should be done right after selectedPartitionIds is computed, to avoid unnecessary computation.
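A minimal sketch of the ordering this comment asks for: branch on the key type as soon as the selected partitions are known, so DUP_KEYS tables never run the normal/rebuild split. `KeysType` here is a hypothetical two-value stand-in for Doris's enum, the `filterNormalSnapshotPartitionIds` parameter stands in for the wrapper method of the same name, and the returned strings are placeholders for the real plans.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

final class EarlyDupKeysCheckSketch {
    // Hypothetical stand-in for Doris's KeysType; only the values this sketch needs.
    enum KeysType { DUP_KEYS, UNIQUE_KEYS }

    static String planFor(KeysType keysType, List<Long> selectedPartitionIds,
            Function<List<Long>, List<Long>> filterNormalSnapshotPartitionIds) {
        // Decide on the key type first: a DUP_KEYS table is rebuilt from the base table,
        // so none of the split work below is needed for it.
        if (keysType == KeysType.DUP_KEYS) {
            return "rebuildFromBase" + selectedPartitionIds;
        }
        // Only the remaining key types pay for splitting into normal and rebuild partitions.
        List<Long> normal = filterNormalSnapshotPartitionIds.apply(selectedPartitionIds);
        Set<Long> normalSet = Set.copyOf(normal);
        List<Long> rebuild = selectedPartitionIds.stream()
                .filter(id -> !normalSet.contains(id))
                .collect(Collectors.toUnmodifiableList());
        return "normal" + normal + " rebuild" + rebuild;
    }
}
```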
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -201,21 +278,120 @@ private Plan normalize(LogicalOlapTableStreamScan scan) {
+ List<Long> rebuildPartitionIds =
+ ImmutableList.copyOf(selectedPartitionIds.stream()
+ .filter(id -> !normalPartitionIdSet.contains(id)).collect(Collectors.toList()));
Review Comment:
Use the ImmutableList collector directly to avoid building the list twice.
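A sketch of the single-collector form. In the Doris code base, which already depends on Guava, the direct equivalent would be `.collect(ImmutableList.toImmutableList())` (available since Guava 21). The block below uses the JDK's `Collectors.toUnmodifiableList()` (Java 10+) instead so that it runs without Guava; `RebuildIdsSketch` is a hypothetical name.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class RebuildIdsSketch {
    // Collects straight into an unmodifiable list in a single collect call, instead of
    // collecting into a mutable list first and then wrapping it with a separate copy step.
    static List<Long> rebuildPartitionIds(List<Long> selectedPartitionIds,
            Set<Long> normalPartitionIdSet) {
        return selectedPartitionIds.stream()
                .filter(id -> !normalPartitionIdSet.contains(id))
                .collect(Collectors.toUnmodifiableList());
    }
}
```

Like `ImmutableList`, the resulting list rejects mutation and null elements, so swapping it in keeps the immutability the original `ImmutableList.copyOf` provided.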
--
This is an automated message from the Apache Git Service.