TsukiokaKogane commented on code in PR #64776:
URL: https://github.com/apache/doris/pull/64776#discussion_r3505394841


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java:
##########
@@ -450,6 +483,140 @@ public static LogicalPlan checkAndAddDeleteSignFilter(LogicalOlapScan scan, Conn
         return scan;
     }
 
+    /**
+     * Build the time-travel (FOR VERSION/TIME AS OF) plan for an olap scan.
+     * dup: Filter(__DORIS_COMMIT_TSO_COL__ <= targetTso).
+     * mow: base(survived rows, tso<=t1) UNION ALL binlog(before-image of 
UPDATE_BEFORE/DELETE).
+     */
+    private LogicalPlan buildTimeTravelPlan(LogicalOlapScan scan, OlapTable 
olapTable,
+            TableSnapshot snapshot, UnboundRelation unboundRelation, 
List<String> qualifier,
+            List<Long> partIds, List<Long> tabletIds, CascadesContext 
cascadesContext) {
+        validateTimeTravel(olapTable);
+        long targetTso = resolveSnapshotTso(snapshot);
+        if (olapTable.getKeysType() == KeysType.DUP_KEYS) {
+            return addCommitTsoFilter(scan, scan, targetTso, olapTable);
+        }
+        return buildMowTimeTravelUnion(scan, olapTable, targetTso, 
unboundRelation,
+                qualifier, partIds, tabletIds, cascadesContext);
+    }
+
+    /**
+     * Validate that the table supports time travel: row binlog enabled, dup 
or mow key type,
+     * and (for mow) historical value recorded in binlog for the union right 
branch.
+     */
+    private void validateTimeTravel(OlapTable olapTable) {
+        if (!olapTable.enableTso()) {
+            throw new AnalysisException("FOR VERSION/TIME AS OF requires row 
binlog "
+                    + 
"(PROPERTIES('binlog.enable'='true','binlog.format'='ROW')) on table "
+                    + olapTable.getQualifiedName());
+        }
+        KeysType keysType = olapTable.getKeysType();
+        if (keysType != KeysType.DUP_KEYS && 
!olapTable.isUniqKeyMergeOnWrite()) {
+            throw new AnalysisException("FOR VERSION/TIME AS OF is only 
supported on duplicate "
+                    + "or unique merge-on-write tables. Table " + 
olapTable.getQualifiedName()
+                    + " is " + keysType + ".");
+        }
+        if (olapTable.isUniqKeyMergeOnWrite()
+                && !olapTable.getBinlogConfig().getNeedHistoricalValue()) {
+            throw new AnalysisException("FOR VERSION/TIME AS OF on 
merge-on-write table requires "
+                    + "binlog.need_historical_value=true. Table " + 
olapTable.getQualifiedName());
+        }
+    }
+
+    /**
+     * Add Filter(__DORIS_COMMIT_TSO_COL__ &lt;= targetTso) on top of {@code 
child}. The tso slot is
+     * resolved from {@code scan} output; {@code child} must pass through the 
scan output slots.
+     */
+    private LogicalPlan addCommitTsoFilter(LogicalPlan child, LogicalOlapScan 
scan,
+            long targetTso, OlapTable olapTable) {
+        Slot tsoSlot = null;
+        for (Slot slot : scan.getOutput()) {
+            if (slot.getName().equals(Column.COMMIT_TSO_COL)) {
+                tsoSlot = slot;
+                break;
+            }
+        }
+        Preconditions.checkArgument(tsoSlot != null,
+                "%s not found on table %s", Column.COMMIT_TSO_COL, 
olapTable.getQualifiedName());
+        Expression conjunct = new LessThanEqual(tsoSlot, new 
BigIntLiteral(targetTso));
+        return new LogicalFilter<>(ImmutableSet.of(conjunct), child);
+    }
+
+    /**
+     * Resolve a TableSnapshot to a target commit tso (inclusive upper bound).
+     * VERSION: the literal is the tso itself. TIME: wall-clock string -&gt; 
ms -&gt; tso upper bound.
+     */
+    private long resolveSnapshotTso(TableSnapshot snapshot) {
+        if (snapshot.getType() == TableSnapshot.VersionType.VERSION) {
+            try {
+                return Long.parseLong(snapshot.getValue().trim());
+            } catch (NumberFormatException e) {
+                throw new AnalysisException(
+                        "Invalid version in FOR VERSION AS OF: " + 
snapshot.getValue());
+            }
+        }
+        long ms = OlapScanNode.parseChangeTimestamp(snapshot.getValue());
+        return TSOTimestamp.composeFullTimestamp(ms);
+    }
+
+    /**
+     * mow time-travel: A|t1 = base(survived rows, tso&lt;=t1) UNION ALL 
binlog(before-image of
+     * UPDATE_BEFORE/DELETE since t1). The binlog right branch reuses the 
@incr (MIN_DELTA) machinery;
+     * BE splits each change into rows where UPDATE_BEFORE/DELETE rows already 
carry the before value.
+     */
+    private LogicalPlan buildMowTimeTravelUnion(LogicalOlapScan baseScan, 
OlapTable olapTable,
+            long targetTso, UnboundRelation unboundRelation, List<String> 
qualifier,
+            List<Long> partIds, List<Long> tabletIds, CascadesContext 
cascadesContext) {
+        // union baseline = base visible columns (key + value); hidden cols 
are filtered out.
+        List<Slot> visibleOutput = baseScan.getOutput().stream()
+                .filter(slot -> !(slot instanceof SlotReference)
+                        || ((SlotReference) slot).isVisible())
+                .collect(Collectors.toList());
+        List<NamedExpression> visibleProjects = visibleOutput.stream()
+                .map(NamedExpression.class::cast).collect(Collectors.toList());
+
+        // left: base survived rows at t1 = delete_sign=0 AND commit_tso<=t1, 
projected to visible.
+        LogicalPlan left = checkAndAddDeleteSignFilter(baseScan, 
ConnectContext.get(), olapTable, true);
+        left = addCommitTsoFilter(left, baseScan, targetTso, olapTable);
+        left = new LogicalProject<>(visibleProjects, left);
+
+        // right: binlog MIN_DELTA over tso>t1, keep UPDATE_BEFORE/DELETE rows 
(before image),
+        // projected to the same visible schema. BE splits each change so 
UPDATE_BEFORE/DELETE rows
+        // already carry the pre-change value in the (same-named) value 
columns.
+        RowBinlogTableWrapper binlogTable = new 
RowBinlogTableWrapper(olapTable);
+        RelationId binlogRelationId = 
cascadesContext.getStatementContext().getNextRelationId();
+        LogicalOlapScan binlogScan = CollectionUtils.isEmpty(partIds)
+                ? new LogicalOlapScan(binlogRelationId, binlogTable, 
qualifier, tabletIds,
+                        unboundRelation.getHints(), 
unboundRelation.getTableSample(), ImmutableList.of())
+                : new LogicalOlapScan(binlogRelationId, binlogTable, 
qualifier, partIds, tabletIds,
+                        unboundRelation.getHints(), 
unboundRelation.getTableSample(), ImmutableList.of());
+        Map<String, String> incrParams = new HashMap<>();
+        incrParams.put(OlapScanNode.OLAP_INCREMENT_TYPE, 
StreamScanType.MIN_DELTA.name());
+        incrParams.put(OlapScanNode.OLAP_START_TIMESTAMP, 
String.valueOf(targetTso));
+        binlogScan = binlogScan.withTableScanParams(
+                new TableScanParams(TableScanParams.INCREMENTAL_READ, 
incrParams, Lists.newArrayList()));

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to