924060929 commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3413420330


##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -690,6 +716,188 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
     return Status::OK();
 }
 
+Status PipelineFragmentContext::_create_deferred_local_exchangers() {
+    for (auto& info : _deferred_exchangers) {
+        // DANGER ZONE — do not "fix" this line without reading the history.
+        //
+        // sender_count seeds Exchanger::_running_sink_operators, which the source side
+        // waits to reach 0 via sub_running_sink_operators on each sink LocalState close.
+        // The correct value is THIS pipeline-instance's sink task count, which is exactly
+        // info.upstream_pipe->num_tasks() — one PipelineTask per task, one close per task.
+        //
+        // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to mirror the
+        //   BE-planned path in _add_local_exchange_impl (~line 1023).  THIS BREAKS the
+        //   common FE-planned shape of `serial scan → LE(PT) → ...`: upstream_pipe
+        //   genuinely has num_tasks=1, only 1 close arrives, but seed becomes
+        //   _num_instances so _running_sink_operators never reaches 0 — downstream
+        //   sources hang on SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
+        //   mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING; build 949402
+        //   regressed exactly this way).  BE-planned mode uses max() because its
+        //   `cur_pipe` is the source-side pipeline (always raised to _num_instances by
+        //   add_pipeline) — not analogous to our `upstream_pipe` here, which is the
+        //   sink-side pipeline that may legitimately stay at 1 for serial sources.
+        //
+        // Tempting wrong fix #2: multiply by _num_instances on the theory shared_state
+        //   is shared across all instances.  Same hang — each fragment-instance
+        //   PipelineFragmentContext has its OWN _op_id_to_shared_state map, so the
+        //   exchanger is per-instance, not per-BE.  num_tasks() is already the right
+        //   close-count for one instance.
+        //
+        // If a hang shows up with `_running_sink_operators < 0`, the bug is upstream:
+        // _propagate_local_exchange_num_tasks left num_tasks too low (or too high) for
+        // this fragment shape.  Fix THAT pass, not this seed value.
+        const int sender_count = info.upstream_pipe->num_tasks();
+        switch (info.partition_type) {
+        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
+        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
+            info.shared_state->exchanger = ShuffleExchanger::create_unique(
+                    sender_count, _num_instances, info.num_partitions, info.free_blocks_limit,
+                    info.partition_type);
+            break;
+        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
+            info.shared_state->exchanger = BucketShuffleExchanger::create_unique(
+                    sender_count, _num_instances, info.num_partitions, info.free_blocks_limit);
+            break;
+        case TLocalPartitionType::PASSTHROUGH:
+            info.shared_state->exchanger = PassthroughExchanger::create_unique(
+                    sender_count, _num_instances, info.free_blocks_limit);
+            break;
+        case TLocalPartitionType::BROADCAST:
+            info.shared_state->exchanger = BroadcastExchanger::create_unique(
+                    sender_count, _num_instances, info.free_blocks_limit);
+            break;
+        case TLocalPartitionType::PASS_TO_ONE:
+            if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
+                info.shared_state->exchanger = PassToOneExchanger::create_unique(
+                        sender_count, _num_instances, info.free_blocks_limit);
+            } else {
+                info.shared_state->exchanger = BroadcastExchanger::create_unique(
+                        sender_count, _num_instances, info.free_blocks_limit);
+            }
+            break;
+        case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
+            info.shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
+                    sender_count, _num_instances, info.free_blocks_limit);
+            break;
+        case TLocalPartitionType::NOOP:
+        case TLocalPartitionType::LOCAL_MERGE_SORT:

Review Comment:
   Not a regression. `LOCAL_MERGE_SORT` as a `TLocalPartitionType` is only 
emitted by the legacy BE-side local-exchange planner. The local-merge-sort 
capability itself is driven by `SortNode.use_local_merge` → 
`LocalMergeSortSourceOperatorX` (`pipeline_fragment_context.cpp:1858`), which 
FE sets and works the same in both planner modes. The FE-planned local-exchange 
path simply never needs to emit this partition type, so the factory treats it 
(and NOOP) as a protocol violation. If you have a sort plan where FE-planned 
mode would actually need a `LOCAL_MERGE_SORT` *local exchange*, point me at it 
and I'll dig in.
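
   For readers following along, the "protocol violation" handling described above can be sketched roughly as follows. This is a minimal illustration, not the code in the PR: `LocalPartitionType`, `Result`, and `create_exchanger_for` are hypothetical stand-ins, and how the real factory reports the error is an assumption, since the hunk is cut off at the `NOOP` / `LOCAL_MERGE_SORT` cases.

```cpp
#include <string>

// Hypothetical stand-in for TLocalPartitionType; only the enumerator names
// are taken from the diff above.
enum class LocalPartitionType {
    PASSTHROUGH,
    BROADCAST,
    NOOP,
    LOCAL_MERGE_SORT,
};

// Hypothetical result type standing in for Doris' Status.
struct Result {
    bool ok;
    std::string message;
};

// Sketch of a factory that builds exchangers for the partition types an
// FE-planned local exchange can carry and rejects the ones it never emits.
Result create_exchanger_for(LocalPartitionType type) {
    switch (type) {
    case LocalPartitionType::PASSTHROUGH:
    case LocalPartitionType::BROADCAST:
        // The real code would construct the matching exchanger here.
        return {true, ""};
    case LocalPartitionType::NOOP:
    case LocalPartitionType::LOCAL_MERGE_SORT:
        // Assumed handling: surface an error instead of guessing a fallback.
        return {false, "partition type not expected from FE-planned local exchange"};
    }
    return {false, "unknown partition type"};
}
```

   Failing loudly on these two cases means a plan that does contain such a local exchange shows up as an error rather than silently running with the wrong exchanger.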



##########
fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java:
##########
@@ -277,7 +277,8 @@ private static Multiset<DistributedPlanWorker> computeInstanceNumPerWorker(
         return workerCounter;
     }
 
-    private static Map<Integer, Integer> computeExchangeSenderNum(PipelineDistributedPlan distributedPlan) {
+    private static Map<Integer, Integer> computeExchangeSenderNum(
+            PipelineDistributedPlan distributedPlan) {

Review Comment:
   This is just an incidental line-length reformat of the method signature (no 
behavior change). Happy to revert it to keep the diff minimal if you'd prefer — 
it crept in from formatting.



##########
be/src/exec/operator/partitioned_hash_join_probe_operator.h:
##########
@@ -228,16 +228,8 @@ class PartitionedHashJoinProbeOperatorX final
     Status pull(doris::RuntimeState* state, Block* output_block, bool* eos) const override;
 
     bool need_more_input_data(RuntimeState* state) const override;
-    DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
-        if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-            return {ExchangeType::NOOP};
-        }
-        return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
-                                _join_distribution == TJoinDistributionType::COLOCATE
-                        ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
-                                           _distribution_partition_exprs)
-                        : DataDistribution(ExchangeType::HASH_SHUFFLE,
-                                           _distribution_partition_exprs));
+    DataDistribution required_data_distribution(RuntimeState* state) const override {
+        return _inner_probe_operator->required_data_distribution(state);

Review Comment:
   Not a perf optimization — it's a consistency refactor. 
`PartitionedHashJoinProbeOperatorX` now delegates 
`required_data_distribution()` to its inner `_inner_probe_operator` instead of 
duplicating the join-distribution logic, so the spill path reports the same 
required distribution as the non-spill `HashJoinProbeOperatorX` (single source 
of truth). It also drops the now-unused `_join_distribution` member.
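
   The delegation pattern described above can be sketched in isolation like this. It is a simplified illustration under stated assumptions: `InnerProbe`, `PartitionedProbe`, and the two enums are hypothetical stand-ins, and the inner operator's logic simply reuses the branch removed in the hunk; the real `HashJoinProbeOperatorX` may differ.

```cpp
#include <memory>
#include <utility>

// Hypothetical, reduced versions of the Doris enums named in the diff.
enum class ExchangeType { NOOP, HASH_SHUFFLE, BUCKET_HASH_SHUFFLE };
enum class JoinDistribution { PARTITIONED, BUCKET_SHUFFLE, COLOCATE };

// Stand-in for the non-spill probe operator, which owns the distribution logic.
struct InnerProbe {
    JoinDistribution join_distribution;
    bool null_aware_left_anti;

    ExchangeType required_data_distribution() const {
        if (null_aware_left_anti) {
            return ExchangeType::NOOP;
        }
        return (join_distribution == JoinDistribution::BUCKET_SHUFFLE ||
                join_distribution == JoinDistribution::COLOCATE)
                       ? ExchangeType::BUCKET_HASH_SHUFFLE
                       : ExchangeType::HASH_SHUFFLE;
    }
};

// Stand-in for the spill-capable wrapper: it keeps no copy of the join
// distribution and forwards the question to the inner operator.
class PartitionedProbe {
public:
    explicit PartitionedProbe(std::shared_ptr<InnerProbe> inner) : _inner(std::move(inner)) {}

    ExchangeType required_data_distribution() const {
        return _inner->required_data_distribution();
    }

private:
    std::shared_ptr<InnerProbe> _inner;
};
```

   With this shape, any later change to the inner operator's rule is picked up by the wrapper without a second edit.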



##########
be/src/udf/python/boost_process_compat.h:
##########
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once

Review Comment:
   Removed (commit `1d48e9ae984`): `boost_process_compat.h` and the boost/process-related changes in `build.sh` / `python_server.cpp` / `python_udf_runtime.{h,cpp}` have all been reverted to master. These macOS build changes will not go into this PR.



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -290,6 +290,32 @@ Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thr
         RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
                                          &_root_op, root_pipeline));
 
+        // Propagate _num_instances from LOCAL_EXCHANGE pipelines to ancestor pipelines
+        // that inherited reduced num_tasks from a serial operator.
+        _propagate_local_exchange_num_tasks();
+
+        // Create deferred local exchangers now that all pipelines have final num_tasks.
+        RETURN_IF_ERROR(_create_deferred_local_exchangers());

Review Comment:
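   Yes. These two steps are not there for gradual rollout or rollback; they are the **current compromise** in having FE take over local exchange, not the final design.
   
   For FE to also plan "the parallelism of each operator" and send it down directly, FE would essentially have to split pipelines itself and control the parallelism of every pipeline (num_tasks is per-pipeline and only changes at pipeline boundaries, i.e. at a local exchange or pipeline breaker), and BE would no longer plan pipelines at all. That amounts to moving BE's entire pipeline construction (`_build_pipeline`, pipeline-breaker detection, num_tasks derivation, dependency wiring, plus spill / runtime filter / shared state handling) into FE, and changing the thrift contract to carry the full pipeline structure. The scope is too large and the risk too high. The FE near-equivalent implementation has already taken the better part of a year and close to 10,000 lines of code, so it cannot all be done in one go.
   
   So the current state is: FE plans the **position/type/distribution** of each LE (this determines plan correctness, and was historically the root of BE/FE divergence bugs), and with `enable_local_shuffle_planner=true` BE's `_plan_local_exchange` is skipped entirely. Each pipeline's num_tasks, however, is still derived topologically by BE's `_propagate_local_exchange_num_tasks`, and `_create_deferred_local_exchangers` creates the exchangers late because an exchanger's `num_partitions` can only be set once num_tasks is final. The end state is for FE to compute num_tasks as well and send it down, with BE's propagate pass reduced to an assert (validation only). That is a separate, larger follow-up change.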
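
   To make the ordering constraint concrete, here is a small model of why the exchanger can only be created after num_tasks is final. It follows the in-code comment on `sender_count` in the `_create_deferred_local_exchangers` hunk: the sender count seeds a counter that each sink close decrements, and the source side waits for it to reach 0. Everything here (`RunningSinkCounter`, `source_unblocks`, the two seed helpers) is a hypothetical simplification, not the Doris `Exchanger` class.

```cpp
#include <algorithm>

// Hypothetical model of the counter the source side waits on.
struct RunningSinkCounter {
    explicit RunningSinkCounter(int sender_count) : running(sender_count) {}

    // Called once per sink task when its local state closes.
    void on_sink_close() { --running; }

    // The source side may finish only when every expected sink has closed.
    bool drained() const { return running == 0; }

    int running;
};

// Seeds the counter, replays `sink_closes` close events, and reports whether
// the source side would be released.
bool source_unblocks(int seed, int sink_closes) {
    RunningSinkCounter counter(seed);
    for (int i = 0; i < sink_closes; ++i) {
        counter.on_sink_close();
    }
    return counter.drained();
}

// Seed chosen in the PR: the upstream pipeline's final task count.
int seed_from_num_tasks(int num_tasks) {
    return num_tasks;
}

// "Tempting wrong fix #1" from the code comment, kept here for comparison.
int seed_from_max(int num_tasks, int num_instances) {
    return std::max(num_tasks, num_instances);
}
```

   For a serial upstream pipeline (num_tasks = 1) in a fragment with 8 instances, `source_unblocks(seed_from_num_tasks(1), 1)` is true, while `source_unblocks(seed_from_max(1, 8), 1)` is false: the counter stays above 0, which corresponds to the hang described in the code comment.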



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

