924060929 commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3413420330
##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -690,6 +716,188 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
return Status::OK();
}
+Status PipelineFragmentContext::_create_deferred_local_exchangers() {
+ for (auto& info : _deferred_exchangers) {
+ // DANGER ZONE — do not "fix" this line without reading the history.
+ //
+ // sender_count seeds Exchanger::_running_sink_operators, which the source side
+ // waits to reach 0 via sub_running_sink_operators on each sink LocalState close.
+ // The correct value is THIS pipeline-instance's sink task count, which is exactly
+ // info.upstream_pipe->num_tasks() — one PipelineTask per task, one close per task.
+ //
+ // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to mirror the
+ // BE-planned path in _add_local_exchange_impl (~line 1023). THIS BREAKS the
+ // common FE-planned shape of `serial scan → LE(PT) → ...`: upstream_pipe
+ // genuinely has num_tasks=1, only 1 close arrives, but seed becomes
+ // _num_instances so _running_sink_operators never reaches 0 — downstream
+ // sources hang on SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
+ // mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING; build 949402
+ // regressed exactly this way). BE-planned mode uses max() because its
+ // `cur_pipe` is the source-side pipeline (always raised to _num_instances by
+ // add_pipeline) — not analogous to our `upstream_pipe` here, which is the
+ // sink-side pipeline that may legitimately stay at 1 for serial sources.
+ //
+ // Tempting wrong fix #2: multiply by _num_instances on the theory shared_state
+ // is shared across all instances. Same hang — each fragment-instance
+ // PipelineFragmentContext has its OWN _op_id_to_shared_state map, so the
+ // exchanger is per-instance, not per-BE. num_tasks() is already the right
+ // close-count for one instance.
+ //
+ // If a hang shows up with `_running_sink_operators < 0`, the bug is upstream:
+ // _propagate_local_exchange_num_tasks left num_tasks too low (or too high) for
+ // this fragment shape. Fix THAT pass, not this seed value.
+ const int sender_count = info.upstream_pipe->num_tasks();
+ switch (info.partition_type) {
+ case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
+ case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
+ info.shared_state->exchanger = ShuffleExchanger::create_unique(
+ sender_count, _num_instances, info.num_partitions, info.free_blocks_limit,
+ info.partition_type);
+ break;
+ case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
+ info.shared_state->exchanger = BucketShuffleExchanger::create_unique(
+ sender_count, _num_instances, info.num_partitions, info.free_blocks_limit);
+ break;
+ case TLocalPartitionType::PASSTHROUGH:
+ info.shared_state->exchanger = PassthroughExchanger::create_unique(
+ sender_count, _num_instances, info.free_blocks_limit);
+ break;
+ case TLocalPartitionType::BROADCAST:
+ info.shared_state->exchanger = BroadcastExchanger::create_unique(
+ sender_count, _num_instances, info.free_blocks_limit);
+ break;
+ case TLocalPartitionType::PASS_TO_ONE:
+ if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
+ info.shared_state->exchanger = PassToOneExchanger::create_unique(
+ sender_count, _num_instances, info.free_blocks_limit);
+ } else {
+ info.shared_state->exchanger = BroadcastExchanger::create_unique(
+ sender_count, _num_instances, info.free_blocks_limit);
+ }
+ break;
+ case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
+ info.shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
+ sender_count, _num_instances, info.free_blocks_limit);
+ break;
+ case TLocalPartitionType::NOOP:
+ case TLocalPartitionType::LOCAL_MERGE_SORT:
Review Comment:
Not a regression. `LOCAL_MERGE_SORT` as a `TLocalPartitionType` is only
emitted by the legacy BE-side local-exchange planner. The local-merge-sort
capability itself is driven by `SortNode.use_local_merge` →
`LocalMergeSortSourceOperatorX` (`pipeline_fragment_context.cpp:1858`), which
FE sets and works the same in both planner modes. The FE-planned local-exchange
path simply never needs to emit this partition type, so the factory treats it
(and NOOP) as a protocol violation. If you have a sort plan where FE-planned
mode would actually need a `LOCAL_MERGE_SORT` *local exchange*, point me at it
and I'll dig in.
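
To make the "protocol violation" point concrete, here is a minimal sketch of the check. It assumes a cut-down stand-in enum and a hypothetical helper named `validate_fe_planned_type`; neither is the real Doris code, and the error text is illustrative only.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical stand-in for TLocalPartitionType; only the values needed here.
enum class PartitionType { PASSTHROUGH, BROADCAST, NOOP, LOCAL_MERGE_SORT };

// Returns std::nullopt for types the FE-planned factory can build an
// exchanger for, and an error message for types FE should never send.
std::optional<std::string> validate_fe_planned_type(PartitionType type) {
    switch (type) {
    case PartitionType::PASSTHROUGH:
    case PartitionType::BROADCAST:
        return std::nullopt;
    case PartitionType::NOOP:
    case PartitionType::LOCAL_MERGE_SORT:
        // Only the legacy BE-side planner emits these, so seeing one on the
        // FE-planned path means the plan itself is malformed.
        return std::string("unexpected partition type in FE-planned local exchange");
    }
    return std::string("unknown partition type");
}
```

Failing loudly here, rather than silently falling back to some exchanger, keeps an FE/BE contract mismatch visible at fragment preparation time.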
##########
fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java:
##########
@@ -277,7 +277,8 @@ private static Multiset<DistributedPlanWorker> computeInstanceNumPerWorker(
return workerCounter;
}
- private static Map<Integer, Integer> computeExchangeSenderNum(PipelineDistributedPlan distributedPlan) {
+ private static Map<Integer, Integer> computeExchangeSenderNum(
+ PipelineDistributedPlan distributedPlan) {
Review Comment:
This is just an incidental line-length reformat of the method signature (no
behavior change). Happy to revert it to keep the diff minimal if you'd prefer —
it crept in from formatting.
##########
be/src/exec/operator/partitioned_hash_join_probe_operator.h:
##########
@@ -228,16 +228,8 @@ class PartitionedHashJoinProbeOperatorX final
Status pull(doris::RuntimeState* state, Block* output_block, bool* eos) const override;
bool need_more_input_data(RuntimeState* state) const override;
- DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
- if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
- return {ExchangeType::NOOP};
- }
- return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
- _join_distribution == TJoinDistributionType::COLOCATE
- ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
- _distribution_partition_exprs)
- : DataDistribution(ExchangeType::HASH_SHUFFLE,
- _distribution_partition_exprs));
+ DataDistribution required_data_distribution(RuntimeState* state) const override {
+ return _inner_probe_operator->required_data_distribution(state);
Review Comment:
Not a perf optimization — it's a consistency refactor.
`PartitionedHashJoinProbeOperatorX` now delegates
`required_data_distribution()` to its inner `_inner_probe_operator` instead of
duplicating the join-distribution logic, so the spill path reports the same
required distribution as the non-spill `HashJoinProbeOperatorX` (single source
of truth). It also drops the now-unused `_join_distribution` member.
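
A minimal sketch of the delegation pattern, for illustration. `InnerProbe` and `PartitionedProbe` are hypothetical stand-ins, and the inner logic is copied from the branch removed in this hunk; the real `HashJoinProbeOperatorX` may differ in detail.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for ExchangeType; only the values needed here.
enum class ExchangeType { NOOP, HASH_SHUFFLE, BUCKET_HASH_SHUFFLE };

// Stand-in for the non-spill probe operator, which owns the decision.
struct InnerProbe {
    bool bucket_shuffle_or_colocate;
    bool null_aware_left_anti;

    ExchangeType required_data_distribution() const {
        if (null_aware_left_anti) {
            return ExchangeType::NOOP;
        }
        return bucket_shuffle_or_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE
                                          : ExchangeType::HASH_SHUFFLE;
    }
};

// Stand-in for the spill wrapper: it keeps no copy of the logic and
// forwards to the inner operator, so the two paths cannot drift apart.
struct PartitionedProbe {
    std::shared_ptr<InnerProbe> inner;

    ExchangeType required_data_distribution() const {
        return inner->required_data_distribution();
    }
};
```

With this shape, any future change to the distribution rule is made once, in the inner operator, and the spill path picks it up automatically.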
##########
be/src/udf/python/boost_process_compat.h:
##########
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
Review Comment:
Removed (commit `1d48e9ae984`): `boost_process_compat.h` and the
boost/process-related changes in `build.sh`, `python_server.cpp`, and
`python_udf_runtime.{h,cpp}` have all been reverted to master. These macOS
build changes will not go into this PR.
##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -290,6 +290,32 @@ Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thr
RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
&_root_op, root_pipeline));
+ // Propagate _num_instances from LOCAL_EXCHANGE pipelines to ancestor pipelines
+ // that inherited reduced num_tasks from a serial operator.
+ _propagate_local_exchange_num_tasks();
+
+ // Create deferred local exchangers now that all pipelines have final num_tasks.
+ RETURN_IF_ERROR(_create_deferred_local_exchangers());
Review Comment:
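Yes. These two steps are not there for gradual rollout or rollback; they are
the **current compromise** in having FE take over local exchange, not the
final state.
For FE to also plan each operator's parallelism and send it down directly, FE
would essentially have to cut the pipelines itself and control the parallelism
of each pipeline (num_tasks is per pipeline and only changes at pipeline
boundaries, i.e. at a local exchange or pipeline breaker), and BE would no
longer plan pipelines at all. That amounts to moving BE's entire pipeline
construction (`_build_pipeline`, pipeline-breaker detection, num_tasks
derivation, dependency wiring, plus handling of spill / runtime filter /
shared state) into FE, and changing the thrift contract to carry the full
pipeline structure. That front is too long and the risk too high. The FE
near-equivalent implementation has already taken more than half a year and
nearly 10,000 lines of code; it cannot all be done in one go.
So the current state is: FE plans the **position / type / distribution** of
each LE (this determines plan correctness, and was the root cause of past
BE/FE divergence bugs), and with `enable_local_shuffle_planner=true` BE's
`_plan_local_exchange` is skipped entirely. Each pipeline's num_tasks,
however, is still derived topologically by BE's
`_propagate_local_exchange_num_tasks`, and `_create_deferred_local_exchangers`
creates the exchangers late because an exchanger's `num_partitions` cannot be
built until num_tasks is final. The end state is that FE also computes
num_tasks and sends it down, and BE's propagate pass degenerates into an
assert (validation only). That is a separate, larger follow-up change.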
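
To illustrate why the exchanger has to be seeded only after num_tasks is final, here is a minimal model of the sink counter. `SinkCounter` and `drains` are hypothetical names for this sketch; the real `Exchanger` carries much more state, and only the seed-and-decrement behaviour described in the code comment is modelled.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical, simplified model of Exchanger::_running_sink_operators.
class SinkCounter {
public:
    explicit SinkCounter(int sender_count) : _running(sender_count) {}

    // Called once per sink task close; returns true if this close was the
    // one that brought the counter to zero.
    bool close_one() { return _running.fetch_sub(1) == 1; }

    // The source side may finish only when every seeded sink has closed.
    bool source_can_finish() const { return _running.load() == 0; }

private:
    std::atomic<int> _running;
};

// Simulates `closes` sink closes against a counter seeded with `seed` and
// reports whether the source side would be released.
bool drains(int seed, int closes) {
    SinkCounter counter(seed);
    for (int i = 0; i < closes; ++i) {
        counter.close_one();
    }
    return counter.source_can_finish();
}
```

In this model, `drains(1, 1)` releases the source, while `drains(8, 1)` (a serial upstream pipeline seeded with a stand-in `_num_instances` of 8) never does, which is the hang shape described for the `std::max` variant.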
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]