This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d7258f86342 Fix MSE explainAskingServers segment grouping and remove
all-or-nothing failure (#18696)
d7258f86342 is described below
commit d7258f86342b2931eddf25c33b632788bcd75546
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Jun 9 14:43:57 2026 +0200
Fix MSE explainAskingServers segment grouping and remove all-or-nothing
failure (#18696)
---
.../explain/AskingServerStageExplainer.java | 6 +-
.../planner/explain/ExplainNodeSimplifier.java | 104 ++++++--
.../query/planner/explain/PlanNodeMerger.java | 2 +-
.../planner/explain/ExplainNodeSimplifierTest.java | 267 +++++++++++++++++++++
.../query/planner/explain/PlanNodeMergerTest.java | 75 ++++++
5 files changed, 434 insertions(+), 20 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/AskingServerStageExplainer.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/AskingServerStageExplainer.java
index a84f6f1f184..c2ff0d80b80 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/AskingServerStageExplainer.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/AskingServerStageExplainer.java
@@ -96,7 +96,7 @@ public class AskingServerStageExplainer {
attributes.putLong("servers", entry.getValue());
inputs.add(new ExplainedNode(stageId,
entry.getKey().getDataSchema(), null,
- Collections.singletonList(entry.getKey()), "Alternative",
attributes.build()));
+ Collections.singletonList(entry.getKey()),
ExplainNodeSimplifier.ALTERNATIVE, attributes.build()));
}
mergedNode = new ExplainedNode(stageId, schema, null, inputs,
"IntermediateCombine",
@@ -105,6 +105,10 @@ public class AskingServerStageExplainer {
}
}
+ // Now that the plans of all servers have been merged, drop the
per-segment "Alternative" wrappers that turned out
+ // to hold a single group, so combine nodes where all segments share a
plan render exactly as before.
+ mergedNode = ExplainNodeSimplifier.removeRedundantAlternatives(mergedNode);
+
return PlanNodeToRelConverter.convert(_relBuilder, mergedNode);
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java
index 0baf2684251..5b67f7f8cff 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java
@@ -22,6 +22,9 @@ import com.google.common.base.CaseFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.core.operator.ExplainAttributeBuilder;
import org.apache.pinot.core.query.reduce.ExplainPlanDataTableReducer;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.EnrichedJoinNode;
@@ -40,8 +43,6 @@ import org.apache.pinot.query.planner.plannode.TableScanNode;
import org.apache.pinot.query.planner.plannode.UnnestNode;
import org.apache.pinot.query.planner.plannode.ValueNode;
import org.apache.pinot.query.planner.plannode.WindowNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -53,13 +54,23 @@ import org.slf4j.LoggerFactory;
* <li>Its title must contain the text {@code Combine}</li>
* </ol>
*
- * The simplification process merges the inputs of the node into a single node.
- * As a corollary, nodes with only one input are already simplified by
definition.
+ * The simplification process groups the inputs of the node by merging the
ones that describe an equivalent plan and
+ * wraps each resulting group in an {@code Alternative} node annotated with
the number of segments that fall into it.
+ * Inputs that can all be merged collapse into a single {@code Alternative};
inputs that cannot be merged (for example
+ * because some segments use an index while others do not) produce one {@code
Alternative} per distinct plan.
+ *
+ * Every group is wrapped, even when there is a single one, so that the {@code
segments} counts compose correctly when
+ * the plans of several servers are later merged (a server whose segments are
all uniform still contributes an
+ * {@code Alternative} that folds into the matching group of a server with
divergent segments). The redundant single
+ * {@code Alternative} wrappers are removed once all servers have been merged
via {@link #removeRedundantAlternatives}.
*/
public class ExplainNodeSimplifier {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ExplainNodeSimplifier.class);
public static final String COMBINE // UpperCamel form of ExplainPlanDataTableReducer.COMBINE: the "Combine" text searched in titles
= CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL,
ExplainPlanDataTableReducer.COMBINE);
+ /// Title of the synthetic node that wraps each distinct per-segment plan
together with its segment count.
+ public static final String ALTERNATIVE = "Alternative";
+ /// Name of the additive attribute holding how many segments fall into an
[#ALTERNATIVE] group.
+ public static final String SEGMENTS_ATTRIBUTE = "segments";
private ExplainNodeSimplifier() { // not instantiable: the class is used only through its static methods
}
@@ -69,6 +80,36 @@ public class ExplainNodeSimplifier {
return root.visit(planNodeMerger, null);
}
+ /// Removes the [#ALTERNATIVE] wrappers that are no longer needed once all
servers have been merged: a combine node
+ /// whose only child is an [#ALTERNATIVE] is unwrapped so the combine points
directly at the single plan. This keeps
+ /// the common case (all segments share a plan) rendered exactly as it was
before per-segment grouping was added,
+ /// while combine nodes with several alternatives keep their annotated
groups.
+ public static PlanNode removeRedundantAlternatives(PlanNode node) { // returns node itself when nothing in its subtree needed unwrapping
+ List<PlanNode> inputs = node.getInputs();
+ List<PlanNode> newInputs = null; // copied lazily, only once some child actually changes
+ for (int i = 0; i < inputs.size(); i++) {
+ PlanNode child = inputs.get(i);
+ PlanNode newChild = removeRedundantAlternatives(child); // children are cleaned first (bottom-up)
+ if (newChild != child) {
+ if (newInputs == null) {
+ newInputs = new ArrayList<>(inputs);
+ }
+ newInputs.set(i, newChild);
+ }
+ }
+ PlanNode result = newInputs != null ? node.withInputs(newInputs) : node; // unchanged subtrees keep their original instance
+ if (result instanceof ExplainedNode) {
+ ExplainedNode explained = (ExplainedNode) result;
+ if (explained.getTitle().contains(COMBINE) &&
explained.getInputs().size() == 1) {
+ PlanNode onlyChild = explained.getInputs().get(0);
+ if (onlyChild instanceof ExplainedNode && ((ExplainedNode)
onlyChild).getTitle().equals(ALTERNATIVE)) {
+ return explained.withInputs(onlyChild.getInputs()); // re-parents the Alternative's inputs; its attributes (e.g. segments) are dropped
+ }
+ }
+ }
+ return result;
+ }
+
private static class Visitor implements PlanNodeVisitor<PlanNode, Void> {
private PlanNode defaultNode(PlanNode node) {
List<PlanNode> inputs = node.getInputs();
@@ -143,24 +184,51 @@ public class ExplainNodeSimplifier {
@Override
public PlanNode visitExplained(ExplainedNode node, Void context) { // groups a combine node's children into Alternative-wrapped plans
- if (!node.getTitle().contains(COMBINE) || node.getInputs().size() <= 1) {
+ if (!node.getTitle().contains(COMBINE) || node.getInputs().isEmpty()) { // single-input combines are processed too, so they get a segments=1 Alternative
return defaultNode(node);
}
List<PlanNode> simplifiedChildren = simplifyChildren(node.getInputs());
- PlanNode child1 = simplifiedChildren.get(0);
-
- for (int i = 1; i < simplifiedChildren.size(); i++) {
- PlanNode child2 = simplifiedChildren.get(i);
- PlanNode merged = PlanNodeMerger.mergePlans(child1, child2, false);
- if (merged == null) {
- LOGGER.info("Found unmergeable inputs on node of type {}: {} and
{}", node, child1, child2);
- assert false : "Unmergeable inputs found";
- return defaultNode(node);
+
+ // Group the children of a combine node by merging the ones that
describe an equivalent plan. Each resulting
+ // group represents a distinct execution plan, shared by one or more
segments. Merging is greedy and partial:
+ // a child that cannot be merged into any existing group (for example
because some segments use an index while
+ // others do a full scan) becomes its own group instead of forcing the
whole node to stay un-simplified. This
+ // keeps the explain plan compact (identical segments collapse into a
single group) while still surfacing the
+ // genuinely different per-segment plans, and it never fails when the
inputs are not all mergeable.
+ List<PlanNode> groups = new ArrayList<>();
+ List<Integer> segmentCounts = new ArrayList<>(); // index-aligned with groups: segmentCounts.get(i) children were folded into groups.get(i)
+ for (PlanNode child : simplifiedChildren) {
+ boolean merged = false;
+ for (int i = 0; i < groups.size(); i++) {
+ PlanNode mergedGroup = PlanNodeMerger.mergePlans(groups.get(i),
child, false);
+ if (mergedGroup != null) { // null means the two plans are not mergeable
+ groups.set(i, mergedGroup);
+ segmentCounts.set(i, segmentCounts.get(i) + 1);
+ merged = true;
+ break; // greedy: the first compatible group wins
+ }
}
- child1 = merged;
+ if (!merged) {
+ groups.add(child);
+ segmentCounts.add(1);
+ }
+ }
+
+ // Wrap each distinct plan in an "Alternative" node annotated with the
number of segments that fall into it, so
+ // the reader can tell the plans apart and how many segments run each
one. The "segments" attribute uses the
+ // default (additive) merge type, so when the same divergence appears on
several servers the per-server counts
+ // are summed as the plans are merged across servers. Even a single
group is wrapped, so that a server whose
+ // segments are all uniform still folds into the matching group of a
server with divergent segments; the
+ // redundant single wrappers are stripped afterwards by
removeRedundantAlternatives.
+ List<PlanNode> alternatives = new ArrayList<>(groups.size());
+ for (int i = 0; i < groups.size(); i++) {
+ PlanNode group = groups.get(i);
+ Map<String, Plan.ExplainNode.AttributeValue> attributes =
+ new ExplainAttributeBuilder().putLong(SEGMENTS_ATTRIBUTE,
segmentCounts.get(i)).build();
+ alternatives.add(new ExplainedNode(node.getStageId(),
group.getDataSchema(), null,
+ Collections.singletonList(group), ALTERNATIVE, attributes));
}
- return new ExplainedNode(node.getStageId(), node.getDataSchema(),
node.getNodeHint(),
- Collections.singletonList(child1), node.getTitle(),
node.getAttributes());
+ return node.withInputs(alternatives); // NOTE(review): assumes withInputs keeps stage id, hint, title and attributes like the removed code did - confirm
}
@Override
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java
index c60027778a4..b9af3463ea0 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java
@@ -493,7 +493,7 @@ class PlanNodeMerger {
if (exchangeNode.isSortOnReceiver() != otherNode.isSortOnReceiver()) {
return null;
}
- if (Objects.equals(exchangeNode.getTableNames(),
otherNode.getTableNames())) {
+ if (!Objects.equals(exchangeNode.getTableNames(),
otherNode.getTableNames())) {
return null;
}
List<PlanNode> children = mergeChildren(exchangeNode, context);
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifierTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifierTest.java
new file mode 100644
index 00000000000..6f7d8b63c25
--- /dev/null
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifierTest.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.explain;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.ExplainAttributeBuilder;
+import org.apache.pinot.query.planner.plannode.ExplainedNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotSame;
+
+
+/**
+ * Tests for {@link ExplainNodeSimplifier}, the broker-side logic that groups
the per-segment children of a combine
+ * explain node.
+ *
+ * The simplifier must group mergeable segment plans while keeping genuinely
different plans as separate groups, and it
+ * must never fail when the inputs are not all mergeable (these tests run with
assertions enabled, which previously
+ * surfaced as an {@code AssertionError} in the all-or-nothing implementation).
+ */
+public class ExplainNodeSimplifierTest {
+ private static final DataSchema SCHEMA = new DataSchema(new String[]{"col"}, // single INT column shared by every test node
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+
+ /// A combine node whose title must contain {@code Combine} for the
simplifier to act on it.
+ private static final String COMBINE_TITLE = "LeafStageCombineOperator";
+
+ private static ExplainedNode leaf(String title) { // childless node without attributes
+ return new ExplainedNode(0, SCHEMA, null, Collections.emptyList(), title,
Map.of());
+ }
+
+ private static ExplainedNode leaf(String title, Map<String,
Plan.ExplainNode.AttributeValue> attributes) {
+ return new ExplainedNode(0, SCHEMA, null, Collections.emptyList(), title,
attributes);
+ }
+
+ private static ExplainedNode combine(PlanNode... children) { // titled COMBINE_TITLE so the simplifier groups its children
+ return new ExplainedNode(0, SCHEMA, null, Arrays.asList(children),
COMBINE_TITLE, Map.of());
+ }
+
+ /// Wraps a child the way {@code AcquireReleaseColumnsSegmentOperator} does
when prefetch is enabled.
+ private static ExplainedNode acquireRelease(PlanNode child) {
+ return new ExplainedNode(0, SCHEMA, null,
Collections.singletonList(child), "AcquireReleaseColumnsSegment",
+ Map.of());
+ }
+
+ private static Map<String, Plan.ExplainNode.AttributeValue> docsAttr(long
totalDocs) {
+ // Mimics a DEFAULT (summed) numeric attribute such as totalDocs.
+ return new ExplainAttributeBuilder().putLong("totalDocs",
totalDocs).build();
+ }
+
+ /// Finds the "Alternative" group under {@code combine} whose plan matches
the given chain of titles (from the
+ /// Alternative's direct child downwards) and returns its {@code segments}
count.
+ private static long segmentCountFor(ExplainedNode combine, String...
innerTitlePath) {
+ for (PlanNode input : combine.getInputs()) {
+ ExplainedNode alternative = (ExplainedNode) input;
+ assertEquals(alternative.getTitle(), "Alternative", "Each group of a
multi-plan combine must be an Alternative");
+ if (matchesPath(alternative.getInputs().get(0), innerTitlePath)) { // each Alternative wraps exactly one plan
+ return alternative.getAttributes().get("segments").getLong();
+ }
+ }
+ throw new AssertionError("No Alternative group matching plan " +
Arrays.toString(innerTitlePath));
+ }
+
+ private static boolean matchesPath(PlanNode node, String... titlePath) { // follows first inputs downwards, comparing titles in order
+ PlanNode current = node;
+ for (int i = 0; i < titlePath.length; i++) {
+ if (!(current instanceof ExplainedNode) || !((ExplainedNode)
current).getTitle().equals(titlePath[i])) {
+ return false;
+ }
+ if (i < titlePath.length - 1) {
+ if (current.getInputs().isEmpty()) {
+ return false;
+ }
+ current = current.getInputs().get(0);
+ }
+ }
+ return true;
+ }
+
+ @Test
+ public void allIdenticalSegmentsCollapseIntoOneGroup() {
+ PlanNode simplified = ExplainNodeSimplifier.simplifyNode(
+ combine(leaf("Scan"), leaf("Scan"), leaf("Scan")));
+
+ ExplainedNode combine = (ExplainedNode) simplified;
+ assertEquals(combine.getTitle(), COMBINE_TITLE);
+ assertEquals(combine.getInputs().size(), 1, "Identical segments must
collapse into a single group");
+ // Even a single group is wrapped in an Alternative carrying its segment
count; the wrapper is stripped later by
+ // removeRedundantAlternatives.
+ assertEquals(segmentCountFor(combine, "Scan"), 3L);
+ }
+
+ @Test
+ public void identicalSegmentsWithSummedAttributesMerge() {
+ // totalDocs is a DEFAULT numeric attribute and must be summed when
merging, not block the merge.
+ PlanNode simplified = ExplainNodeSimplifier.simplifyNode(
+ combine(leaf("Scan", docsAttr(10)), leaf("Scan", docsAttr(20)),
leaf("Scan", docsAttr(30))));
+
+ ExplainedNode combine = (ExplainedNode) simplified;
+ assertEquals(combine.getInputs().size(), 1);
+ ExplainedNode alternative = (ExplainedNode) combine.getInputs().get(0);
+ assertEquals(alternative.getTitle(), "Alternative");
+ ExplainedNode merged = (ExplainedNode) alternative.getInputs().get(0);
+ assertEquals(merged.getAttributes().get("totalDocs").getLong(), 60L);
+ }
+
+ @Test
+ public void heterogeneousSegmentsAreGroupedWithCounts() {
+ // 3 segments use a "Scan" plan and 2 use a "SortedIndexScan" plan. The
simplifier must produce 2 groups, not 5
+ // separate children (DATA-116) and not fail, and each group must report
how many segments fall into it.
+ PlanNode simplified = ExplainNodeSimplifier.simplifyNode( // NOTE(review): DATA-116 does not look like a Pinot issue id; consider citing #18696
+ combine(leaf("Scan"), leaf("SortedIndexScan"), leaf("Scan"),
leaf("SortedIndexScan"), leaf("Scan")));
+
+ ExplainedNode combine = (ExplainedNode) simplified;
+ assertEquals(combine.getInputs().size(), 2, "Distinct segment plans must
be grouped, not listed one by one");
+ assertEquals(segmentCountFor(combine, "Scan"), 3L);
+ assertEquals(segmentCountFor(combine, "SortedIndexScan"), 2L);
+ }
+
+ @Test
+ public void unmergeableSegmentsDoNotThrowAndAreKeptWithCounts() {
+ // Every segment has a distinct plan: nothing merges. The old
all-or-nothing implementation hit `assert false`
+ // here (an AssertionError under -ea, the deterministic explain failure).
It must now keep every plan as its own
+ // group, each with a segment count of 1.
+ PlanNode simplified = ExplainNodeSimplifier.simplifyNode(
+ combine(leaf("A"), leaf("B"), leaf("C")));
+
+ ExplainedNode combine = (ExplainedNode) simplified;
+ assertEquals(combine.getInputs().size(), 3);
+ assertEquals(segmentCountFor(combine, "A"), 1L);
+ assertEquals(segmentCountFor(combine, "B"), 1L);
+ assertEquals(segmentCountFor(combine, "C"), 1L);
+ }
+
+ @Test
+ public void acquireReleaseWrappedSegmentsAreGroupedByInnerPlan() {
+ // Reproduces the prefetch-enabled shape from DATA-116: each segment is
wrapped in an AcquireReleaseColumnsSegment
+ // node. Grouping must happen on the inner plan, collapsing identical
segments and reporting their counts.
+ PlanNode simplified = ExplainNodeSimplifier.simplifyNode(
+ combine(
+ acquireRelease(leaf("Scan")),
+ acquireRelease(leaf("Scan")),
+ acquireRelease(leaf("Scan")),
+ acquireRelease(leaf("SortedIndexScan"))));
+
+ ExplainedNode combine = (ExplainedNode) simplified;
+ assertEquals(combine.getInputs().size(), 2,
+ "AcquireReleaseColumnsSegment wrappers must be grouped by their inner
plan");
+ assertEquals(segmentCountFor(combine, "AcquireReleaseColumnsSegment",
"Scan"), 3L);
+ assertEquals(segmentCountFor(combine, "AcquireReleaseColumnsSegment",
"SortedIndexScan"), 1L);
+ }
+
+ @Test
+ public void segmentCountsSumWhenSimplifiedPlansAreMergedAcrossServers() {
+ // Two servers each see the same divergence (some Scan, some
SortedIndexScan). After simplifying each server's
+ // combine node, merging the two (as the across-server merge does) must
keep 2 groups and sum the segment counts.
+ ExplainedNode serverA = (ExplainedNode) ExplainNodeSimplifier.simplifyNode(
+ combine(leaf("Scan"), leaf("Scan"), leaf("Scan"),
leaf("SortedIndexScan"), leaf("SortedIndexScan")));
+ ExplainedNode serverB = (ExplainedNode) ExplainNodeSimplifier.simplifyNode(
+ combine(leaf("Scan"), leaf("SortedIndexScan"),
leaf("SortedIndexScan"), leaf("SortedIndexScan"),
+ leaf("SortedIndexScan")));
+
+ ExplainedNode merged = (ExplainedNode) PlanNodeMerger.mergePlans(serverA,
serverB, false);
+
+ assertEquals(merged.getInputs().size(), 2);
+ assertEquals(segmentCountFor(merged, "Scan"), 4L, "3 + 1 Scan segments
across the two servers");
+ assertEquals(segmentCountFor(merged, "SortedIndexScan"), 6L, "2 + 4
SortedIndexScan segments across servers");
+ }
+
+ @Test
+ public void removeRedundantAlternativesUnwrapsSingleGroup() {
+ // The common case: all segments share a plan, so simplification leaves a
single Alternative. After all servers
+ // are merged, the wrapper is redundant and is stripped so the output
matches the pre-grouping format exactly.
+ PlanNode simplified = ExplainNodeSimplifier.simplifyNode(
+ combine(leaf("Scan"), leaf("Scan"), leaf("Scan")));
+ PlanNode cleaned =
ExplainNodeSimplifier.removeRedundantAlternatives(simplified);
+
+ ExplainedNode combine = (ExplainedNode) cleaned;
+ assertEquals(combine.getInputs().size(), 1);
+ assertEquals(((ExplainedNode) combine.getInputs().get(0)).getTitle(),
"Scan",
+ "A combine with a single group must point directly at the plan, with
no Alternative wrapper");
+ }
+
+ @Test
+ public void removeRedundantAlternativesKeepsMultipleGroups() { // only a combine with a single Alternative child is unwrapped
+ PlanNode simplified = ExplainNodeSimplifier.simplifyNode(
+ combine(leaf("Scan"), leaf("Scan"), leaf("SortedIndexScan")));
+ PlanNode cleaned =
ExplainNodeSimplifier.removeRedundantAlternatives(simplified);
+
+ ExplainedNode combine = (ExplainedNode) cleaned;
+ assertEquals(combine.getInputs().size(), 2, "Several distinct plans keep
their annotated Alternative groups");
+ assertEquals(segmentCountFor(combine, "Scan"), 2L);
+ assertEquals(segmentCountFor(combine, "SortedIndexScan"), 1L);
+ }
+
+ @Test
+ public void uniformServerFoldsIntoDivergentServerAcrossServers() {
+ // Server A has divergent segment plans, server B's segments are all
uniform. Because both servers emit Alternative
+ // wrappers, B's segments fold into A's matching group when the plans are
merged across servers, and the redundant
+ // wrappers are then removed. This is the case that was previously left as
a bare sibling next to the Alternatives.
+ ExplainedNode serverA = (ExplainedNode) ExplainNodeSimplifier.simplifyNode(
+ combine(leaf("Scan"), leaf("Scan"), leaf("SortedIndexScan")));
+ ExplainedNode serverB = (ExplainedNode) ExplainNodeSimplifier.simplifyNode(
+ combine(leaf("Scan"), leaf("Scan"), leaf("Scan"), leaf("Scan")));
+
+ ExplainedNode merged = (ExplainedNode) PlanNodeMerger.mergePlans(serverA,
serverB, false);
+ ExplainedNode cleaned = (ExplainedNode)
ExplainNodeSimplifier.removeRedundantAlternatives(merged);
+
+ assertEquals(cleaned.getInputs().size(), 2);
+ assertEquals(segmentCountFor(cleaned, "Scan"), 6L, "2 (server A) + 4
(server B) Scan segments fold together");
+ assertEquals(segmentCountFor(cleaned, "SortedIndexScan"), 1L);
+ }
+
+ @Test
+ public void singleSegmentFoldsAndUnwrapsForBackwardCompatibility() {
+ // A combine with a single segment is also wrapped (segments=1) so it
composes across servers, then unwrapped.
+ PlanNode simplified =
ExplainNodeSimplifier.simplifyNode(combine(leaf("Scan")));
+ ExplainedNode combine = (ExplainedNode) simplified;
+ assertEquals(segmentCountFor(combine, "Scan"), 1L);
+
+ ExplainedNode cleaned = (ExplainedNode)
ExplainNodeSimplifier.removeRedundantAlternatives(simplified);
+ assertEquals(((ExplainedNode) cleaned.getInputs().get(0)).getTitle(),
"Scan");
+ }
+
+ @Test
+ public void nonCombineNodeRecursesButDoesNotGroupChildren() {
+ // A node whose title does not contain "Combine" must keep all its
children, only recursing into them.
+ ExplainedNode nonCombine = new ExplainedNode(0, SCHEMA, null,
+ Arrays.asList(leaf("Scan"), leaf("Scan")), "InstanceResponse",
Map.of());
+ PlanNode simplified = ExplainNodeSimplifier.simplifyNode(nonCombine);
+ assertEquals(((ExplainedNode) simplified).getInputs().size(), 2,
+ "Non-combine nodes must not group their children");
+ }
+
+ @Test
+ public void nestedCombineUnderNonCombineIsSimplified() { // a combine reached through a non-combine parent must still be grouped
+ ExplainedNode nested = new ExplainedNode(0, SCHEMA, null,
+ Collections.singletonList(combine(leaf("Scan"), leaf("Scan"))),
"InstanceResponse", Map.of());
+ PlanNode simplified = ExplainNodeSimplifier.simplifyNode(nested);
+
+ assertNotSame(simplified, nested); // a new root is expected because the nested combine was rewritten
+ ExplainedNode innerCombine = (ExplainedNode) ((ExplainedNode)
simplified).getInputs().get(0);
+ assertEquals(innerCombine.getInputs().size(), 1, "Nested combine node must
be simplified too");
+ }
+}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/explain/PlanNodeMergerTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/explain/PlanNodeMergerTest.java
new file mode 100644
index 00000000000..f973a552120
--- /dev/null
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/explain/PlanNodeMergerTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.explain;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.ExplainedNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNode.NodeHint;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+
+/**
+ * Tests for {@link PlanNodeMerger}, the logic that decides whether two
explain plan nodes describe the same plan and
+ * can be merged into one.
+ */
+public class PlanNodeMergerTest {
+ private static final DataSchema SCHEMA = new DataSchema(new String[]{"col"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+
+ private static ExplainedNode leaf(String title) { // childless node without attributes
+ return new ExplainedNode(0, SCHEMA, null, Collections.emptyList(), title,
Map.of());
+ }
+
+ private static ExchangeNode exchange(Set<String> tableNames) { // every argument but tableNames is fixed, isolating the table-name check
+ return new ExchangeNode(0, SCHEMA, Collections.singletonList(leaf("Scan")),
+ PinotRelExchangeType.getDefaultExchangeType(),
RelDistribution.Type.BROADCAST_DISTRIBUTED, null, false, null,
+ false, false, tableNames, ExchangeStrategy.BROADCAST_EXCHANGE,
"absHashCodeMurmur3");
+ }
+
+ @Test
+ public void exchangesWithSameTableNamesMerge() { // regression: the table-name check used to reject EQUAL names
+ PlanNode merged = PlanNodeMerger.mergePlans(exchange(Set.of("t1")),
exchange(Set.of("t1")), false);
+ assertNotNull(merged, "Exchange nodes describing the same tables must be
mergeable");
+ }
+
+ @Test
+ public void exchangesWithDifferentTableNamesDoNotMerge() {
+ PlanNode merged = PlanNodeMerger.mergePlans(exchange(Set.of("t1")),
exchange(Set.of("t2")), false);
+ assertNull(merged, "Exchange nodes over different tables must not be
merged");
+ }
+
+ @Test
+ public void differentNodeTypesDoNotMerge() { // a ProjectNode and an ExplainedNode must never merge
+ ProjectNode project = new ProjectNode(0, SCHEMA, NodeHint.EMPTY,
Collections.singletonList(leaf("Scan")),
+ Collections.emptyList());
+ assertNull(PlanNodeMerger.mergePlans(project, leaf("Scan"), false));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]