924060929 commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3411859421
##########
be/src/exec/exchange/local_exchange_sink_operator.cpp:
##########
@@ -37,24 +37,30 @@ std::vector<Dependency*>
LocalExchangeSinkLocalState::dependencies() const {
return deps;
}
-Status LocalExchangeSinkOperatorX::init(RuntimeState* state, ExchangeType type,
- const int num_buckets, const bool
use_global_hash_shuffle,
+Status LocalExchangeSinkOperatorX::init(RuntimeState* state,
TLocalPartitionType::type type,
+ const int num_buckets,
const std::map<int, int>&
shuffle_idx_to_instance_idx) {
+ DCHECK(!_planned_by_fe);
_name = "LOCAL_EXCHANGE_SINK_OPERATOR(" + get_exchange_type_name(type) +
")";
_type = type;
- if (_type == ExchangeType::HASH_SHUFFLE) {
- _shuffle_idx_to_instance_idx.clear();
- _use_global_shuffle = use_global_hash_shuffle;
+ if (_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
// For shuffle join, if data distribution has been broken by previous
operator, we
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To
be mentioned,
// we should use map shuffle idx to instance idx because all instances
will be
// distributed to all BEs. Otherwise, we should use shuffle idx
directly.
- if (use_global_hash_shuffle) {
- _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
+ _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
+ if (state->query_options().__isset.enable_new_shuffle_hash_method &&
Review Comment:
Extracted into a shared `_create_partitioner(state, bucket_count)`. Both the
BE-native path's `init()` (`local_exchange_sink_operator.cpp:77`) and the
FE-planned path's `init_partitioner()` (:84) now reuse it, which removes three
duplicated blocks of hash-partitioner creation code.
##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -290,6 +290,32 @@ Status
PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thr
RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(),
*_query_ctx->desc_tbl,
&_root_op, root_pipeline));
+ // Propagate _num_instances from LOCAL_EXCHANGE pipelines to ancestor
pipelines
+ // that inherited reduced num_tasks from a serial operator.
+ _propagate_local_exchange_num_tasks();
+
+ // Create deferred local exchangers now that all pipelines have final
num_tasks.
+ RETURN_IF_ERROR(_create_deferred_local_exchangers());
Review Comment:
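Yes, we are aligned on the direction. With `enable_local_shuffle_planner=true`,
BE's `_plan_local_exchange` is skipped entirely via
`RuntimeState::plan_local_shuffle()`: BE builds its pipelines strictly from the
`LOCAL_EXCHANGE_NODE`s that FE planned and no longer computes local exchange
itself.
BE's `required_data_distribution()` / parallelism computation is still kept for
now, but only for the BE-native fallback path under
`enable_local_shuffle_planner=false` (it keeps behavior byte-for-byte identical
to the old code, which makes gradual rollout / rollback easier). The FE-planned
path does not read it.
Long term I also agree that BE should degrade to "verify only, do not compute":
since FE has already inserted the correct LEs, BE's propagate step should
ideally be an assert("the distribution provided by FE already satisfies the
requirement") rather than a second computation. This PR keeps both paths side
by side for now, with FE as the default.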
##########
fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java:
##########
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
+import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
+import org.apache.doris.planner.LocalExchangeNode.RequireHash;
+
+/**
+ * FE-side local exchange planner — inserts {@link LocalExchangeNode} into
each fragment's
+ * plan tree so that within-fragment data redistribution is decided at
planning time
+ * instead of at BE pipeline-build time.
+ *
+ * <h3>When this runs</h3>
+ * Invoked from {@code NereidsPlanner.addLocalExchangeAfterDistribute()} right
after
+ * {@code DistributePlanner} has assigned instances to fragments and before
the plan is
+ * serialized to BE. Gated by session variable {@code
enable_local_shuffle_planner}
+ * (default true) and {@code enable_local_shuffle}; when either is off this
pass is
+ * skipped entirely and BE falls back to its own {@code _plan_local_exchange}.
The two
+ * paths are mutually exclusive: BE consults {@code
runtime_state.h::plan_local_shuffle()}
+ * to know whether it should plan LE itself.
+ *
+ * <h3>What it changes</h3>
+ * <ul>
+ * <li>For each fragment with {@code maxPerBeInstances > 1}, walks the plan
tree
+ * bottom-up via {@link PlanNode#enforceAndDeriveLocalExchange} and
inserts
+ * LocalExchangeNodes where children's output distribution doesn't
satisfy the
+ * parent's requirement.</li>
+ * <li>May wrap the fragment root with an extra PASSTHROUGH LE so the data
sink
+ * (DataStreamSink / OlapTableSink) runs with the full instance count
even when
+ * the root operator is serial — see {@link
#addLocalExchangeForFragment}.</li>
+ * <li>Does NOT modify the fragment sink itself, fragment boundaries, or
instance
+ * assignment.</li>
+ * </ul>
+ *
+ * <h3>Per-BE instance semantics</h3>
+ * Skips fragments where every BE has at most 1 instance. Using a global
instance count
+ * would insert LE for "2 BEs × 1 instance" cases, which BE's own
+ * {@code _plan_local_exchange} would not — leading to pipeline task-count
mismatch and
+ * deadlock. See {@link #addLocalExchange}.
+ *
+ * <h3>Reading order</h3>
+ * Start with {@link PlanNode#enforceRequire} (the recursion engine), then
individual
+ * {@code enforceAndDeriveLocalExchange} overrides on PlanNode subclasses.
+ */
+public class AddLocalExchange {
+ /** addLocalExchange with distributed plans, skipping single-instance
fragments.
+ * BE's _plan_local_exchange checks _num_instances which is the per-BE
instance count.
+ * With _num_instances<=1 all pipelines on that BE have 1 task so local
exchange is a no-op.
+ * We must use the same per-BE semantics: skip when every BE has at most
1 instance.
+ * Using global instanceCount would insert LE for fragments where 2 BEs
each have 1 instance
+ * (global=2, per-BE=1), causing pipeline task mismatch and deadlock. */
+ public void addLocalExchange(FragmentIdMapping<DistributedPlan>
distributedPlans,
+ PlanTranslatorContext context) {
+ for (DistributedPlan plan : distributedPlans.values()) {
+ PipelineDistributedPlan pipePlan = (PipelineDistributedPlan) plan;
+ long maxPerBeInstances = pipePlan.getInstanceJobs().stream()
+ .collect(java.util.stream.Collectors.groupingBy(
+ j -> j.getAssignedWorker().id(),
java.util.stream.Collectors.counting()))
+
.values().stream().mapToLong(Long::longValue).max().orElse(0);
+ if (maxPerBeInstances <= 1) {
+ continue;
+ }
+ PlanFragment fragment = pipePlan.getFragmentJob().getFragment();
+ addLocalExchangeForFragment(fragment, context);
+ }
+ }
+
+ private void addLocalExchangeForFragment(PlanFragment fragment,
PlanTranslatorContext context) {
+ DataSink sink = fragment.getSink();
+ LocalExchangeTypeRequire require = sink == null
+ ? LocalExchangeTypeRequire.noRequire() :
sink.getLocalExchangeTypeRequire();
+ PlanNode root = fragment.getPlanRoot();
+ context.setHasSerialAncestorInPipeline(root, false);
+ Pair<PlanNode, LocalExchangeType> output = root
+ .enforceAndDeriveLocalExchange(context, null, require);
+ PlanNode newRoot = output.first;
+ // The fragment data sink (DataStreamSink, OlapTableSink) runs in the
same pipeline
+ // as the root. If the root will be serial on BE, the sink pipeline
has 1 task —
+ // only instance 0 sends data, others hang or miss writes.
+ // Insert PASSTHROUGH fan-out so sink runs with _num_instances tasks.
+ // This matches BE-native's default required_data_distribution():
+ // _child->is_serial_operator() ? PASSTHROUGH : NOOP
+ if (newRoot.isSerialOperatorOnBe(context.getConnectContext())) {
+ newRoot = new LocalExchangeNode(context.nextPlanNodeId(), newRoot,
+ LocalExchangeType.PASSTHROUGH, null);
+ }
+ if (newRoot != root) {
+ fragment.setPlanRoot(newRoot);
+ }
+ validateNoSerialWithoutLocalExchange(fragment.getPlanRoot(),
context.getConnectContext());
+ }
+
+ /**
+ * In a local-shuffle fragment, the root check above guarantees the root
pipeline
+ * has N tasks. Any serial operator reduces its pipeline to 1 task. If
this serial
+ * operator feeds into a non-serial parent without LocalExchangeNode in
between,
+ * some pipelines have 1 task while others have N → shared_state mismatch,
data loss.
+ *
+ * Serial→serial chains are fine (all at 1 task, consistent). Only the
transition
+ * from serial to non-serial needs LE to restore parallelism.
+ */
+ private void validateNoSerialWithoutLocalExchange(PlanNode node,
+ org.apache.doris.qe.ConnectContext context) {
+ for (PlanNode child : node.getChildren()) {
+ validateNoSerialWithoutLocalExchange(child, context);
+ if (child.isSerialOperatorOnBe(context)
+ && !(child instanceof LocalExchangeNode)
+ && !(node instanceof LocalExchangeNode)
+ && !(node instanceof ExchangeNode)
+ && !node.isSerialOperatorOnBe(context)) {
+
org.apache.logging.log4j.LogManager.getLogger(AddLocalExchange.class).warn(
Review Comment:
Removed this validation entirely (commit `3630071a131`) — after the CI
evidence, neither `throw` nor `warn` is worth keeping:
1. **It reads the same `isSerialOperatorOnBe` flag it would validate**, so
it is blind to the one bug class that actually matters — a *wrong* serial flag.
The framework and the check would agree on the same wrong answer; a check can't
validate an input it shares with the thing it's checking.
2. **It fires on legitimate plans.** Pushing it as a `throw` surfaced 9
distinct legitimate P0/cloud_p0 patterns where it fires but the plan is
correct: serial `UnionNode → TableFunctionNode/AggregationNode`, serial
`RecursiveCteScanNode → NLJ/Agg`, serial `OlapScanNode → HashJoinNode`. In all
of these the parent runs effectively serial at runtime and BE reconciles
`num_tasks` across the coupled pipelines, so no LE is required and results are
correct.
3. **The one class it *could* catch** — a `require`→LE logic regression — is
already caught end-to-end by the regression suite / RQG.
Correctness still comes from `enforceRequire` inserting the LEs, validated
by `test_local_shuffle_rqg_bugs` + the RQG differential against the BE-native
baseline. The principled direction is a single source of truth for the
serial/distribution decision (BE reading the FE-written flag), so there is no
FE-prediction-vs-BE-reality gap to police in the first place.
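To make point 1 concrete, here is a minimal Java sketch of why a check that
reads the same flag as the planner cannot catch a wrong flag. It is a
deliberately simplified model: `SharedFlagSketch`, `Node`, and all method names
are hypothetical and are not Doris classes, and the removed validation had more
conditions than the single one modeled here.

```java
// Simplified, hypothetical model of the situation described in point 1.
// None of these names are Doris classes.
class SharedFlagSketch {
    static final class Node {
        final boolean flaggedSerial;   // what FE predicts (the shared input)
        final boolean actuallySerial;  // what BE really does at runtime

        Node(boolean flaggedSerial, boolean actuallySerial) {
            this.flaggedSerial = flaggedSerial;
            this.actuallySerial = actuallySerial;
        }
    }

    // Planner: inserts a local exchange above a child that the flag says is serial.
    static boolean plannerInsertsExchange(Node child) {
        return child.flaggedSerial;
    }

    // Validator: complains when a flagged-serial child has no exchange above it.
    // It consults the same flag as the planner.
    static boolean validatorComplains(Node child, boolean exchangeInserted) {
        return child.flaggedSerial && !exchangeInserted;
    }

    // Ground truth, which neither the planner nor the validator can see.
    static boolean exchangeMissing(Node child, boolean exchangeInserted) {
        return child.actuallySerial && !exchangeInserted;
    }

    public static void main(String[] args) {
        // Wrong flag: FE thinks the node is parallel, BE runs it serially.
        Node wrongFlag = new Node(false, true);
        boolean inserted = plannerInsertsExchange(wrongFlag);
        System.out.println("validator complains: " + validatorComplains(wrongFlag, inserted));
        System.out.println("exchange actually missing: " + exchangeMissing(wrongFlag, inserted));
    }
}
```

In this model the validator stays silent in exactly the case where the exchange
is missing, because both decisions are derived from `flaggedSerial`.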
##########
regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy:
##########
@@ -0,0 +1,1557 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/**
+ * Regression tests for bugs discovered by RQG testing on the local-exchange2
branch.
+ *
+ * These queries triggered "must set shared state" errors or incorrect results
+ * in RQG build 183992. Common conditions:
+ * - use_serial_exchange=true (makes ALL Exchanges serial, not just
UNPARTITIONED)
+ * - enable_local_shuffle_planner=true (FE-planned local exchange)
+ * - parallel_pipeline_task_num > 1
+ *
+ * Error types reproduced:
+ * 1. must set shared state, in AGGREGATION_OPERATOR
+ * 2. must set shared state, in SORT_OPERATOR
+ * 3. incorrect results with GROUPING SETS + scalar subquery + window
function
+ */
+suite("test_local_shuffle_rqg_bugs") {
+
+ // ============================================================
+ // Table setup — mirrors RQG table structure
+ // 10 buckets to match RQG (replication_num=1 for single-BE testing)
+ // ============================================================
+ sql "DROP TABLE IF EXISTS rqg_t1"
+ sql "DROP TABLE IF EXISTS rqg_t2"
+ sql "DROP TABLE IF EXISTS rqg_t3"
+ sql "DROP TABLE IF EXISTS rqg_t4"
+
+ sql """
+ CREATE TABLE rqg_t1 (
+ pk INT NOT NULL,
+ col_int_undef_signed INT,
+ col_int_undef_signed2 INT,
+ col_int_undef_signed_not_null INT NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(pk)
+ DISTRIBUTED BY HASH(pk) BUCKETS 10
+ PROPERTIES ("replication_num" = "1")
+ """
+
+ sql """
+ CREATE TABLE rqg_t2 (
+ pk INT NOT NULL,
+ col_int_undef_signed INT,
+ col_int_undef_signed2 INT,
+ col_bigint_undef_signed_not_null BIGINT NOT NULL,
+ col_decimal_38_10__undef_signed_not_null DECIMAL(38,10) NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(pk)
+ DISTRIBUTED BY HASH(pk) BUCKETS 10
+ PROPERTIES ("replication_num" = "1")
+ """
+
+ // Table for build 184181 GLOBAL_HASH_SHUFFLE bugs — needs varchar +
bigint columns
+ sql """
+ CREATE TABLE rqg_t3 (
+ pk INT NOT NULL,
+ col_bigint_undef_signed BIGINT,
+ col_varchar_10__undef_signed VARCHAR(10),
+ col_varchar_64__undef_signed VARCHAR(64)
+ ) ENGINE=OLAP
+ DUPLICATE KEY(pk)
+ DISTRIBUTED BY HASH(pk) BUCKETS 10
+ PROPERTIES ("replication_num" = "1")
+ """
+
+ // Second table for FULL OUTER JOIN case (col_bigint_undef_signed_not_null)
+ sql """
+ CREATE TABLE rqg_t4 (
+ pk INT NOT NULL,
+ col_bigint_undef_signed_not_null BIGINT NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(pk)
+ DISTRIBUTED BY HASH(pk) BUCKETS 10
+ PROPERTIES ("replication_num" = "1")
+ """
+
+ sql """
+ INSERT INTO rqg_t3 VALUES
+ (0, -94, 'Abc', 'hello world'),
+ (1, 672609, 'Xyz', null),
+ (2, -3766684, 'Pqr', 'test string'),
+ (3, 5070261, 'abc', 'another row'),
+ (4, null, 'def', 'value four'),
+ (5, -86, 'XgpxlHBLEM', null),
+ (6, 21910, 'abc', 'they'),
+ (7, -63, 'zzzz', 'some text'),
+ (8, -8276281, 'AHlvNtoGLO', 'longer string here'),
+ (9, -101, 'mid', 'final row')
+ """
+
+ sql """
+ INSERT INTO rqg_t4 VALUES
+ (0, 0), (1, 1), (2, 2), (3, 3), (4, 4),
+ (5, 5), (6, 6), (7, 7), (8, 8), (9, 9),
+ (10, 2), (11, 2), (12, 2), (13, 3), (14, 4),
+ (15, 5), (16, 2), (17, 2), (18, 2), (19, 9)
+ """
+
+ // Insert enough rows to exercise multiple pipeline tasks
+ sql """
+ INSERT INTO rqg_t1 VALUES
+ (0, 0, 10, 0), (1, 1, 11, 1), (2, 2, 12, 2), (3, 3, 13, 3),
+ (4, 4, 14, 4), (5, 5, 15, 5), (6, 6, 16, 6), (7, 7, 17, 7),
+ (8, 8, 18, 8), (9, 9, 19, 9), (10, 0, 20, 10), (11, 1, 21, 11),
+ (12, 2, 22, 12), (13, 3, 23, 13), (14, 4, 24, 14), (15, 5, 25, 15),
+ (16, 6, 26, 16), (17, 7, 27, 17), (18, 8, 28, 18), (19, 9, 29, 19)
+ """
+
+ sql """
+ INSERT INTO rqg_t2 VALUES
+ (0, 0, 10, 100, 1.5), (1, 1, 11, 101, 2.5), (2, 2, 12, 102, 3.5),
+ (3, 3, 13, 103, 4.5), (4, 4, 14, 104, 5.5), (5, 5, 15, 105, 6.5),
+ (6, 6, 16, 106, 7.5), (7, 7, 17, 107, 8.5), (8, 8, 18, 108, 9.5),
+ (9, 9, 19, 109, 10.5), (10, 0, 20, 110, 11.5), (11, 1, 21, 111,
12.5),
+ (12, 2, 22, 112, 13.5), (13, 3, 23, 113, 14.5), (14, 4, 24, 114,
15.5),
+ (15, 5, 25, 115, 16.5), (16, 6, 26, 116, 17.5), (17, 7, 27, 117,
18.5),
+ (18, 8, 28, 118, 19.5), (19, 9, 29, 119, 20.5)
+ """
+
+ // Wait for data to be visible
+ Thread.sleep(5000)
+
+ // ============================================================
+ // Common settings
+ // ============================================================
+ sql "SET enable_nereids_planner=true"
+ sql "SET enable_fallback_to_original_planner=false"
+ sql "SET runtime_filter_mode=off"
+ sql "SET enable_profile=true"
+ sql "SET enable_sql_cache=false"
+ sql "SET enable_local_shuffle=true"
+
+ // ============================================================
+ // Bug 1: must set shared state, in AGGREGATION_OPERATOR
+ // RQG case: eliminate_group_by_uniform.case_id_11007680713
+ // Key conditions: use_serial_exchange=true, parallel_pipeline_task_num=3
+ // SQL: EXCEPT with count(*) GROUP BY on both sides
+ // ============================================================
+
+ // Test with FE planner (the buggy path)
+ logger.info("=== Bug 1a: AGG shared state - EXCEPT with serial exchange
(FE planner) ===")
+ try {
+ sql """
+ SELECT
/*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=3,
+ enable_local_shuffle_planner=true,
+ ignore_storage_data_distribution=true,
+ enable_common_expr_pushdown=false,
+ disable_streaming_preaggregations=true)*/
+ col_int_undef_signed_not_null as col1,
+ col_int_undef_signed_not_null as col2,
+ 0 as col3, count(1)
+ FROM rqg_t1
+ GROUP BY col1, col2, col3
+ EXCEPT
+ SELECT col_bigint_undef_signed_not_null as col1,
+ col_decimal_38_10__undef_signed_not_null as col2,
+ 5 as col3, count(1)
+ FROM rqg_t2
+ GROUP BY col1, col2, col3
+ """
+ logger.info("Bug 1a: PASSED (no crash)")
+ } catch (Throwable t) {
+ logger.error("Bug 1a FAILED: ${t.message}")
+ assertTrue(false, "Bug 1a: must set shared state in
AGGREGATION_OPERATOR: ${t.message}")
+ }
+
+ // Compare with BE native planner
+ logger.info("=== Bug 1b: AGG shared state - EXCEPT with serial exchange
(BE native) ===")
+ try {
+ sql """
+ SELECT
/*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=3,
+ enable_local_shuffle_planner=false,
+ ignore_storage_data_distribution=true,
+ enable_common_expr_pushdown=false,
+ disable_streaming_preaggregations=true)*/
+ col_int_undef_signed_not_null as col1,
+ col_int_undef_signed_not_null as col2,
+ 0 as col3, count(1)
+ FROM rqg_t1
+ GROUP BY col1, col2, col3
+ EXCEPT
+ SELECT col_bigint_undef_signed_not_null as col1,
+ col_decimal_38_10__undef_signed_not_null as col2,
+ 5 as col3, count(1)
+ FROM rqg_t2
+ GROUP BY col1, col2, col3
+ """
+ logger.info("Bug 1b: PASSED (no crash)")
+ } catch (Throwable t) {
+ logger.error("Bug 1b FAILED: ${t.message}")
+ assertTrue(false, "Bug 1b: BE native also fails: ${t.message}")
+ }
+
+ // ============================================================
+ // Bug 2: must set shared state, in SORT_OPERATOR
+ // RQG case: grouping_set.case_id_5308471751
+ // Key conditions: use_serial_exchange=true, parallel_pipeline_task_num=5
+ // SQL: GROUPING SETS + window function (PERCENT_RANK)
+ // ============================================================
+
+ logger.info("=== Bug 2a: SORT shared state - GROUPING SETS + window (FE
planner) ===")
+ try {
+ sql """
+ SELECT
/*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=5,
+ enable_local_shuffle_planner=true,
+ ignore_storage_data_distribution=true,
+ enable_share_hash_table_for_broadcast_join=false,
+ disable_streaming_preaggregations=true)*/
+ SUM(PERCENT_RANK() OVER (PARTITION BY col_int_undef_signed2
ORDER BY col_int_undef_signed2))
+ FROM rqg_t1
+ GROUP BY GROUPING SETS ((col_int_undef_signed2),(pk,
pk),(col_int_undef_signed))
+ """
+ logger.info("Bug 2a: PASSED (no crash)")
+ } catch (Throwable t) {
+ logger.error("Bug 2a FAILED: ${t.message}")
+ assertTrue(false, "Bug 2a: must set shared state in SORT_OPERATOR:
${t.message}")
+ }
+
+ logger.info("=== Bug 2b: SORT shared state - GROUPING SETS + window (BE
native) ===")
+ try {
+ sql """
+ SELECT
/*+SET_VAR(use_serial_exchange=true,parallel_pipeline_task_num=5,
+ enable_local_shuffle_planner=false,
+ ignore_storage_data_distribution=true,
+ enable_share_hash_table_for_broadcast_join=false,
+ disable_streaming_preaggregations=true)*/
+ SUM(PERCENT_RANK() OVER (PARTITION BY col_int_undef_signed2
ORDER BY col_int_undef_signed2))
+ FROM rqg_t1
+ GROUP BY GROUPING SETS ((col_int_undef_signed2),(pk,
pk),(col_int_undef_signed))
+ """
+ logger.info("Bug 2b: PASSED (no crash)")
+ } catch (Throwable t) {
+ logger.error("Bug 2b FAILED: ${t.message}")
+ assertTrue(false, "Bug 2b: BE native also fails: ${t.message}")
+ }
+
+ // ============================================================
+ // Bug 3: incorrect results with GROUPING SETS + scalar subquery + window
+ // RQG case: grouping_set.case_id_5694495756
+ // Key conditions: parallel_pipeline_task_num=2,
disable_streaming_preaggregations=true
+ // Expected: all rows same value; Actual: values split proportionally
(1/3, 2/3)
+ // ============================================================
+
+ logger.info("=== Bug 3: incorrect results - GROUPING SETS + subquery +
window ===")
+ // FE planner
+ def result_fe = sql """
+ SELECT /*+SET_VAR(parallel_pipeline_task_num=2,
+ enable_local_shuffle_planner=true,
+ disable_streaming_preaggregations=true,
+ enable_share_hash_table_for_broadcast_join=true)*/
+ SUM((SELECT MAX(col_int_undef_signed2) FROM rqg_t1))
+ OVER (PARTITION BY pk ORDER BY pk)
+ FROM rqg_t1
+ GROUP BY GROUPING SETS ((col_int_undef_signed2, pk),(pk), (pk))
+ """
+ // BE native
+ def result_be = sql """
+ SELECT /*+SET_VAR(parallel_pipeline_task_num=2,
+ enable_local_shuffle_planner=false,
+ disable_streaming_preaggregations=true,
+ enable_share_hash_table_for_broadcast_join=true)*/
+ SUM((SELECT MAX(col_int_undef_signed2) FROM rqg_t1))
+ OVER (PARTITION BY pk ORDER BY pk)
+ FROM rqg_t1
+ GROUP BY GROUPING SETS ((col_int_undef_signed2, pk),(pk), (pk))
+ """
+ logger.info("Bug 3 FE result rows: ${result_fe.size()}, first few:
${result_fe.take(5)}")
+ logger.info("Bug 3 BE result rows: ${result_be.size()}, first few:
${result_be.take(5)}")
+
+ // All values in both should be the same
+ if (result_fe.size() != result_be.size()) {
+ logger.warn("Bug 3: row count mismatch FE=${result_fe.size()} vs
BE=${result_be.size()}")
Review Comment:
Done. Bug 3 now asserts FE==BE (row count + order-insensitive content); Bug
4/5 raise `assertTrue(false)` on an unexpected exception instead of only
logging (Bug 1/2/6/7/8 already asserted in their `catch`). The Bug 21 `catch`
that swallowed `query timeout` (flagged separately by the bot) was also
removed, so a hang fails the suite too.
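For readers unfamiliar with the term, "row count + order-insensitive content"
can be sketched as follows. This is an illustrative Java helper under stated
assumptions, not the code in the suite (which is Groovy): `ResultCompare` and
its methods are hypothetical names, and rows are compared by their string
rendering, so values that print differently (for example decimals with a
different scale) would count as different.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the Doris regression framework.
class ResultCompare {
    // Renders each row as a string and sorts, so the result can be compared as a multiset.
    static List<String> canonical(List<? extends List<?>> rows) {
        List<String> out = new ArrayList<>();
        for (List<?> row : rows) {
            out.add(String.valueOf(row));
        }
        Collections.sort(out);
        return out;
    }

    // True when both results have the same row count and the same rows, ignoring order.
    static boolean sameRows(List<? extends List<?>> a, List<? extends List<?>> b) {
        return a.size() == b.size() && canonical(a).equals(canonical(b));
    }

    public static void main(String[] args) {
        List<List<Integer>> fe = Arrays.asList(Arrays.asList(1, 29), Arrays.asList(2, 29));
        List<List<Integer>> be = Arrays.asList(Arrays.asList(2, 29), Arrays.asList(1, 29));
        System.out.println("same rows: " + sameRows(fe, be));
    }
}
```

Sorting a canonical rendering keeps duplicates significant, which matters for
queries like Bug 3 where many rows are expected to carry the same value.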
##########
fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java:
##########
@@ -285,4 +302,71 @@ public List<Expr> getOtherJoinConjuncts() {
public List<Expr> getMarkJoinConjuncts() {
return markJoinConjuncts;
}
+
+ @Override
+ public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+ PlanTranslatorContext translatorContext, PlanNode parent,
LocalExchangeTypeRequire parentRequire) {
+
+ LocalExchangeTypeRequire probeSideRequire;
+ LocalExchangeTypeRequire buildSideRequire;
+ LocalExchangeType outputType = null;
+
+ if (joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
+ buildSideRequire = probeSideRequire =
LocalExchangeTypeRequire.noRequire();
+ outputType = LocalExchangeType.NOOP;
+ } else if (distrMode == DistributionMode.BROADCAST) {
+ // BE: _child->is_serial_operator() ? PASSTHROUGH/PASS_TO_ONE :
NOOP
+ boolean probeChildSerial = children.get(0).isSerialOperatorOnBe(
+ translatorContext.getConnectContext());
+ boolean buildChildSerial = children.get(1).isSerialOperatorOnBe(
+ translatorContext.getConnectContext());
+ probeSideRequire = probeChildSerial
Review Comment:
Mirrored — `probeSideRequire` now folds in the session flag:
`probePassthrough = forcePassthrough || probeChildSerial`, where
`forcePassthrough` reads `enableBroadcastJoinForcePassthrough` (`HashJoinNode`
~329-337, null-safe for the unit-test mocks).
One caveat from investigating it on a 4-BE cluster (forced non-serial probe
via `ignore_storage_data_distribution=false`): it is effectively a no-op there.
`enforceRequire` only inserts a PASSTHROUGH local exchange to fan a *serial*
(1-task) source out to N tasks; an already-N-task source already satisfies
passthrough, so no exchange is added (identical plan/results vs BE-native, no
crash). The branch is kept to match BE's intent; a true rebalance of a
non-serial probe (actually forcing the exchange) is a perf-only follow-up.
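The no-op behavior described above can be modeled in a few lines. This is a
rough Java sketch, assuming the enforcement rule is exactly as stated in this
comment (PASSTHROUGH is only materialized to fan out a serial child);
`BroadcastProbeSketch`, `Require`, and both methods are hypothetical names, not
the actual `HashJoinNode` / `enforceRequire` code.

```java
// Hypothetical, simplified model; names are illustrative and not the Doris classes.
class BroadcastProbeSketch {
    enum Require { NO_REQUIRE, PASSTHROUGH }

    // Mirrors the expression quoted in the comment: forcePassthrough || probeChildSerial.
    static Require probeSideRequire(boolean forcePassthrough, boolean probeChildSerial) {
        return (forcePassthrough || probeChildSerial) ? Require.PASSTHROUGH : Require.NO_REQUIRE;
    }

    // Assumed enforcement rule: a PASSTHROUGH exchange is only inserted to fan a
    // serial (1-task) child out to N tasks; an N-task child already satisfies it.
    static boolean insertsExchange(Require require, boolean childSerial) {
        return require == Require.PASSTHROUGH && childSerial;
    }

    public static void main(String[] args) {
        // Session flag on, probe child already parallel: requirement set, nothing inserted.
        Require forced = probeSideRequire(true, false);
        System.out.println(forced + " -> insert: " + insertsExchange(forced, false));

        // Flag off, probe child serial: requirement set, exchange inserted.
        Require serial = probeSideRequire(false, true);
        System.out.println(serial + " -> insert: " + insertsExchange(serial, true));
    }
}
```

Under this model the session flag changes the requirement but not the plan
whenever the probe child is non-serial, which matches the 4-BE observation.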
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]