This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d7258f86342 Fix MSE explainAskingServers segment grouping and remove all-or-nothing failure (#18696)
d7258f86342 is described below

commit d7258f86342b2931eddf25c33b632788bcd75546
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Jun 9 14:43:57 2026 +0200

    Fix MSE explainAskingServers segment grouping and remove all-or-nothing failure (#18696)
---
 .../explain/AskingServerStageExplainer.java        |   6 +-
 .../planner/explain/ExplainNodeSimplifier.java     | 104 ++++++--
 .../query/planner/explain/PlanNodeMerger.java      |   2 +-
 .../planner/explain/ExplainNodeSimplifierTest.java | 267 +++++++++++++++++++++
 .../query/planner/explain/PlanNodeMergerTest.java  |  75 ++++++
 5 files changed, 434 insertions(+), 20 deletions(-)

diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/AskingServerStageExplainer.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/AskingServerStageExplainer.java
index a84f6f1f184..c2ff0d80b80 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/AskingServerStageExplainer.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/AskingServerStageExplainer.java
@@ -96,7 +96,7 @@ public class AskingServerStageExplainer {
           attributes.putLong("servers", entry.getValue());
 
           inputs.add(new ExplainedNode(stageId, 
entry.getKey().getDataSchema(), null,
-              Collections.singletonList(entry.getKey()), "Alternative", 
attributes.build()));
+              Collections.singletonList(entry.getKey()), 
ExplainNodeSimplifier.ALTERNATIVE, attributes.build()));
         }
 
         mergedNode = new ExplainedNode(stageId, schema, null, inputs, 
"IntermediateCombine",
@@ -105,6 +105,10 @@ public class AskingServerStageExplainer {
       }
     }
 
+    // Now that the plans of all servers have been merged, drop the 
per-segment "Alternative" wrappers that turned out
+    // to hold a single group, so combine nodes where all segments share a 
plan render exactly as before.
+    mergedNode = ExplainNodeSimplifier.removeRedundantAlternatives(mergedNode);
+
     return PlanNodeToRelConverter.convert(_relBuilder, mergedNode);
   }
 
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java
index 0baf2684251..5b67f7f8cff 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifier.java
@@ -22,6 +22,9 @@ import com.google.common.base.CaseFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.core.operator.ExplainAttributeBuilder;
 import org.apache.pinot.core.query.reduce.ExplainPlanDataTableReducer;
 import org.apache.pinot.query.planner.plannode.AggregateNode;
 import org.apache.pinot.query.planner.plannode.EnrichedJoinNode;
@@ -40,8 +43,6 @@ import org.apache.pinot.query.planner.plannode.TableScanNode;
 import org.apache.pinot.query.planner.plannode.UnnestNode;
 import org.apache.pinot.query.planner.plannode.ValueNode;
 import org.apache.pinot.query.planner.plannode.WindowNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -53,13 +54,23 @@ import org.slf4j.LoggerFactory;
  *   <li>Its title must contain the text {@code Combine}</li>
  * </ol>
  *
- * The simplification process merges the inputs of the node into a single node.
- * As a corollary, nodes with only one input are already simplified by 
definition.
+ * The simplification process groups the inputs of the node by merging the 
ones that describe an equivalent plan and
+ * wraps each resulting group in an {@code Alternative} node annotated with 
the number of segments that fall into it.
+ * Inputs that can all be merged collapse into a single {@code Alternative}; 
inputs that cannot be merged (for example
+ * because some segments use an index while others do not) produce one {@code 
Alternative} per distinct plan.
+ *
+ * Every group is wrapped, even when there is a single one, so that the {@code 
segments} counts compose correctly when
+ * the plans of several servers are later merged (a server whose segments are 
all uniform still contributes an
+ * {@code Alternative} that folds into the matching group of a server with 
divergent segments). The redundant single
+ * {@code Alternative} wrappers are removed once all servers have been merged 
via {@link #removeRedundantAlternatives}.
  */
 public class ExplainNodeSimplifier {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ExplainNodeSimplifier.class);
   public static final String COMBINE
       = CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, 
ExplainPlanDataTableReducer.COMBINE);
+  /// Title of the synthetic node that wraps each distinct per-segment plan 
together with its segment count.
+  public static final String ALTERNATIVE = "Alternative";
+  /// Name of the additive attribute holding how many segments fall into an 
[#ALTERNATIVE] group.
+  public static final String SEGMENTS_ATTRIBUTE = "segments";
 
   private ExplainNodeSimplifier() {
   }
@@ -69,6 +80,36 @@ public class ExplainNodeSimplifier {
     return root.visit(planNodeMerger, null);
   }
 
+  /// Removes the [#ALTERNATIVE] wrappers that are no longer needed once all 
servers have been merged: a combine node
+  /// whose only child is an [#ALTERNATIVE] is unwrapped so the combine points 
directly at the single plan. This keeps
+  /// the common case (all segments share a plan) rendered exactly as it was 
before per-segment grouping was added,
+  /// while combine nodes with several alternatives keep their annotated 
groups.
+  public static PlanNode removeRedundantAlternatives(PlanNode node) {
+    List<PlanNode> inputs = node.getInputs();
+    List<PlanNode> newInputs = null;
+    for (int i = 0; i < inputs.size(); i++) {
+      PlanNode child = inputs.get(i);
+      PlanNode newChild = removeRedundantAlternatives(child);
+      if (newChild != child) {
+        if (newInputs == null) {
+          newInputs = new ArrayList<>(inputs);
+        }
+        newInputs.set(i, newChild);
+      }
+    }
+    PlanNode result = newInputs != null ? node.withInputs(newInputs) : node;
+    if (result instanceof ExplainedNode) {
+      ExplainedNode explained = (ExplainedNode) result;
+      if (explained.getTitle().contains(COMBINE) && 
explained.getInputs().size() == 1) {
+        PlanNode onlyChild = explained.getInputs().get(0);
+        if (onlyChild instanceof ExplainedNode && ((ExplainedNode) 
onlyChild).getTitle().equals(ALTERNATIVE)) {
+          return explained.withInputs(onlyChild.getInputs());
+        }
+      }
+    }
+    return result;
+  }
+
   private static class Visitor implements PlanNodeVisitor<PlanNode, Void> {
     private PlanNode defaultNode(PlanNode node) {
       List<PlanNode> inputs = node.getInputs();
@@ -143,24 +184,51 @@ public class ExplainNodeSimplifier {
 
     @Override
     public PlanNode visitExplained(ExplainedNode node, Void context) {
-      if (!node.getTitle().contains(COMBINE) || node.getInputs().size() <= 1) {
+      if (!node.getTitle().contains(COMBINE) || node.getInputs().isEmpty()) {
         return defaultNode(node);
       }
       List<PlanNode> simplifiedChildren = simplifyChildren(node.getInputs());
-      PlanNode child1 = simplifiedChildren.get(0);
-
-      for (int i = 1; i < simplifiedChildren.size(); i++) {
-        PlanNode child2 = simplifiedChildren.get(i);
-        PlanNode merged = PlanNodeMerger.mergePlans(child1, child2, false);
-        if (merged == null) {
-          LOGGER.info("Found unmergeable inputs on node of type {}: {} and 
{}", node, child1, child2);
-          assert false : "Unmergeable inputs found";
-          return defaultNode(node);
+
+      // Group the children of a combine node by merging the ones that 
describe an equivalent plan. Each resulting
+      // group represents a distinct execution plan, shared by one or more 
segments. Merging is greedy and partial:
+      // a child that cannot be merged into any existing group (for example 
because some segments use an index while
+      // others do a full scan) becomes its own group instead of forcing the 
whole node to stay un-simplified. This
+      // keeps the explain plan compact (identical segments collapse into a 
single group) while still surfacing the
+      // genuinely different per-segment plans, and it never fails when the 
inputs are not all mergeable.
+      List<PlanNode> groups = new ArrayList<>();
+      List<Integer> segmentCounts = new ArrayList<>();
+      for (PlanNode child : simplifiedChildren) {
+        boolean merged = false;
+        for (int i = 0; i < groups.size(); i++) {
+          PlanNode mergedGroup = PlanNodeMerger.mergePlans(groups.get(i), 
child, false);
+          if (mergedGroup != null) {
+            groups.set(i, mergedGroup);
+            segmentCounts.set(i, segmentCounts.get(i) + 1);
+            merged = true;
+            break;
+          }
         }
-        child1 = merged;
+        if (!merged) {
+          groups.add(child);
+          segmentCounts.add(1);
+        }
+      }
+
+      // Wrap each distinct plan in an "Alternative" node annotated with the 
number of segments that fall into it, so
+      // the reader can tell the plans apart and how many segments run each 
one. The "segments" attribute uses the
+      // default (additive) merge type, so when the same divergence appears on 
several servers the per-server counts
+      // are summed as the plans are merged across servers. Even a single 
group is wrapped, so that a server whose
+      // segments are all uniform still folds into the matching group of a 
server with divergent segments; the
+      // redundant single wrappers are stripped afterwards by 
removeRedundantAlternatives.
+      List<PlanNode> alternatives = new ArrayList<>(groups.size());
+      for (int i = 0; i < groups.size(); i++) {
+        PlanNode group = groups.get(i);
+        Map<String, Plan.ExplainNode.AttributeValue> attributes =
+            new ExplainAttributeBuilder().putLong(SEGMENTS_ATTRIBUTE, 
segmentCounts.get(i)).build();
+        alternatives.add(new ExplainedNode(node.getStageId(), 
group.getDataSchema(), null,
+            Collections.singletonList(group), ALTERNATIVE, attributes));
       }
-      return new ExplainedNode(node.getStageId(), node.getDataSchema(), 
node.getNodeHint(),
-          Collections.singletonList(child1), node.getTitle(), 
node.getAttributes());
+      return node.withInputs(alternatives);
     }
 
     @Override
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java
index c60027778a4..b9af3463ea0 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PlanNodeMerger.java
@@ -493,7 +493,7 @@ class PlanNodeMerger {
       if (exchangeNode.isSortOnReceiver() != otherNode.isSortOnReceiver()) {
         return null;
       }
-      if (Objects.equals(exchangeNode.getTableNames(), 
otherNode.getTableNames())) {
+      if (!Objects.equals(exchangeNode.getTableNames(), 
otherNode.getTableNames())) {
         return null;
       }
       List<PlanNode> children = mergeChildren(exchangeNode, context);
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifierTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifierTest.java
new file mode 100644
index 00000000000..6f7d8b63c25
--- /dev/null
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/explain/ExplainNodeSimplifierTest.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.explain;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.ExplainAttributeBuilder;
+import org.apache.pinot.query.planner.plannode.ExplainedNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotSame;
+
+
+/**
+ * Tests for {@link ExplainNodeSimplifier}, the broker-side logic that groups 
the per-segment children of a combine
+ * explain node.
+ *
+ * The simplifier must group mergeable segment plans while keeping genuinely 
different plans as separate groups, and it
+ * must never fail when the inputs are not all mergeable (these tests run with assertions enabled, so the previous
+ * all-or-nothing implementation surfaced this case as an {@code AssertionError}).
+ */
+public class ExplainNodeSimplifierTest {
+  private static final DataSchema SCHEMA = new DataSchema(new String[]{"col"},
+      new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+
+  /// A combine node whose title must contain {@code Combine} for the 
simplifier to act on it.
+  private static final String COMBINE_TITLE = "LeafStageCombineOperator";
+
+  private static ExplainedNode leaf(String title) {
+    return new ExplainedNode(0, SCHEMA, null, Collections.emptyList(), title, 
Map.of());
+  }
+
+  private static ExplainedNode leaf(String title, Map<String, 
Plan.ExplainNode.AttributeValue> attributes) {
+    return new ExplainedNode(0, SCHEMA, null, Collections.emptyList(), title, 
attributes);
+  }
+
+  private static ExplainedNode combine(PlanNode... children) {
+    return new ExplainedNode(0, SCHEMA, null, Arrays.asList(children), 
COMBINE_TITLE, Map.of());
+  }
+
+  /// Wraps a child the way {@code AcquireReleaseColumnsSegmentOperator} does 
when prefetch is enabled.
+  private static ExplainedNode acquireRelease(PlanNode child) {
+    return new ExplainedNode(0, SCHEMA, null, 
Collections.singletonList(child), "AcquireReleaseColumnsSegment",
+        Map.of());
+  }
+
+  private static Map<String, Plan.ExplainNode.AttributeValue> docsAttr(long 
totalDocs) {
+    // Mimics a DEFAULT (summed) numeric attribute such as totalDocs.
+    return new ExplainAttributeBuilder().putLong("totalDocs", 
totalDocs).build();
+  }
+
+  /// Finds the "Alternative" group under {@code combine} whose plan matches 
the given chain of titles (from the
+  /// Alternative's direct child downwards) and returns its {@code segments} 
count.
+  private static long segmentCountFor(ExplainedNode combine, String... 
innerTitlePath) {
+    for (PlanNode input : combine.getInputs()) {
+      ExplainedNode alternative = (ExplainedNode) input;
+      assertEquals(alternative.getTitle(), "Alternative", "Each group of a 
multi-plan combine must be an Alternative");
+      if (matchesPath(alternative.getInputs().get(0), innerTitlePath)) {
+        return alternative.getAttributes().get("segments").getLong();
+      }
+    }
+    throw new AssertionError("No Alternative group matching plan " + 
Arrays.toString(innerTitlePath));
+  }
+
+  private static boolean matchesPath(PlanNode node, String... titlePath) {
+    PlanNode current = node;
+    for (int i = 0; i < titlePath.length; i++) {
+      if (!(current instanceof ExplainedNode) || !((ExplainedNode) 
current).getTitle().equals(titlePath[i])) {
+        return false;
+      }
+      if (i < titlePath.length - 1) {
+        if (current.getInputs().isEmpty()) {
+          return false;
+        }
+        current = current.getInputs().get(0);
+      }
+    }
+    return true;
+  }
+
+  @Test
+  public void allIdenticalSegmentsCollapseIntoOneGroup() {
+    PlanNode simplified = ExplainNodeSimplifier.simplifyNode(
+        combine(leaf("Scan"), leaf("Scan"), leaf("Scan")));
+
+    ExplainedNode combine = (ExplainedNode) simplified;
+    assertEquals(combine.getTitle(), COMBINE_TITLE);
+    assertEquals(combine.getInputs().size(), 1, "Identical segments must 
collapse into a single group");
+    // Even a single group is wrapped in an Alternative carrying its segment 
count; the wrapper is stripped later by
+    // removeRedundantAlternatives.
+    assertEquals(segmentCountFor(combine, "Scan"), 3L);
+  }
+
+  @Test
+  public void identicalSegmentsWithSummedAttributesMerge() {
+    // totalDocs is a DEFAULT numeric attribute and must be summed when 
merging, not block the merge.
+    PlanNode simplified = ExplainNodeSimplifier.simplifyNode(
+        combine(leaf("Scan", docsAttr(10)), leaf("Scan", docsAttr(20)), 
leaf("Scan", docsAttr(30))));
+
+    ExplainedNode combine = (ExplainedNode) simplified;
+    assertEquals(combine.getInputs().size(), 1);
+    ExplainedNode alternative = (ExplainedNode) combine.getInputs().get(0);
+    assertEquals(alternative.getTitle(), "Alternative");
+    ExplainedNode merged = (ExplainedNode) alternative.getInputs().get(0);
+    assertEquals(merged.getAttributes().get("totalDocs").getLong(), 60L);
+  }
+
+  @Test
+  public void heterogeneousSegmentsAreGroupedWithCounts() {
+    // 3 segments use a "Scan" plan and 2 use a "SortedIndexScan" plan. The 
simplifier must produce 2 groups, not 5
+    // separate children (DATA-116) and not fail, and each group must report 
how many segments fall into it.
+    PlanNode simplified = ExplainNodeSimplifier.simplifyNode(
+        combine(leaf("Scan"), leaf("SortedIndexScan"), leaf("Scan"), 
leaf("SortedIndexScan"), leaf("Scan")));
+
+    ExplainedNode combine = (ExplainedNode) simplified;
+    assertEquals(combine.getInputs().size(), 2, "Distinct segment plans must 
be grouped, not listed one by one");
+    assertEquals(segmentCountFor(combine, "Scan"), 3L);
+    assertEquals(segmentCountFor(combine, "SortedIndexScan"), 2L);
+  }
+
+  @Test
+  public void unmergeableSegmentsDoNotThrowAndAreKeptWithCounts() {
+    // Every segment has a distinct plan: nothing merges. The old 
all-or-nothing implementation hit `assert false`
+    // here (an AssertionError under -ea, the deterministic explain failure). 
It must now keep every plan as its own
+    // group, each with a segment count of 1.
+    PlanNode simplified = ExplainNodeSimplifier.simplifyNode(
+        combine(leaf("A"), leaf("B"), leaf("C")));
+
+    ExplainedNode combine = (ExplainedNode) simplified;
+    assertEquals(combine.getInputs().size(), 3);
+    assertEquals(segmentCountFor(combine, "A"), 1L);
+    assertEquals(segmentCountFor(combine, "B"), 1L);
+    assertEquals(segmentCountFor(combine, "C"), 1L);
+  }
+
+  @Test
+  public void acquireReleaseWrappedSegmentsAreGroupedByInnerPlan() {
+    // Reproduces the prefetch-enabled shape from DATA-116: each segment is 
wrapped in an AcquireReleaseColumnsSegment
+    // node. Grouping must happen on the inner plan, collapsing identical 
segments and reporting their counts.
+    PlanNode simplified = ExplainNodeSimplifier.simplifyNode(
+        combine(
+            acquireRelease(leaf("Scan")),
+            acquireRelease(leaf("Scan")),
+            acquireRelease(leaf("Scan")),
+            acquireRelease(leaf("SortedIndexScan"))));
+
+    ExplainedNode combine = (ExplainedNode) simplified;
+    assertEquals(combine.getInputs().size(), 2,
+        "AcquireReleaseColumnsSegment wrappers must be grouped by their inner 
plan");
+    assertEquals(segmentCountFor(combine, "AcquireReleaseColumnsSegment", 
"Scan"), 3L);
+    assertEquals(segmentCountFor(combine, "AcquireReleaseColumnsSegment", 
"SortedIndexScan"), 1L);
+  }
+
+  @Test
+  public void segmentCountsSumWhenSimplifiedPlansAreMergedAcrossServers() {
+    // Two servers each see the same divergence (some Scan, some 
SortedIndexScan). After simplifying each server's
+    // combine node, merging the two (as the across-server merge does) must 
keep 2 groups and sum the segment counts.
+    ExplainedNode serverA = (ExplainedNode) ExplainNodeSimplifier.simplifyNode(
+        combine(leaf("Scan"), leaf("Scan"), leaf("Scan"), 
leaf("SortedIndexScan"), leaf("SortedIndexScan")));
+    ExplainedNode serverB = (ExplainedNode) ExplainNodeSimplifier.simplifyNode(
+        combine(leaf("Scan"), leaf("SortedIndexScan"), 
leaf("SortedIndexScan"), leaf("SortedIndexScan"),
+            leaf("SortedIndexScan")));
+
+    ExplainedNode merged = (ExplainedNode) PlanNodeMerger.mergePlans(serverA, 
serverB, false);
+
+    assertEquals(merged.getInputs().size(), 2);
+    assertEquals(segmentCountFor(merged, "Scan"), 4L, "3 + 1 Scan segments 
across the two servers");
+    assertEquals(segmentCountFor(merged, "SortedIndexScan"), 6L, "2 + 4 
SortedIndexScan segments across servers");
+  }
+
+  @Test
+  public void removeRedundantAlternativesUnwrapsSingleGroup() {
+    // The common case: all segments share a plan, so simplification leaves a 
single Alternative. After all servers
+    // are merged, the wrapper is redundant and is stripped so the output 
matches the pre-grouping format exactly.
+    PlanNode simplified = ExplainNodeSimplifier.simplifyNode(
+        combine(leaf("Scan"), leaf("Scan"), leaf("Scan")));
+    PlanNode cleaned = 
ExplainNodeSimplifier.removeRedundantAlternatives(simplified);
+
+    ExplainedNode combine = (ExplainedNode) cleaned;
+    assertEquals(combine.getInputs().size(), 1);
+    assertEquals(((ExplainedNode) combine.getInputs().get(0)).getTitle(), 
"Scan",
+        "A combine with a single group must point directly at the plan, with 
no Alternative wrapper");
+  }
+
+  @Test
+  public void removeRedundantAlternativesKeepsMultipleGroups() {
+    PlanNode simplified = ExplainNodeSimplifier.simplifyNode(
+        combine(leaf("Scan"), leaf("Scan"), leaf("SortedIndexScan")));
+    PlanNode cleaned = 
ExplainNodeSimplifier.removeRedundantAlternatives(simplified);
+
+    ExplainedNode combine = (ExplainedNode) cleaned;
+    assertEquals(combine.getInputs().size(), 2, "Several distinct plans keep 
their annotated Alternative groups");
+    assertEquals(segmentCountFor(combine, "Scan"), 2L);
+    assertEquals(segmentCountFor(combine, "SortedIndexScan"), 1L);
+  }
+
+  @Test
+  public void uniformServerFoldsIntoDivergentServerAcrossServers() {
+    // Server A has divergent segment plans, server B's segments are all 
uniform. Because both servers emit Alternative
+    // wrappers, B's segments fold into A's matching group when the plans are 
merged across servers, and the redundant
+    // wrappers are then removed. This is the case that was previously left as 
a bare sibling next to the Alternatives.
+    ExplainedNode serverA = (ExplainedNode) ExplainNodeSimplifier.simplifyNode(
+        combine(leaf("Scan"), leaf("Scan"), leaf("SortedIndexScan")));
+    ExplainedNode serverB = (ExplainedNode) ExplainNodeSimplifier.simplifyNode(
+        combine(leaf("Scan"), leaf("Scan"), leaf("Scan"), leaf("Scan")));
+
+    ExplainedNode merged = (ExplainedNode) PlanNodeMerger.mergePlans(serverA, 
serverB, false);
+    ExplainedNode cleaned = (ExplainedNode) 
ExplainNodeSimplifier.removeRedundantAlternatives(merged);
+
+    assertEquals(cleaned.getInputs().size(), 2);
+    assertEquals(segmentCountFor(cleaned, "Scan"), 6L, "2 (server A) + 4 
(server B) Scan segments fold together");
+    assertEquals(segmentCountFor(cleaned, "SortedIndexScan"), 1L);
+  }
+
+  @Test
+  public void singleSegmentFoldsAndUnwrapsForBackwardCompatibility() {
+    // A combine with a single segment is also wrapped (segments=1) so it 
composes across servers, then unwrapped.
+    PlanNode simplified = 
ExplainNodeSimplifier.simplifyNode(combine(leaf("Scan")));
+    ExplainedNode combine = (ExplainedNode) simplified;
+    assertEquals(segmentCountFor(combine, "Scan"), 1L);
+
+    ExplainedNode cleaned = (ExplainedNode) 
ExplainNodeSimplifier.removeRedundantAlternatives(simplified);
+    assertEquals(((ExplainedNode) cleaned.getInputs().get(0)).getTitle(), 
"Scan");
+  }
+
+  @Test
+  public void nonCombineNodeRecursesButDoesNotGroupChildren() {
+    // A node whose title does not contain "Combine" must keep all its 
children, only recursing into them.
+    ExplainedNode nonCombine = new ExplainedNode(0, SCHEMA, null,
+        Arrays.asList(leaf("Scan"), leaf("Scan")), "InstanceResponse", 
Map.of());
+    PlanNode simplified = ExplainNodeSimplifier.simplifyNode(nonCombine);
+    assertEquals(((ExplainedNode) simplified).getInputs().size(), 2,
+        "Non-combine nodes must not group their children");
+  }
+
+  @Test
+  public void nestedCombineUnderNonCombineIsSimplified() {
+    ExplainedNode nested = new ExplainedNode(0, SCHEMA, null,
+        Collections.singletonList(combine(leaf("Scan"), leaf("Scan"))), 
"InstanceResponse", Map.of());
+    PlanNode simplified = ExplainNodeSimplifier.simplifyNode(nested);
+
+    assertNotSame(simplified, nested);
+    ExplainedNode innerCombine = (ExplainedNode) ((ExplainedNode) 
simplified).getInputs().get(0);
+    assertEquals(innerCombine.getInputs().size(), 1, "Nested combine node must 
be simplified too");
+  }
+}
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/explain/PlanNodeMergerTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/explain/PlanNodeMergerTest.java
new file mode 100644
index 00000000000..f973a552120
--- /dev/null
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/explain/PlanNodeMergerTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.explain;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.ExplainedNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNode.NodeHint;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+
+/**
+ * Tests for {@link PlanNodeMerger}, the logic that decides whether two 
explain plan nodes describe the same plan and
+ * can be merged into one.
+ */
+public class PlanNodeMergerTest {
+  private static final DataSchema SCHEMA = new DataSchema(new String[]{"col"},
+      new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+
+  private static ExplainedNode leaf(String title) {
+    return new ExplainedNode(0, SCHEMA, null, Collections.emptyList(), title, 
Map.of());
+  }
+
+  private static ExchangeNode exchange(Set<String> tableNames) {
+    return new ExchangeNode(0, SCHEMA, Collections.singletonList(leaf("Scan")),
+        PinotRelExchangeType.getDefaultExchangeType(), 
RelDistribution.Type.BROADCAST_DISTRIBUTED, null, false, null,
+        false, false, tableNames, ExchangeStrategy.BROADCAST_EXCHANGE, 
"absHashCodeMurmur3");
+  }
+
+  @Test
+  public void exchangesWithSameTableNamesMerge() {
+    PlanNode merged = PlanNodeMerger.mergePlans(exchange(Set.of("t1")), 
exchange(Set.of("t1")), false);
+    assertNotNull(merged, "Exchange nodes describing the same tables must be 
mergeable");
+  }
+
+  @Test
+  public void exchangesWithDifferentTableNamesDoNotMerge() {
+    PlanNode merged = PlanNodeMerger.mergePlans(exchange(Set.of("t1")), 
exchange(Set.of("t2")), false);
+    assertNull(merged, "Exchange nodes over different tables must not be 
merged");
+  }
+
+  @Test
+  public void differentNodeTypesDoNotMerge() {
+    ProjectNode project = new ProjectNode(0, SCHEMA, NodeHint.EMPTY, 
Collections.singletonList(leaf("Scan")),
+        Collections.emptyList());
+    assertNull(PlanNodeMerger.mergePlans(project, leaf("Scan"), false));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to