morrySnow commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3411912827


##########
be/src/exec/operator/partition_sort_sink_operator.h:
##########
@@ -95,9 +95,10 @@ class PartitionSortSinkOperatorX final : public 
DataSinkOperatorX<PartitionSortS
     Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) {
-            return DataDistribution(ExchangeType::HASH_SHUFFLE, 
_distribute_exprs);
+            return 
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE,

Review Comment:
   Why were all HASH_SHUFFLE usages changed to `GLOBAL_EXECUTION_HASH_SHUFFLE`? When will 
`LOCAL_EXECUTION_HASH_SHUFFLE` be generated?



##########
be/src/exec/pipeline/pipeline.h:
##########
@@ -68,16 +68,15 @@ class Pipeline : public 
std::enable_shared_from_this<Pipeline> {
 
     [[nodiscard]] PipelineId id() const { return _pipeline_id; }
 
-    static bool is_hash_exchange(ExchangeType idx) {
-        return idx == ExchangeType::HASH_SHUFFLE || idx == 
ExchangeType::BUCKET_HASH_SHUFFLE;
+    static bool is_hash_exchange(TLocalPartitionType::type idx) {
+        return is_shuffled_exchange(idx);
     }
 
     // For HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, and ADAPTIVE_PASSTHROUGH,

Review Comment:
   This comment needs to be updated: it still refers to HASH_SHUFFLE, which no longer exists.



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -810,52 +1018,59 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
             sink_id, local_exchange_id, use_global_hash_shuffle ? 
_total_instances : _num_instances,
             data_distribution.partition_exprs, bucket_seq_to_instance_idx);
     if (bucket_seq_to_instance_idx.empty() &&
-        data_distribution.distribution_type == 
ExchangeType::BUCKET_HASH_SHUFFLE) {
-        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
+        data_distribution.distribution_type == 
TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
+        data_distribution.distribution_type =
+                use_global_hash_shuffle ? 
TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE
+                                        : 
TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
+    }
+    if (!use_global_hash_shuffle &&
+        data_distribution.distribution_type == 
TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
+        data_distribution.distribution_type = 
TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;

Review Comment:
   So the GLOBAL_EXECUTION_HASH_SHUFFLE required by the operator gets rewritten here according to 
`use_global_hash_shuffle`?



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -690,6 +716,188 @@ Status 
PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
     return Status::OK();
 }
 
+Status PipelineFragmentContext::_create_deferred_local_exchangers() {
+    for (auto& info : _deferred_exchangers) {
+        // DANGER ZONE — do not "fix" this line without reading the history.
+        //
+        // sender_count seeds Exchanger::_running_sink_operators, which the 
source side
+        // waits to reach 0 via sub_running_sink_operators on each sink 
LocalState close.
+        // The correct value is THIS pipeline-instance's sink task count, 
which is exactly
+        // info.upstream_pipe->num_tasks() — one PipelineTask per task, one 
close per task.
+        //
+        // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to 
mirror the
+        //   BE-planned path in _add_local_exchange_impl (~line 1023).  THIS 
BREAKS the
+        //   common FE-planned shape of `serial scan → LE(PT) → ...`: 
upstream_pipe
+        //   genuinely has num_tasks=1, only 1 close arrives, but seed becomes
+        //   _num_instances so _running_sink_operators never reaches 0 — 
downstream
+        //   sources hang on SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
+        //   mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING; build 
949402
+        //   regressed exactly this way).  BE-planned mode uses max() because 
its
+        //   `cur_pipe` is the source-side pipeline (always raised to 
_num_instances by
+        //   add_pipeline) — not analogous to our `upstream_pipe` here, which 
is the
+        //   sink-side pipeline that may legitimately stay at 1 for serial 
sources.
+        //
+        // Tempting wrong fix #2: multiply by _num_instances on the theory 
shared_state
+        //   is shared across all instances.  Same hang — each 
fragment-instance
+        //   PipelineFragmentContext has its OWN _op_id_to_shared_state map, 
so the
+        //   exchanger is per-instance, not per-BE.  num_tasks() is already 
the right
+        //   close-count for one instance.
+        //
+        // If a hang shows up with `_running_sink_operators < 0`, the bug is 
upstream:
+        // _propagate_local_exchange_num_tasks left num_tasks too low (or too 
high) for
+        // this fragment shape.  Fix THAT pass, not this seed value.
+        const int sender_count = info.upstream_pipe->num_tasks();
+        switch (info.partition_type) {
+        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
+        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
+            info.shared_state->exchanger = ShuffleExchanger::create_unique(
+                    sender_count, _num_instances, info.num_partitions, 
info.free_blocks_limit,
+                    info.partition_type);
+            break;
+        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
+            info.shared_state->exchanger = 
BucketShuffleExchanger::create_unique(
+                    sender_count, _num_instances, info.num_partitions, 
info.free_blocks_limit);
+            break;
+        case TLocalPartitionType::PASSTHROUGH:
+            info.shared_state->exchanger = PassthroughExchanger::create_unique(
+                    sender_count, _num_instances, info.free_blocks_limit);
+            break;
+        case TLocalPartitionType::BROADCAST:
+            info.shared_state->exchanger = BroadcastExchanger::create_unique(
+                    sender_count, _num_instances, info.free_blocks_limit);
+            break;
+        case TLocalPartitionType::PASS_TO_ONE:
+            if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
+                info.shared_state->exchanger = 
PassToOneExchanger::create_unique(
+                        sender_count, _num_instances, info.free_blocks_limit);
+            } else {
+                info.shared_state->exchanger = 
BroadcastExchanger::create_unique(
+                        sender_count, _num_instances, info.free_blocks_limit);
+            }
+            break;
+        case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
+            info.shared_state->exchanger = 
AdaptivePassthroughExchanger::create_unique(
+                    sender_count, _num_instances, info.free_blocks_limit);
+            break;
+        case TLocalPartitionType::NOOP:
+        case TLocalPartitionType::LOCAL_MERGE_SORT:

Review Comment:
   So is this a regression for plan_by_fe? What is the reason?



##########
be/src/exec/pipeline/dependency.h:
##########
@@ -832,50 +832,44 @@ struct SetSharedState : public BasicSharedState {
     Status hash_table_init();
 };
 
-enum class ExchangeType : uint8_t {
-    NOOP = 0,
-    // Shuffle data by Crc32CHashPartitioner
-    HASH_SHUFFLE = 1,
-    // Round-robin passthrough data blocks.
-    PASSTHROUGH = 2,
-    // Shuffle data by Crc32HashPartitioner<ShuffleChannelIds> (e.g. same as 
storage engine).
-    BUCKET_HASH_SHUFFLE = 3,
-    // Passthrough data blocks to all channels.
-    BROADCAST = 4,
-    // Passthrough data to channels evenly in an adaptive way.
-    ADAPTIVE_PASSTHROUGH = 5,
-    // Send all data to the first channel.
-    PASS_TO_ONE = 6,
-};
+inline bool is_shuffled_exchange(TLocalPartitionType::type idx) {
+    return idx == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE ||
+           idx == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE ||
+           idx == TLocalPartitionType::BUCKET_HASH_SHUFFLE;
+}

Review Comment:
   Isn't the same function also defined in `operator.cpp`?



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -723,7 +931,7 @@ Status PipelineFragmentContext::_create_tree_helper(
         *root = op;
     }
     /**
-     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed 
by a shuffled operator (shuffled hash join, union operator followed by 
co-located operators).
+     * `TLocalPartitionType::HASH_SHUFFLE` should be used if an operator is 
followed by a shuffled operator (shuffled hash join, union operator followed by 
co-located operators).

Review Comment:
   HASH_SHUFFLE no longer exists; please update the comment.



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -690,6 +716,188 @@ Status 
PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
     return Status::OK();
 }
 
+Status PipelineFragmentContext::_create_deferred_local_exchangers() {
+    for (auto& info : _deferred_exchangers) {
+        // DANGER ZONE — do not "fix" this line without reading the history.
+        //
+        // sender_count seeds Exchanger::_running_sink_operators, which the 
source side
+        // waits to reach 0 via sub_running_sink_operators on each sink 
LocalState close.
+        // The correct value is THIS pipeline-instance's sink task count, 
which is exactly
+        // info.upstream_pipe->num_tasks() — one PipelineTask per task, one 
close per task.
+        //
+        // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to 
mirror the
+        //   BE-planned path in _add_local_exchange_impl (~line 1023).  THIS 
BREAKS the
+        //   common FE-planned shape of `serial scan → LE(PT) → ...`: 
upstream_pipe
+        //   genuinely has num_tasks=1, only 1 close arrives, but seed becomes
+        //   _num_instances so _running_sink_operators never reaches 0 — 
downstream
+        //   sources hang on SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
+        //   mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING; build 
949402

Review Comment:
   The comment contains a build ID (949402) that is meaningless to readers; please remove it.



##########
be/src/exec/operator/partitioned_hash_join_probe_operator.h:
##########
@@ -228,16 +228,8 @@ class PartitionedHashJoinProbeOperatorX final
     Status pull(doris::RuntimeState* state, Block* output_block, bool* eos) 
const override;
 
     bool need_more_input_data(RuntimeState* state) const override;
-    DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
-        if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-            return {ExchangeType::NOOP};
-        }
-        return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
-                                _join_distribution == 
TJoinDistributionType::COLOCATE
-                        ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
-                                           _distribution_partition_exprs)
-                        : DataDistribution(ExchangeType::HASH_SHUFFLE,
-                                           _distribution_partition_exprs));
+    DataDistribution required_data_distribution(RuntimeState* state) const 
override {
+        return _inner_probe_operator->required_data_distribution(state);

Review Comment:
   Is this an optimization?



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -1796,6 +2011,89 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
         break;
     }
+    case TPlanNodeType::LOCAL_EXCHANGE_NODE: {
+        op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, 
next_operator_id(), descs);
+        // The downstream pipeline (containing LocalExchangeSource) must have
+        // _num_instances tasks — matching BE-native 
_inherit_pipeline_properties
+        // which sets pipe_with_source.set_num_tasks(_num_instances).
+        // Without this, when the parent pipeline was reduced by a serial 
operator
+        // (e.g., serial Exchange with use_serial_exchange=true, or 
UNPARTITIONED
+        // Exchange), the downstream inherits the reduced num_tasks via
+        // add_pipeline(parent).  The deferred exchanger creates _num_instances
+        // channels but only fewer source tasks initialize mem_counters — the
+        // sink round-robins to all channels and crashes on uninitialized ones.
+        auto downstream_num_tasks = _num_instances;

Review Comment:
   `downstream_num_tasks` is an unnecessary local variable.



##########
regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy:
##########
@@ -0,0 +1,1560 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/**
+ * Regression tests for bugs discovered by RQG testing on the local-exchange2 
branch.
+ *
+ * These queries triggered "must set shared state" errors or incorrect results
+ * in RQG build 183992.  Common conditions:
+ *   - use_serial_exchange=true  (makes ALL Exchanges serial, not just 
UNPARTITIONED)
+ *   - enable_local_shuffle_planner=true (FE-planned local exchange)
+ *   - parallel_pipeline_task_num > 1
+ *
+ * Error types reproduced:
+ *   1. must set shared state, in AGGREGATION_OPERATOR
+ *   2. must set shared state, in SORT_OPERATOR
+ *   3. incorrect results with GROUPING SETS + scalar subquery + window 
function
+ */
+suite("test_local_shuffle_rqg_bugs") {
+
+    // ============================================================
+    //  Table setup — mirrors RQG table structure
+    //  10 buckets to match RQG (replication_num=1 for single-BE testing)
+    // ============================================================
+    sql "DROP TABLE IF EXISTS rqg_t1"
+    sql "DROP TABLE IF EXISTS rqg_t2"
+    sql "DROP TABLE IF EXISTS rqg_t3"
+    sql "DROP TABLE IF EXISTS rqg_t4"
+
+    sql """
+        CREATE TABLE rqg_t1 (
+            pk INT NOT NULL,
+            col_int_undef_signed INT,
+            col_int_undef_signed2 INT,
+            col_int_undef_signed_not_null INT NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(pk)
+        DISTRIBUTED BY HASH(pk) BUCKETS 10
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    sql """
+        CREATE TABLE rqg_t2 (
+            pk INT NOT NULL,
+            col_int_undef_signed INT,
+            col_int_undef_signed2 INT,
+            col_bigint_undef_signed_not_null BIGINT NOT NULL,
+            col_decimal_38_10__undef_signed_not_null DECIMAL(38,10) NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(pk)
+        DISTRIBUTED BY HASH(pk) BUCKETS 10
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    // Table for build 184181 GLOBAL_HASH_SHUFFLE bugs — needs varchar + 
bigint columns
+    sql """
+        CREATE TABLE rqg_t3 (
+            pk INT NOT NULL,
+            col_bigint_undef_signed BIGINT,
+            col_varchar_10__undef_signed VARCHAR(10),
+            col_varchar_64__undef_signed VARCHAR(64)
+        ) ENGINE=OLAP
+        DUPLICATE KEY(pk)
+        DISTRIBUTED BY HASH(pk) BUCKETS 10
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    // Second table for FULL OUTER JOIN case (col_bigint_undef_signed_not_null)
+    sql """
+        CREATE TABLE rqg_t4 (
+            pk INT NOT NULL,
+            col_bigint_undef_signed_not_null BIGINT NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(pk)
+        DISTRIBUTED BY HASH(pk) BUCKETS 10
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    sql """
+        INSERT INTO rqg_t3 VALUES
+            (0, -94, 'Abc', 'hello world'),
+            (1, 672609, 'Xyz', null),
+            (2, -3766684, 'Pqr', 'test string'),
+            (3, 5070261, 'abc', 'another row'),
+            (4, null, 'def', 'value four'),
+            (5, -86, 'XgpxlHBLEM', null),
+            (6, 21910, 'abc', 'they'),
+            (7, -63, 'zzzz', 'some text'),
+            (8, -8276281, 'AHlvNtoGLO', 'longer string here'),
+            (9, -101, 'mid', 'final row')
+    """
+
+    sql """
+        INSERT INTO rqg_t4 VALUES
+            (0, 0), (1, 1), (2, 2), (3, 3), (4, 4),
+            (5, 5), (6, 6), (7, 7), (8, 8), (9, 9),
+            (10, 2), (11, 2), (12, 2), (13, 3), (14, 4),
+            (15, 5), (16, 2), (17, 2), (18, 2), (19, 9)
+    """
+
+    // Insert enough rows to exercise multiple pipeline tasks
+    sql """
+        INSERT INTO rqg_t1 VALUES
+            (0, 0, 10, 0), (1, 1, 11, 1), (2, 2, 12, 2), (3, 3, 13, 3),
+            (4, 4, 14, 4), (5, 5, 15, 5), (6, 6, 16, 6), (7, 7, 17, 7),
+            (8, 8, 18, 8), (9, 9, 19, 9), (10, 0, 20, 10), (11, 1, 21, 11),
+            (12, 2, 22, 12), (13, 3, 23, 13), (14, 4, 24, 14), (15, 5, 25, 15),
+            (16, 6, 26, 16), (17, 7, 27, 17), (18, 8, 28, 18), (19, 9, 29, 19)
+    """
+
+    sql """
+        INSERT INTO rqg_t2 VALUES
+            (0, 0, 10, 100, 1.5), (1, 1, 11, 101, 2.5), (2, 2, 12, 102, 3.5),
+            (3, 3, 13, 103, 4.5), (4, 4, 14, 104, 5.5), (5, 5, 15, 105, 6.5),
+            (6, 6, 16, 106, 7.5), (7, 7, 17, 107, 8.5), (8, 8, 18, 108, 9.5),
+            (9, 9, 19, 109, 10.5), (10, 0, 20, 110, 11.5), (11, 1, 21, 111, 
12.5),
+            (12, 2, 22, 112, 13.5), (13, 3, 23, 113, 14.5), (14, 4, 24, 114, 
15.5),
+            (15, 5, 25, 115, 16.5), (16, 6, 26, 116, 17.5), (17, 7, 27, 117, 
18.5),
+            (18, 8, 28, 118, 19.5), (19, 9, 29, 119, 20.5)
+    """
+
+    // Wait for data to be visible
+    Thread.sleep(5000)

Review Comment:
   Use an explicit check instead of the unreliable fixed sleep.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to