yujun777 commented on code in PR #63850:
URL: https://github.com/apache/doris/pull/63850#discussion_r3361113253


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java:
##########
@@ -895,12 +897,19 @@ private static List<RewriteJob> getWholeTreeRewriteJobs(
                 ImmutableSet.of(LogicalCTEAnchor.class),
                 () -> {
                     List<RewriteJob> rewriteJobs = Lists.newArrayListWithExpectedSize(300);
-                    rewriteJobs.add(
-                            topic("normalize olap table stream scan",
-                                    custom(RuleType.NORMALIZE_OlAP_TABLE_STREAM_SCAN,
-                                            NormalizeOlapTableStreamScan::new)
-                            )
-                    );
+                    if (Config.enable_table_stream) {

Review Comment:
   `NormalizeOlapTableBinlogScan` is also needed for base-table `@incr(...)` scans, but this whole block is gated by `Config.enable_table_stream`. `BindRelation` accepts `table@incr(...)` based on the row-binlog requirements alone and does not check `enable_table_stream`, so with binlog enabled but table streams disabled, the change scan can skip normalization.
   
   In that case the scan remains non-incremental: `OlapScanNode` sets the start/end timestamps in the `hasChangeScan` branch but does not set `binlog_scan_type`, so the DETAIL/MIN_DELTA/APPEND_ONLY semantics fall back to whatever the BE defaults are. Either `@incr` should be gated by the same config, or the binlog normalize rule should run independently of table stream.



##########
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java:
##########
@@ -1745,7 +1764,11 @@ protected void unprotectedCommitTransaction2PC(TransactionState transactionState
                     continue;
                 }
                 partitionCommitInfo.setVersion(partition.getNextVersion());
-                partitionCommitInfo.setVersionTime(System.currentTimeMillis());
+                if (Config.enable_feature_binlog && table.enableTso()) {
+                    partitionCommitInfo.setVersionTime(commitTSO);

Review Comment:
   The 2PC path does not populate the new `PartitionCommitInfo.tso` field. The 
normal commit paths use `generatePartitionCommitInfo(..., commitTSO)`, but here 
we only set the version and then put `commitTSO` into `versionTime`. During 
publish, `partitionCommitInfo.getTso()` is still the default `-1`, so 
`partition.updateVisibleVersionAndTime(version, versionTime, tso)` leaves 
`Partition.tso` invalid for row-binlog/table-stream offset tracking. It also 
stores an encoded TSO value as visible-version time.
   
   This branch should set the partition commit TSO separately from `versionTime`, as the non-2PC commit paths do.
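   A simplified model of that fix, assuming `versionTime` should keep the wall-clock value the removed line used (`System.currentTimeMillis()`) and that `-1` is the unset TSO default, as described above. `TwoPcCommitSketch`, `commit2pc`, and this `PartitionCommitInfo` stand-in are hypothetical and only carry the fields discussed here; they are not the real Doris classes.

```java
// Hypothetical, simplified model of the 2PC commit step; class and method
// names are illustrative, not the real DatabaseTransactionMgr API.
public class TwoPcCommitSketch {

    // Unset TSO value; the review says PartitionCommitInfo.tso defaults to -1.
    public static final long INVALID_TSO = -1L;

    // Stand-in for PartitionCommitInfo with only the fields discussed here.
    public static final class PartitionCommitInfo {
        public long version;
        public long versionTime;
        public long tso = INVALID_TSO;
    }

    // versionTime always stays a wall-clock timestamp; the commit TSO is
    // recorded in its own field only when binlog and table TSO are enabled.
    public static PartitionCommitInfo commit2pc(long nextVersion, long nowMillis, long commitTso,
                                                boolean binlogEnabled, boolean tableEnableTso) {
        PartitionCommitInfo info = new PartitionCommitInfo();
        info.version = nextVersion;
        info.versionTime = nowMillis;
        if (binlogEnabled && tableEnableTso) {
            info.tso = commitTso;
        }
        return info;
    }

    public static void main(String[] args) {
        PartitionCommitInfo info = commit2pc(5L, System.currentTimeMillis(), 123456789L, true, true);
        // At publish time, a valid tso (not -1) would be passed along with version and versionTime.
        System.out.println(info.version + " " + info.tso);
    }
}
```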



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java:
##########
@@ -295,17 +297,28 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor stmtExec
 
             List<ScanNode> tableStreamScanNodes =
                     buildResult.planner.getScanNodes().stream()
-                            .filter(s -> s.getTableIf() instanceof OlapTableStreamWrapper).collect(Collectors.toList());
+                            .filter((s -> (s.getTableIf() instanceof OlapTableStreamWrapper

Review Comment:
   This also matches base-table `@incr(...)` scans, not only table-stream 
scans. `BindRelation` builds base-table CHANGES reads as incremental 
`OlapScanNode`s over `RowBinlogTableWrapper`, but those wrappers are created 
without a parent stream. Then line 320 does `((RowBinlogTableWrapper) 
scanNode.getTableIf()).getParent()` and line 323 dereferences it, so `INSERT 
INTO ... SELECT ... FROM base_table@incr(...)` can NPE or incorrectly try to 
register a stream offset update for a non-stream read.
   
   This filter should only include real stream consumption scans, e.g. 
`OlapTableStreamWrapper` or a `RowBinlogTableWrapper` whose parent is non-null.
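   A minimal sketch of the suggested predicate. The wrapper classes below are hypothetical stand-ins that keep only the `getParent()` relationship described above; the predicate is applied to tables rather than `ScanNode`s for brevity, and `Object` is used as the parent type because the real parent stream type is not shown here.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the table wrappers named in the review; the real
// Doris classes have different constructors and many more members.
public class StreamScanFilterSketch {

    public interface TableIf { }

    public static final class OlapTableStreamWrapper implements TableIf { }

    public static final class RowBinlogTableWrapper implements TableIf {
        private final Object parent; // null for base-table @incr(...) reads

        public RowBinlogTableWrapper(Object parent) {
            this.parent = parent;
        }

        public Object getParent() {
            return parent;
        }
    }

    public static final class BaseTable implements TableIf { }

    // Only real stream consumption counts: a stream wrapper, or a row-binlog
    // wrapper that actually has a parent stream.
    public static boolean isStreamConsumption(TableIf table) {
        if (table instanceof OlapTableStreamWrapper) {
            return true;
        }
        return table instanceof RowBinlogTableWrapper
                && ((RowBinlogTableWrapper) table).getParent() != null;
    }

    public static List<TableIf> streamScans(List<TableIf> scans) {
        return scans.stream()
                .filter(StreamScanFilterSketch::isStreamConsumption)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<TableIf> scans = List.of(
                new OlapTableStreamWrapper(),
                new RowBinlogTableWrapper(new Object()), // stream-backed read
                new RowBinlogTableWrapper(null),         // base_table@incr(...)
                new BaseTable());
        System.out.println(streamScans(scans).size()); // prints 2
    }
}
```

   Because the null-parent case is filtered out up front, the later `getParent()` dereference would only ever see stream-backed wrappers.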



-- 
This is an automated message from the Apache Git Service.