924060929 commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3413446628


##########
fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java:
##########
@@ -1130,8 +1130,10 @@ public boolean isProfileSafeStmt() {
         // 1. CreateTableCommand(mainly for create as select).
         // 2. LoadCommand.
         // 3. InsertOverwriteTableCommand.
+        // 4. MergeIntoCommand (merge into ... using ...).
         if ((plan instanceof Command) && !(plan instanceof LoadCommand)
-                && !(plan instanceof CreateTableCommand) && !(plan instanceof 
InsertOverwriteTableCommand)) {
+                && !(plan instanceof CreateTableCommand) && !(plan instanceof 
InsertOverwriteTableCommand)
+                && !(plan instanceof 
org.apache.doris.nereids.trees.plans.commands.merge.MergeIntoCommand)) {

Review Comment:
   Done in `19de375e12e` — imported `MergeIntoCommand` and now use its simple name instead of the fully qualified one.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/MaterializationNode.java:
##########
@@ -227,7 +231,17 @@ public void setTopMaterializeNode(boolean 
topMaterializeNode) {
     }
 
     @Override
-    public boolean isSerialOperator() {
+    public boolean isSerialNode() {
         return true;
     }
+
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+            PlanTranslatorContext translatorContext, PlanNode parent, 
LocalExchangeTypeRequire parentRequire) {
+        Pair<PlanNode, LocalExchangeType> enforceResult = enforceRequire(
+                translatorContext, children.get(0), 0, 
LocalExchangeTypeRequire.requirePassthrough());
+        children = new ArrayList<>();

Review Comment:
   Done in `19de375e12e` — switched to `Lists.newArrayList()`.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -3314,7 +3349,7 @@ private boolean 
findOlapScanNodesByPassExchangeNode(PlanNode root) {
         return false;
     }
 
-    private List<List<Expr>> getDistributeExprs(Plan ... children) {
+    private List<List<Expr>> getChildrenDistributeExprs(Plan ... children) {

Review Comment:
   Renamed to `getDistributeExprs(Plan... plans)` in `19de375e12e`. I kept it 
taking the plans directly rather than a parent node, because the callers pass 
specific subsets (e.g. `hashJoin.left()`/`right()`, `child(0)`, or 
`children().toArray(...)`), so a single "parent" input wouldn't fit them all — 
the rename just drops the misleading "Children" from the name.



##########
regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy:
##########
@@ -0,0 +1,1560 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/**
+ * Regression tests for bugs discovered by RQG testing on the local-exchange2 
branch.
+ *
+ * These queries triggered "must set shared state" errors or incorrect results
+ * in RQG build 183992.  Common conditions:
+ *   - use_serial_exchange=true  (makes ALL Exchanges serial, not just 
UNPARTITIONED)
+ *   - enable_local_shuffle_planner=true (FE-planned local exchange)
+ *   - parallel_pipeline_task_num > 1
+ *
+ * Error types reproduced:
+ *   1. must set shared state, in AGGREGATION_OPERATOR
+ *   2. must set shared state, in SORT_OPERATOR
+ *   3. incorrect results with GROUPING SETS + scalar subquery + window 
function
+ */
+suite("test_local_shuffle_rqg_bugs") {
+
+    // ============================================================
+    //  Table setup — mirrors RQG table structure
+    //  10 buckets to match RQG (replication_num=1 for single-BE testing)
+    // ============================================================
+    sql "DROP TABLE IF EXISTS rqg_t1"
+    sql "DROP TABLE IF EXISTS rqg_t2"
+    sql "DROP TABLE IF EXISTS rqg_t3"
+    sql "DROP TABLE IF EXISTS rqg_t4"
+
+    sql """
+        CREATE TABLE rqg_t1 (
+            pk INT NOT NULL,
+            col_int_undef_signed INT,
+            col_int_undef_signed2 INT,
+            col_int_undef_signed_not_null INT NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(pk)
+        DISTRIBUTED BY HASH(pk) BUCKETS 10
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    sql """
+        CREATE TABLE rqg_t2 (
+            pk INT NOT NULL,
+            col_int_undef_signed INT,
+            col_int_undef_signed2 INT,
+            col_bigint_undef_signed_not_null BIGINT NOT NULL,
+            col_decimal_38_10__undef_signed_not_null DECIMAL(38,10) NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(pk)
+        DISTRIBUTED BY HASH(pk) BUCKETS 10
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    // Table for build 184181 GLOBAL_HASH_SHUFFLE bugs — needs varchar + 
bigint columns
+    sql """
+        CREATE TABLE rqg_t3 (
+            pk INT NOT NULL,
+            col_bigint_undef_signed BIGINT,
+            col_varchar_10__undef_signed VARCHAR(10),
+            col_varchar_64__undef_signed VARCHAR(64)
+        ) ENGINE=OLAP
+        DUPLICATE KEY(pk)
+        DISTRIBUTED BY HASH(pk) BUCKETS 10
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    // Second table for FULL OUTER JOIN case (col_bigint_undef_signed_not_null)
+    sql """
+        CREATE TABLE rqg_t4 (
+            pk INT NOT NULL,
+            col_bigint_undef_signed_not_null BIGINT NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(pk)
+        DISTRIBUTED BY HASH(pk) BUCKETS 10
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    sql """
+        INSERT INTO rqg_t3 VALUES
+            (0, -94, 'Abc', 'hello world'),
+            (1, 672609, 'Xyz', null),
+            (2, -3766684, 'Pqr', 'test string'),
+            (3, 5070261, 'abc', 'another row'),
+            (4, null, 'def', 'value four'),
+            (5, -86, 'XgpxlHBLEM', null),
+            (6, 21910, 'abc', 'they'),
+            (7, -63, 'zzzz', 'some text'),
+            (8, -8276281, 'AHlvNtoGLO', 'longer string here'),
+            (9, -101, 'mid', 'final row')
+    """
+
+    sql """
+        INSERT INTO rqg_t4 VALUES
+            (0, 0), (1, 1), (2, 2), (3, 3), (4, 4),
+            (5, 5), (6, 6), (7, 7), (8, 8), (9, 9),
+            (10, 2), (11, 2), (12, 2), (13, 3), (14, 4),
+            (15, 5), (16, 2), (17, 2), (18, 2), (19, 9)
+    """
+
+    // Insert enough rows to exercise multiple pipeline tasks
+    sql """
+        INSERT INTO rqg_t1 VALUES
+            (0, 0, 10, 0), (1, 1, 11, 1), (2, 2, 12, 2), (3, 3, 13, 3),
+            (4, 4, 14, 4), (5, 5, 15, 5), (6, 6, 16, 6), (7, 7, 17, 7),
+            (8, 8, 18, 8), (9, 9, 19, 9), (10, 0, 20, 10), (11, 1, 21, 11),
+            (12, 2, 22, 12), (13, 3, 23, 13), (14, 4, 24, 14), (15, 5, 25, 15),
+            (16, 6, 26, 16), (17, 7, 27, 17), (18, 8, 28, 18), (19, 9, 29, 19)
+    """
+
+    sql """
+        INSERT INTO rqg_t2 VALUES
+            (0, 0, 10, 100, 1.5), (1, 1, 11, 101, 2.5), (2, 2, 12, 102, 3.5),
+            (3, 3, 13, 103, 4.5), (4, 4, 14, 104, 5.5), (5, 5, 15, 105, 6.5),
+            (6, 6, 16, 106, 7.5), (7, 7, 17, 107, 8.5), (8, 8, 18, 108, 9.5),
+            (9, 9, 19, 109, 10.5), (10, 0, 20, 110, 11.5), (11, 1, 21, 111, 
12.5),
+            (12, 2, 22, 112, 13.5), (13, 3, 23, 113, 14.5), (14, 4, 24, 114, 
15.5),
+            (15, 5, 25, 115, 16.5), (16, 6, 26, 116, 17.5), (17, 7, 27, 117, 
18.5),
+            (18, 8, 28, 118, 19.5), (19, 9, 29, 119, 20.5)
+    """
+
+    // Wait for data to be visible
+    Thread.sleep(5000)

Review Comment:
   Done in `19de375e12e` — replaced `Thread.sleep(5000)` with a timeout-bounded 
poll on the actual row counts (`rqg_t1` and `rqg_t2` each reaching 20 rows), so 
the test waits on the real condition instead of a fixed delay.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to