morrySnow commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3411912827
##########
be/src/exec/operator/partition_sort_sink_operator.h:
##########
@@ -95,9 +95,10 @@ class PartitionSortSinkOperatorX final : public
DataSinkOperatorX<PartitionSortS
Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
DataDistribution required_data_distribution(RuntimeState* /*state*/) const
override {
if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) {
- return DataDistribution(ExchangeType::HASH_SHUFFLE,
_distribute_exprs);
+ return
DataDistribution(TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE,
Review Comment:
Why are all HASH_SHUFFLE usages changed to `GLOBAL_EXECUTION_HASH_SHUFFLE`? When will
`LOCAL_EXECUTION_HASH_SHUFFLE` be generated?
##########
be/src/exec/pipeline/pipeline.h:
##########
@@ -68,16 +68,15 @@ class Pipeline : public
std::enable_shared_from_this<Pipeline> {
[[nodiscard]] PipelineId id() const { return _pipeline_id; }
- static bool is_hash_exchange(ExchangeType idx) {
- return idx == ExchangeType::HASH_SHUFFLE || idx ==
ExchangeType::BUCKET_HASH_SHUFFLE;
+ static bool is_hash_exchange(TLocalPartitionType::type idx) {
+ return is_shuffled_exchange(idx);
}
// For HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, and ADAPTIVE_PASSTHROUGH,
Review Comment:
This comment needs to be updated.
##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -810,52 +1018,59 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
sink_id, local_exchange_id, use_global_hash_shuffle ?
_total_instances : _num_instances,
data_distribution.partition_exprs, bucket_seq_to_instance_idx);
if (bucket_seq_to_instance_idx.empty() &&
- data_distribution.distribution_type ==
ExchangeType::BUCKET_HASH_SHUFFLE) {
- data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
+ data_distribution.distribution_type ==
TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
+ data_distribution.distribution_type =
+ use_global_hash_shuffle ?
TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE
+ :
TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
+ }
+ if (!use_global_hash_shuffle &&
+ data_distribution.distribution_type ==
TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
+ data_distribution.distribution_type =
TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
Review Comment:
So the operator's GLOBAL_EXECUTION_HASH_SHUFFLE will be rewritten here according to
`use_global_hash_shuffle`?
##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -690,6 +716,188 @@ Status
PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
return Status::OK();
}
+Status PipelineFragmentContext::_create_deferred_local_exchangers() {
+ for (auto& info : _deferred_exchangers) {
+ // DANGER ZONE — do not "fix" this line without reading the history.
+ //
+ // sender_count seeds Exchanger::_running_sink_operators, which the
source side
+ // waits to reach 0 via sub_running_sink_operators on each sink
LocalState close.
+ // The correct value is THIS pipeline-instance's sink task count,
which is exactly
+ // info.upstream_pipe->num_tasks() — one PipelineTask per task, one
close per task.
+ //
+ // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to
mirror the
+ // BE-planned path in _add_local_exchange_impl (~line 1023). THIS
BREAKS the
+ // common FE-planned shape of `serial scan → LE(PT) → ...`:
upstream_pipe
+ // genuinely has num_tasks=1, only 1 close arrives, but seed becomes
+ // _num_instances so _running_sink_operators never reaches 0 —
downstream
+ // sources hang on SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
+ // mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING; build
949402
+ // regressed exactly this way). BE-planned mode uses max() because
its
+ // `cur_pipe` is the source-side pipeline (always raised to
_num_instances by
+ // add_pipeline) — not analogous to our `upstream_pipe` here, which
is the
+ // sink-side pipeline that may legitimately stay at 1 for serial
sources.
+ //
+ // Tempting wrong fix #2: multiply by _num_instances on the theory
shared_state
+ // is shared across all instances. Same hang — each
fragment-instance
+ // PipelineFragmentContext has its OWN _op_id_to_shared_state map,
so the
+ // exchanger is per-instance, not per-BE. num_tasks() is already
the right
+ // close-count for one instance.
+ //
+ // If a hang shows up with `_running_sink_operators < 0`, the bug is
upstream:
+ // _propagate_local_exchange_num_tasks left num_tasks too low (or too
high) for
+ // this fragment shape. Fix THAT pass, not this seed value.
+ const int sender_count = info.upstream_pipe->num_tasks();
+ switch (info.partition_type) {
+ case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
+ case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
+ info.shared_state->exchanger = ShuffleExchanger::create_unique(
+ sender_count, _num_instances, info.num_partitions,
info.free_blocks_limit,
+ info.partition_type);
+ break;
+ case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
+ info.shared_state->exchanger =
BucketShuffleExchanger::create_unique(
+ sender_count, _num_instances, info.num_partitions,
info.free_blocks_limit);
+ break;
+ case TLocalPartitionType::PASSTHROUGH:
+ info.shared_state->exchanger = PassthroughExchanger::create_unique(
+ sender_count, _num_instances, info.free_blocks_limit);
+ break;
+ case TLocalPartitionType::BROADCAST:
+ info.shared_state->exchanger = BroadcastExchanger::create_unique(
+ sender_count, _num_instances, info.free_blocks_limit);
+ break;
+ case TLocalPartitionType::PASS_TO_ONE:
+ if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
+ info.shared_state->exchanger =
PassToOneExchanger::create_unique(
+ sender_count, _num_instances, info.free_blocks_limit);
+ } else {
+ info.shared_state->exchanger =
BroadcastExchanger::create_unique(
+ sender_count, _num_instances, info.free_blocks_limit);
+ }
+ break;
+ case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
+ info.shared_state->exchanger =
AdaptivePassthroughExchanger::create_unique(
+ sender_count, _num_instances, info.free_blocks_limit);
+ break;
+ case TLocalPartitionType::NOOP:
+ case TLocalPartitionType::LOCAL_MERGE_SORT:
Review Comment:
So, is this a regression for plan_by_fe? What is the reason?
##########
be/src/exec/pipeline/dependency.h:
##########
@@ -832,50 +832,44 @@ struct SetSharedState : public BasicSharedState {
Status hash_table_init();
};
-enum class ExchangeType : uint8_t {
- NOOP = 0,
- // Shuffle data by Crc32CHashPartitioner
- HASH_SHUFFLE = 1,
- // Round-robin passthrough data blocks.
- PASSTHROUGH = 2,
- // Shuffle data by Crc32HashPartitioner<ShuffleChannelIds> (e.g. same as
storage engine).
- BUCKET_HASH_SHUFFLE = 3,
- // Passthrough data blocks to all channels.
- BROADCAST = 4,
- // Passthrough data to channels evenly in an adaptive way.
- ADAPTIVE_PASSTHROUGH = 5,
- // Send all data to the first channel.
- PASS_TO_ONE = 6,
-};
+inline bool is_shuffled_exchange(TLocalPartitionType::type idx) {
+ return idx == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE ||
+ idx == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE ||
+ idx == TLocalPartitionType::BUCKET_HASH_SHUFFLE;
+}
Review Comment:
Isn't the same function also defined in `operator.cpp`?
##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -723,7 +931,7 @@ Status PipelineFragmentContext::_create_tree_helper(
*root = op;
}
/**
- * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed
by a shuffled operator (shuffled hash join, union operator followed by
co-located operators).
+ * `TLocalPartitionType::HASH_SHUFFLE` should be used if an operator is
followed by a shuffled operator (shuffled hash join, union operator followed by
co-located operators).
Review Comment:
HASH_SHUFFLE no longer exists; please update the comment.
##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -690,6 +716,188 @@ Status
PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
return Status::OK();
}
+Status PipelineFragmentContext::_create_deferred_local_exchangers() {
+ for (auto& info : _deferred_exchangers) {
+ // DANGER ZONE — do not "fix" this line without reading the history.
+ //
+ // sender_count seeds Exchanger::_running_sink_operators, which the
source side
+ // waits to reach 0 via sub_running_sink_operators on each sink
LocalState close.
+ // The correct value is THIS pipeline-instance's sink task count,
which is exactly
+ // info.upstream_pipe->num_tasks() — one PipelineTask per task, one
close per task.
+ //
+ // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to
mirror the
+ // BE-planned path in _add_local_exchange_impl (~line 1023). THIS
BREAKS the
+ // common FE-planned shape of `serial scan → LE(PT) → ...`:
upstream_pipe
+ // genuinely has num_tasks=1, only 1 close arrives, but seed becomes
+ // _num_instances so _running_sink_operators never reaches 0 —
downstream
+ // sources hang on SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
+ // mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING; build
949402
Review Comment:
The comment contains a meaningless build ID (949402); please remove it.
##########
be/src/exec/operator/partitioned_hash_join_probe_operator.h:
##########
@@ -228,16 +228,8 @@ class PartitionedHashJoinProbeOperatorX final
Status pull(doris::RuntimeState* state, Block* output_block, bool* eos)
const override;
bool need_more_input_data(RuntimeState* state) const override;
- DataDistribution required_data_distribution(RuntimeState* /*state*/) const
override {
- if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
- return {ExchangeType::NOOP};
- }
- return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
- _join_distribution ==
TJoinDistributionType::COLOCATE
- ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
- _distribution_partition_exprs)
- : DataDistribution(ExchangeType::HASH_SHUFFLE,
- _distribution_partition_exprs));
+ DataDistribution required_data_distribution(RuntimeState* state) const
override {
+ return _inner_probe_operator->required_data_distribution(state);
Review Comment:
Is this an optimization?
##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -1796,6 +2011,89 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
break;
}
+ case TPlanNodeType::LOCAL_EXCHANGE_NODE: {
+ op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode,
next_operator_id(), descs);
+ // The downstream pipeline (containing LocalExchangeSource) must have
+ // _num_instances tasks — matching BE-native
_inherit_pipeline_properties
+ // which sets pipe_with_source.set_num_tasks(_num_instances).
+ // Without this, when the parent pipeline was reduced by a serial
operator
+ // (e.g., serial Exchange with use_serial_exchange=true, or
UNPARTITIONED
+ // Exchange), the downstream inherits the reduced num_tasks via
+ // add_pipeline(parent). The deferred exchanger creates _num_instances
+ // channels but only fewer source tasks initialize mem_counters — the
+ // sink round-robins to all channels and crashes on uninitialized ones.
+ auto downstream_num_tasks = _num_instances;
Review Comment:
`downstream_num_tasks` is an unnecessary local variable.
##########
regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy:
##########
@@ -0,0 +1,1560 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/**
+ * Regression tests for bugs discovered by RQG testing on the local-exchange2
branch.
+ *
+ * These queries triggered "must set shared state" errors or incorrect results
+ * in RQG build 183992. Common conditions:
+ * - use_serial_exchange=true (makes ALL Exchanges serial, not just
UNPARTITIONED)
+ * - enable_local_shuffle_planner=true (FE-planned local exchange)
+ * - parallel_pipeline_task_num > 1
+ *
+ * Error types reproduced:
+ * 1. must set shared state, in AGGREGATION_OPERATOR
+ * 2. must set shared state, in SORT_OPERATOR
+ * 3. incorrect results with GROUPING SETS + scalar subquery + window
function
+ */
+suite("test_local_shuffle_rqg_bugs") {
+
+ // ============================================================
+ // Table setup — mirrors RQG table structure
+ // 10 buckets to match RQG (replication_num=1 for single-BE testing)
+ // ============================================================
+ sql "DROP TABLE IF EXISTS rqg_t1"
+ sql "DROP TABLE IF EXISTS rqg_t2"
+ sql "DROP TABLE IF EXISTS rqg_t3"
+ sql "DROP TABLE IF EXISTS rqg_t4"
+
+ sql """
+ CREATE TABLE rqg_t1 (
+ pk INT NOT NULL,
+ col_int_undef_signed INT,
+ col_int_undef_signed2 INT,
+ col_int_undef_signed_not_null INT NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(pk)
+ DISTRIBUTED BY HASH(pk) BUCKETS 10
+ PROPERTIES ("replication_num" = "1")
+ """
+
+ sql """
+ CREATE TABLE rqg_t2 (
+ pk INT NOT NULL,
+ col_int_undef_signed INT,
+ col_int_undef_signed2 INT,
+ col_bigint_undef_signed_not_null BIGINT NOT NULL,
+ col_decimal_38_10__undef_signed_not_null DECIMAL(38,10) NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(pk)
+ DISTRIBUTED BY HASH(pk) BUCKETS 10
+ PROPERTIES ("replication_num" = "1")
+ """
+
+ // Table for build 184181 GLOBAL_HASH_SHUFFLE bugs — needs varchar +
bigint columns
+ sql """
+ CREATE TABLE rqg_t3 (
+ pk INT NOT NULL,
+ col_bigint_undef_signed BIGINT,
+ col_varchar_10__undef_signed VARCHAR(10),
+ col_varchar_64__undef_signed VARCHAR(64)
+ ) ENGINE=OLAP
+ DUPLICATE KEY(pk)
+ DISTRIBUTED BY HASH(pk) BUCKETS 10
+ PROPERTIES ("replication_num" = "1")
+ """
+
+ // Second table for FULL OUTER JOIN case (col_bigint_undef_signed_not_null)
+ sql """
+ CREATE TABLE rqg_t4 (
+ pk INT NOT NULL,
+ col_bigint_undef_signed_not_null BIGINT NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(pk)
+ DISTRIBUTED BY HASH(pk) BUCKETS 10
+ PROPERTIES ("replication_num" = "1")
+ """
+
+ sql """
+ INSERT INTO rqg_t3 VALUES
+ (0, -94, 'Abc', 'hello world'),
+ (1, 672609, 'Xyz', null),
+ (2, -3766684, 'Pqr', 'test string'),
+ (3, 5070261, 'abc', 'another row'),
+ (4, null, 'def', 'value four'),
+ (5, -86, 'XgpxlHBLEM', null),
+ (6, 21910, 'abc', 'they'),
+ (7, -63, 'zzzz', 'some text'),
+ (8, -8276281, 'AHlvNtoGLO', 'longer string here'),
+ (9, -101, 'mid', 'final row')
+ """
+
+ sql """
+ INSERT INTO rqg_t4 VALUES
+ (0, 0), (1, 1), (2, 2), (3, 3), (4, 4),
+ (5, 5), (6, 6), (7, 7), (8, 8), (9, 9),
+ (10, 2), (11, 2), (12, 2), (13, 3), (14, 4),
+ (15, 5), (16, 2), (17, 2), (18, 2), (19, 9)
+ """
+
+ // Insert enough rows to exercise multiple pipeline tasks
+ sql """
+ INSERT INTO rqg_t1 VALUES
+ (0, 0, 10, 0), (1, 1, 11, 1), (2, 2, 12, 2), (3, 3, 13, 3),
+ (4, 4, 14, 4), (5, 5, 15, 5), (6, 6, 16, 6), (7, 7, 17, 7),
+ (8, 8, 18, 8), (9, 9, 19, 9), (10, 0, 20, 10), (11, 1, 21, 11),
+ (12, 2, 22, 12), (13, 3, 23, 13), (14, 4, 24, 14), (15, 5, 25, 15),
+ (16, 6, 26, 16), (17, 7, 27, 17), (18, 8, 28, 18), (19, 9, 29, 19)
+ """
+
+ sql """
+ INSERT INTO rqg_t2 VALUES
+ (0, 0, 10, 100, 1.5), (1, 1, 11, 101, 2.5), (2, 2, 12, 102, 3.5),
+ (3, 3, 13, 103, 4.5), (4, 4, 14, 104, 5.5), (5, 5, 15, 105, 6.5),
+ (6, 6, 16, 106, 7.5), (7, 7, 17, 107, 8.5), (8, 8, 18, 108, 9.5),
+ (9, 9, 19, 109, 10.5), (10, 0, 20, 110, 11.5), (11, 1, 21, 111,
12.5),
+ (12, 2, 22, 112, 13.5), (13, 3, 23, 113, 14.5), (14, 4, 24, 114,
15.5),
+ (15, 5, 25, 115, 16.5), (16, 6, 26, 116, 17.5), (17, 7, 27, 117,
18.5),
+ (18, 8, 28, 118, 19.5), (19, 9, 29, 119, 20.5)
+ """
+
+ // Wait for data to be visible
+ Thread.sleep(5000)
Review Comment:
Use an explicit readiness check instead of the unreliable fixed sleep.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]