yujun777 commented on code in PR #63850:
URL: https://github.com/apache/doris/pull/63850#discussion_r3340418525
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java:
##########
@@ -230,7 +237,8 @@ public PhysicalOlapScan withPartitionPrunablePredicates(
selectedIndexId, selectedTabletIds, selectedPartitionIds,
hasPartitionPredicate,
distributionSpec, preAggStatus, baseOutputs, groupExpression,
getLogicalProperties(),
getPhysicalProperties(), statistics, tableSample,
operativeSlots, virtualColumns, scoreOrderKeys,
- scoreLimit, scoreRangeInfo, annOrderKeys, annLimit, tableAlias, partitionPrunablePredicates));
+ scoreLimit, scoreRangeInfo, annOrderKeys, annLimit, tableAlias, partitionPrunablePredicates, false,
Review Comment:
This should preserve the current `incrementalScan` value instead of
resetting it to `false`. If an incremental row-binlog/stream scan goes through
this copy path, the translator will no longer treat it as an incremental scan,
so it will not wrap the table as `RowBinlogTableWrapper` or emit the binlog
scan range fields.
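
A minimal sketch of the copy behavior being asked for. `ScanSketch` and its method names are hypothetical stand-ins, not the real `PhysicalOlapScan` constructor or API; the only point is that a `with...` copy should pass the current flag through rather than a literal `false`.

```java
// Hypothetical stand-in for an immutable plan node; not the real PhysicalOlapScan.
final class ScanSketch {
    private final String predicates;
    private final boolean incrementalScan;

    ScanSketch(String predicates, boolean incrementalScan) {
        this.predicates = predicates;
        this.incrementalScan = incrementalScan;
    }

    String getPredicates() {
        return predicates;
    }

    boolean isIncrementalScan() {
        return incrementalScan;
    }

    // Mirrors the reviewed hunk: the flag is hard-coded, so an incremental
    // scan silently becomes a regular scan after the copy.
    ScanSketch withPredicatesResettingFlag(String newPredicates) {
        return new ScanSketch(newPredicates, false);
    }

    // Requested behavior: carry the current flag value into the copy.
    ScanSketch withPredicates(String newPredicates) {
        return new ScanSketch(newPredicates, incrementalScan);
    }
}
```

With the second form, any later check of the flag on the copied node sees the same value as on the original node.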
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -72,49 +113,125 @@ public Plan visitLogicalOlapTableStreamScan(LogicalOlapTableStreamScan scan, Voi
&& ((SlotReference) slot).getOriginalColumn().isPresent()
&& ((SlotReference) slot).getOriginalColumn().get()
.equals(Column.STREAM_SEQ_VIRTUAL_COLUMN)))
- .collect(Collectors.toList());
+ .collect(Collectors.toList()));
- if (originSlots.equals(newSlots)) {
- return scan;
- }
-
- // add delete sign column if unique base table
- Slot deleteSlot = null;
- for (Column column : scan.getTable().getBaseSchema(true)) {
- if (column.getName().equals(Column.DELETE_SIGN)) {
- deleteSlot = SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(), scan.getTable(),
- column, scan.qualified());
- newSlots.add(deleteSlot);
- break;
+ // history plan
+ if (!historicalPartitionIds.isEmpty()) {
+ List<Slot> scanSlots = new ArrayList<>(newSlots);
+ // add delete sign column if unique base table
+ Slot deleteSlot = null;
+ for (Column column : scan.getTable().getBaseSchema(true)) {
+ if (column.getName().equals(Column.DELETE_SIGN)) {
+ deleteSlot = SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(), scan.getTable(),
+ column, scan.qualified());
+ scanSlots.add(deleteSlot);
+ break;
+ }
}
- }
- Plan plan = scan.withCachedOutput(newSlots);
- if (deleteSlot != null) {
- Expression conjunct = new EqualTo(deleteSlot, new TinyIntLiteral((byte) 0));
- if (!scan.getTable().getEnableUniqueKeyMergeOnWrite()) {
- plan = scan.withPreAggStatus(PreAggStatus.off(
- Column.DELETE_SIGN + " is used as conjuncts."));
+ Plan plan = scan.withSelectedPartitionIds(historicalPartitionIds, true)
+ .withCachedOutput(new ArrayList<>(scanSlots))
+ .withNormalized(true);
+ if (deleteSlot != null) {
+ Expression conjunct = new EqualTo(deleteSlot, new TinyIntLiteral((byte) 0));
+ if (!scan.getTable().getEnableUniqueKeyMergeOnWrite()) {
+ plan = scan.withPreAggStatus(PreAggStatus.off(
Review Comment:
This rebuilds the plan from the original `scan`, so it drops the
history-only selected partitions, the cached output that includes `deleteSlot`,
and the normalized flag set above. The filter added below may then reference a
slot that is not produced by its child, and the history branch may scan the
wrong partitions. Please apply the pre-agg change to the already-built history
scan plan instead of going back to `scan`.
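
To illustrate why going back to `scan` loses state, here is a small sketch with a hypothetical immutable node (`HistoryScanSketch` is not a Doris class, and the string `"deleteSlot"` is only a placeholder for the extra output slot). In the real rule `plan` is typed as `Plan`, so the fix would presumably need to keep the intermediate result as the scan type before calling `withPreAggStatus`; that part is an assumption.

```java
import java.util.List;

// Hypothetical immutable node; every with* method returns a modified copy.
final class HistoryScanSketch {
    final List<Long> partitionIds;
    final List<String> output;
    final boolean normalized;
    final boolean preAggOn;

    HistoryScanSketch(List<Long> partitionIds, List<String> output,
            boolean normalized, boolean preAggOn) {
        this.partitionIds = partitionIds;
        this.output = output;
        this.normalized = normalized;
        this.preAggOn = preAggOn;
    }

    HistoryScanSketch withSelectedPartitionIds(List<Long> ids) {
        return new HistoryScanSketch(ids, output, normalized, preAggOn);
    }

    HistoryScanSketch withCachedOutput(List<String> out) {
        return new HistoryScanSketch(partitionIds, out, normalized, preAggOn);
    }

    HistoryScanSketch withNormalized(boolean value) {
        return new HistoryScanSketch(partitionIds, output, value, preAggOn);
    }

    HistoryScanSketch withPreAggOff() {
        return new HistoryScanSketch(partitionIds, output, normalized, false);
    }

    // Mirrors the reviewed code: the pre-agg change starts again from `scan`.
    static HistoryScanSketch rebuiltFromOriginal(HistoryScanSketch scan) {
        HistoryScanSketch plan = scan.withSelectedPartitionIds(List.of(1L))
                .withCachedOutput(List.of("k1", "deleteSlot"))
                .withNormalized(true);
        plan = scan.withPreAggOff(); // discards the three changes above
        return plan;
    }

    // Requested shape: apply the pre-agg change to the plan already built.
    static HistoryScanSketch chainedOnHistoryPlan(HistoryScanSketch scan) {
        HistoryScanSketch plan = scan.withSelectedPartitionIds(List.of(1L))
                .withCachedOutput(List.of("k1", "deleteSlot"))
                .withNormalized(true);
        plan = plan.withPreAggOff(); // keeps partitions, output, and flag
        return plan;
    }
}
```

In the first variant a filter on `deleteSlot` placed above the result would reference a slot its child does not output, which is the failure described above.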
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java:
##########
@@ -243,6 +257,28 @@ private LogicalPlan makeOlapScan(TableIf table, UnboundRelation unboundRelation,
CollectionUtils.isEmpty(partIds) ? ((OlapTable) table).getPartitionIds() : partIds, indexId,
preAggStatus, CollectionUtils.isEmpty(partIds) ? ImmutableList.of() : partIds,
unboundRelation.getHints(), unboundRelation.getTableSample(), ImmutableList.of());
+ } else if (isChangeRead(unboundRelation)) {
Review Comment:
Can we simplify this change-scan branch? `isChangeRead(unboundRelation)`
already requires `scanParams.incrementalRead()`, but the branch checks the same
condition again, then later validates and sets `changeScanInfo` after
constructing the scan. It would be easier to reason about if this flow were:
build `ChangeScanInfo`, validate requirements, construct the appropriate
row-binlog scan, set `changeScanInfo`, and return. The commented-out
incremental-scan code below should also be removed.
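
A rough sketch of the suggested ordering, only to make the flow concrete. Every name here (`ChangeScanFlowSketch`, `ChangeScanInfo` fields, `RowBinlogScan`, the `rowBinlogEnabled` check) is a hypothetical placeholder and not the actual `BindRelation` code or its real validation rules.

```java
// All names are hypothetical placeholders, not the BindRelation API.
final class ChangeScanFlowSketch {
    static final class ChangeScanInfo {
        final long startTimestamp;

        ChangeScanInfo(long startTimestamp) {
            this.startTimestamp = startTimestamp;
        }
    }

    static final class RowBinlogScan {
        final ChangeScanInfo changeScanInfo;

        RowBinlogScan(ChangeScanInfo changeScanInfo) {
            this.changeScanInfo = changeScanInfo;
        }
    }

    static RowBinlogScan bindChangeScan(long startTimestamp, boolean rowBinlogEnabled) {
        // 1. Build the change-scan description first.
        ChangeScanInfo info = new ChangeScanInfo(startTimestamp);
        // 2. Validate before any scan object exists (placeholder requirement).
        if (!rowBinlogEnabled) {
            throw new IllegalStateException("change read requires row binlog");
        }
        // 3. Construct the scan with its info attached, then return it.
        return new RowBinlogScan(info);
    }
}
```

Keeping validation ahead of construction means no half-initialized scan exists if a requirement fails, and the incremental-read condition is checked in one place only.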
##########
fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java:
##########
@@ -503,6 +526,36 @@ private void addScanRangeLocations(Partition partition,
);
paloRange.setVersionHash("");
paloRange.setTabletId(tabletId);
+ if (incrementalScan && hasChangeScan) {
+ Preconditions.checkState(olapTable instanceof RowBinlogTableWrapper);
+ paloRange.setStartTso(encodePhysicalTimestampToTso(changeStartTimestamp));
+ if (hasChangeEndTimestamp) {
+ paloRange.setEndTso(encodePhysicalTimestampToTso(changeEndTimestamp));
+ }
+ paloRange.setBinlogScanType(informationKindToSchemaScanType(informationKind));
+ paloRange.setBinlogReadSource(TBinlogReadSource.CHANGES);
+ } else if (incrementalScan) {
+ Preconditions.checkState(olapTable instanceof RowBinlogTableWrapper);
+ RowBinlogTableWrapper binlogWrapper = ((RowBinlogTableWrapper) olapTable);
+ Pair<Long, Long> update = getStreamUpdate(partition.getId());
+ if (update.first != null) {
+ paloRange.setStartTso(update.first);
+ }
+ if (update.second != null) {
+ paloRange.setEndTso(update.second);
+ }
+ TBinlogScanType streamScanType =
+ BaseTableStream.StreamScanType.toThrift(binlogWrapper.getParent().getConsumeType());
+ paloRange.setBinlogScanType(streamScanType);
+ paloRange.setBinlogReadSource(TBinlogReadSource.STREAM);
+ } else if (hasChangeScan) {
Review Comment:
Please confirm this non-incremental change-scan path is intentionally
relying on the BE default scan type. The `incrementalScan && hasChangeScan`
branch sets `binlogScanType` from `informationKind`, but this branch only sets
`startTso/endTso` and `binlogReadSource`. For the `DETAIL &&
!enable_split_binlog_before` path in `BindRelation`, this looks inconsistent
unless BE defaults to DETAIL here. If that default is required, please add a
comment; otherwise this should set the scan type as well.
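
If the BE default is not something to rely on, one option is to set the scan type for every change scan regardless of the `incrementalScan` flag. The sketch below uses hypothetical types (`RangeSketch`, a two-value `ScanType` enum, string read sources) rather than the real Thrift classes, and it leaves out the `RowBinlogTableWrapper` precondition and the stream TSO handling.

```java
// Hypothetical stand-ins for the Thrift range and enum; not the real types.
final class RangeSketch {
    enum ScanType { DETAIL, OTHER }

    static final class Range {
        Long startTso;
        ScanType scanType;
        String readSource;
    }

    static Range fill(boolean incrementalScan, boolean hasChangeScan,
            ScanType informationKind, long startTso) {
        Range range = new Range();
        if (hasChangeScan) {
            range.startTso = startTso;
            // Set explicitly for both incremental and non-incremental change
            // scans, so neither path depends on a receiver-side default.
            range.scanType = informationKind;
            range.readSource = "CHANGES";
        } else if (incrementalScan) {
            range.readSource = "STREAM";
        }
        return range;
    }
}
```

For example, `RangeSketch.fill(false, true, RangeSketch.ScanType.DETAIL, 5L)` produces a range whose `scanType` is `DETAIL` even though the scan is not incremental.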