TsukiokaKogane commented on code in PR #64776:
URL: https://github.com/apache/doris/pull/64776#discussion_r3506616192


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/NormalizeOlapTableStreamScan.java:
##########
@@ -63,43 +72,46 @@
  * 2. add delete sign column if unique base table
  */
 public class NormalizeOlapTableStreamScan extends OneRewriteRuleFactory {
-    private static final long ROW_BINLOG_APPEND = 0;
-    private static final long ROW_BINLOG_DELETE = 1;
-    private static final long ROW_BINLOG_UPDATE_BEFORE = 2;
-    private static final long ROW_BINLOG_UPDATE_AFTER = 3;
-
     @Override
     public Rule build() {
         return logicalOlapTableStreamScan()
-                .when(scan -> !scan.isNormalized())
-                .then(this::normalize)
+                .thenApply(ctx -> normalize(ctx.root, ctx.cascadesContext))
                 .toRule(RuleType.NORMALIZE_OlAP_TABLE_STREAM_SCAN);
     }
 
     private static Expression buildChangeTypeExpr(Slot opSlot) {
         return new CaseWhen(ImmutableList.of(
-                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(ROW_BINLOG_APPEND)),
+                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(BinlogUtils.ROW_BINLOG_APPEND)),
                         new VarcharLiteral("APPEND")),
-                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(ROW_BINLOG_DELETE)),
+                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(BinlogUtils.ROW_BINLOG_DELETE)),
                         new VarcharLiteral("DELETE")),
-                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(ROW_BINLOG_UPDATE_BEFORE)),
+                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(BinlogUtils.ROW_BINLOG_UPDATE_BEFORE)),
                         new VarcharLiteral("UPDATE_BEFORE")),
-                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(ROW_BINLOG_UPDATE_AFTER)),
+                new WhenClause(new EqualTo(opSlot, new 
BigIntLiteral(BinlogUtils.ROW_BINLOG_UPDATE_AFTER)),
                         new VarcharLiteral("UPDATE_AFTER"))), new 
VarcharLiteral("UNKNOWN"));
     }
 
-    private Plan normalize(LogicalOlapTableStreamScan scan) {
+    private Plan normalize(LogicalOlapTableStreamScan scan, CascadesContext 
cascadesContext) {
         List<Long> selectedPartitionIds = scan.getSelectedPartitionIds();
         if (selectedPartitionIds.isEmpty()) {
             return scan;
         }
-        List<Long> historicalPartitionIds = 
ImmutableList.copyOf(((OlapTableStreamWrapper) scan.getTable())
-                .filterHistoryPartitionIds(selectedPartitionIds));
-        List<Long> incrementalPartitionIds = 
ImmutableList.copyOf(((OlapTableStreamWrapper) scan.getTable())
-                .filterIncrementalPartitionIds(selectedPartitionIds));
+        if (scan.isReset()) {
+            return makeResetOlapFullScan(scan, cascadesContext);
+        }
+        if (scan.isSnapshot()) {
+            return makeSnapshotScan(scan, cascadesContext);
+        }
+        OlapTableStreamWrapper streamWrapper = (OlapTableStreamWrapper) 
scan.getTable();

Review Comment:
   fixed



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableStreamScan.java:
##########
@@ -56,17 +53,17 @@
  * Logical OlapTableStreamScan
  */
 public class LogicalOlapTableStreamScan extends LogicalOlapScan {
-    private final boolean isNormalized;
-    private final boolean isIncrementalScan;
+    private final boolean isReset;

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to