This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 46883181101 [multistage] short-circuit in broker when all segments are 
pruned or table is empty (#18538)
46883181101 is described below

commit 46883181101bdb487ff968491326d575ef3cd663
Author: dang-stripe <[email protected]>
AuthorDate: Mon May 25 16:01:36 2026 -0700

    [multistage] short-circuit in broker when all segments are pruned or table 
is empty (#18538)
    
    * implement broker short-circuit when all segments are pruned
    
    * move empty-leaf rewrite from broker into PinotDispatchPlanner
    
    * address review feedback
    
    * enable useBrokerPruning in short-circuit integration tests
    
    * fix integration test by querying only offline table
    
    * enable TimeSegmentPruner in integration test table config
---
 .../MultiStageBrokerRequestHandler.java            |  81 ++++++++-----
 .../MultiStageBrokerRequestHandlerTest.java        |  85 +++++++++++++-
 .../tests/MultiStageEngineIntegrationTest.java     | 130 +++++++++++++++++++++
 .../planner/physical/DispatchablePlanContext.java  |  20 ++++
 .../planner/physical/DispatchablePlanFragment.java |  15 +++
 .../planner/physical/DispatchableSubPlan.java      |  19 ++-
 .../planner/physical/PinotDispatchPlanner.java     |  94 ++++++++++++++-
 .../apache/pinot/query/routing/WorkerManager.java  |  13 ++-
 .../DispatchablePlanContextPruningTest.java        |  85 ++++++++++++++
 .../planner/physical/DispatchableSubPlanTest.java  |  88 ++++++++++++++
 .../planner/physical/PinotDispatchPlannerTest.java | 113 ++++++++++++++++++
 11 files changed, 703 insertions(+), 40 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index a5ad9811d60..25c268a6bac 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -133,6 +133,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
 
   private final WorkerManager _workerManager;
   private final WorkerManager _multiClusterWorkerManager;
+  private final MailboxService _mailboxService;
   private final QueryDispatcher _queryDispatcher;
   private final boolean _explainAskingServerDefault;
   private final MultiStageQueryThrottler _queryThrottler;
@@ -184,10 +185,10 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     boolean dispatchKeepAliveWithoutCalls = config.getProperty(
         
CommonConstants.MultiStageQueryRunner.KEY_OF_DISPATCH_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS,
         
CommonConstants.MultiStageQueryRunner.DEFAULT_OF_DISPATCH_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS);
+    _mailboxService = new MailboxService(hostname, port, InstanceType.BROKER, 
config, tlsConfig);
     _queryDispatcher =
-        new QueryDispatcher(new MailboxService(hostname, port, 
InstanceType.BROKER, config, tlsConfig), failureDetector,
-            tlsConfig, isQueryCancellationEnabled(), cancelTimeout, 
dispatchKeepAliveTimeMs,
-            dispatchKeepAliveTimeoutMs, dispatchKeepAliveWithoutCalls);
+        new QueryDispatcher(_mailboxService, failureDetector, tlsConfig, 
isQueryCancellationEnabled(), cancelTimeout,
+            dispatchKeepAliveTimeMs, dispatchKeepAliveTimeoutMs, 
dispatchKeepAliveWithoutCalls);
     LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: 
{} with broker id: {}, timeout: {}ms, "
             + "query log max length: {}, query log max rate: {}, query 
cancellation enabled: {}", hostname, port,
         _brokerId, _brokerTimeoutMs, _queryLogger.getMaxQueryLengthToLog(), 
_queryLogger.getLogRateLimit(),
@@ -650,6 +651,9 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
       return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, 
errorMessage);
     }
 
+    // Short-circuit: if all leaf stages are empty (all segments pruned or 
table has no data),
+    // run only the broker reduce stage locally. No server dispatch is 
attempted.
+    boolean allLeafStagesEmpty = dispatchableSubPlan.isAllLeafStagesEmpty();
     int estimatedNumQueryThreads = 
dispatchableSubPlan.getEstimatedNumQueryThreads();
     try {
       // It's fine to block in this thread because we use a separate thread 
pool from the main Jersey server to process
@@ -668,35 +672,55 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
       return new BrokerResponseNative(QueryErrorCode.EXECUTION_TIMEOUT);
     }
 
-    int stageCount = dispatchableSubPlan.getQueryStageMap().size();
-    int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream()
-        .mapToInt(stage -> stage.getWorkerMetadataList().size())
-        .sum();
-
     try {
       String clientRequestId = 
extractClientRequestId(query.getSqlNodeAndOptions());
       onQueryStart(requestId, clientRequestId, query.getTextQuery());
       long executionStartTimeNs = System.nanoTime();
 
-      _stagesStartedMeter.mark(stageCount);
-      _opchainsStartedMeter.mark(opChainCount);
-
+      // All leaf stages empty means every segment was pruned (or the table 
has no data) and no
+      // server dispatch is needed. The paths differ because the normal path 
dispatches to servers,
+      // tracks stage/opchain meters, and must re-throw QueryException from 
server responses.
       QueryDispatcher.QueryResult queryResults;
-      try {
-        queryResults = _queryDispatcher.submitAndReduce(requestContext, 
dispatchableSubPlan, timer.getRemainingTimeMs(),
-            query.getOptions());
-      } catch (QueryException e) {
-        throw e;
-      } catch (Throwable t) {
-        QueryErrorCode queryErrorCode = QueryErrorCode.QUERY_EXECUTION;
-        String consolidatedMessage = 
ExceptionUtils.consolidateExceptionTraces(t);
-        LOGGER.error("Caught exception executing request {}: {}, {}", 
requestId, query, consolidatedMessage);
-        requestContext.setErrorCode(queryErrorCode);
-        return new BrokerResponseNative(queryErrorCode, consolidatedMessage);
-      } finally {
-        _stagesFinishedMeter.mark(stageCount);
-        _opchainsCompletedMeter.mark(opChainCount);
-        onQueryFinish(requestId);
+      if (allLeafStagesEmpty) {
+        try {
+          queryResults = QueryDispatcher.runReducer(dispatchableSubPlan, 
query.getOptions(), _mailboxService);
+        } catch (QueryException e) {
+          // Re-throw typed errors (auth, validation, etc.) so they propagate 
with their
+          // original error codes, matching the dispatch branch behavior.
+          throw e;
+        } catch (Throwable t) {
+          QueryErrorCode queryErrorCode = QueryErrorCode.QUERY_EXECUTION;
+          String consolidatedMessage = 
ExceptionUtils.consolidateExceptionTraces(t);
+          LOGGER.error("Caught exception reducing all-leaf-empty request {}: 
{}, {}", requestId, query,
+              consolidatedMessage);
+          requestContext.setErrorCode(queryErrorCode);
+          return new BrokerResponseNative(queryErrorCode, consolidatedMessage);
+        } finally {
+          onQueryFinish(requestId);
+        }
+      } else {
+        int stageCount = dispatchableSubPlan.getQueryStageMap().size();
+        int opChainCount = 
dispatchableSubPlan.getQueryStageMap().values().stream()
+            .mapToInt(stage -> stage.getWorkerMetadataList().size())
+            .sum();
+        _stagesStartedMeter.mark(stageCount);
+        _opchainsStartedMeter.mark(opChainCount);
+        try {
+          queryResults = _queryDispatcher.submitAndReduce(requestContext, 
dispatchableSubPlan,
+              timer.getRemainingTimeMs(), query.getOptions());
+        } catch (QueryException e) {
+          throw e;
+        } catch (Throwable t) {
+          QueryErrorCode queryErrorCode = QueryErrorCode.QUERY_EXECUTION;
+          String consolidatedMessage = 
ExceptionUtils.consolidateExceptionTraces(t);
+          LOGGER.error("Caught exception executing request {}: {}, {}", 
requestId, query, consolidatedMessage);
+          requestContext.setErrorCode(queryErrorCode);
+          return new BrokerResponseNative(queryErrorCode, consolidatedMessage);
+        } finally {
+          _stagesFinishedMeter.mark(stageCount);
+          _opchainsCompletedMeter.mark(opChainCount);
+          onQueryFinish(requestId);
+        }
       }
 
       BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
@@ -726,8 +750,9 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
       // MSE cannot finish if a single queried server did not respond, so we 
can use the same count for
       // both the queried and responded stats. Minus one prevents the broker 
to be included in the count
       // (it will always be included because of the root of the query plan)
-      brokerResponse.setNumServersQueried(servers.size() - 1);
-      brokerResponse.setNumServersResponded(servers.size() - 1);
+      int numServersQueried = allLeafStagesEmpty ? 0 : servers.size() - 1;
+      brokerResponse.setNumServersQueried(numServersQueried);
+      brokerResponse.setNumServersResponded(numServersQueried);
 
       // Attach unavailable segments (unless configured to ignore missing 
segments)
       int numUnavailableSegments = 0;
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
index b4b5a13b5fc..076c1f0592c 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
@@ -20,7 +20,9 @@ package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import javax.ws.rs.core.HttpHeaders;
@@ -32,27 +34,39 @@ import 
org.apache.pinot.common.failuredetector.FailureDetector;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
+import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
+import org.apache.pinot.query.planner.physical.PinotDispatchPlanner;
 import org.apache.pinot.query.routing.WorkerManager;
+import org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.spi.accounting.ThreadAccountantUtils;
 import org.apache.pinot.spi.auth.broker.RequesterIdentity;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import 
org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
+import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
 import org.apache.pinot.spi.utils.NetUtils;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 
 
-public class MultiStageBrokerRequestHandlerTest {
+public class MultiStageBrokerRequestHandlerTest extends 
QueryEnvironmentTestBase {
 
   @Test
   public void testOnQueryCompletionHookReceivesBrokerResponseForMse()
@@ -116,7 +130,6 @@ public class MultiStageBrokerRequestHandlerTest {
   @Test
   public void 
testApplyBrokerDefaultQueryOptionsInjectsStreamingGroupByFlushThreshold()
       throws Exception {
-    // When the broker config is set, the option is injected for queries that 
don't already specify it.
     MultiStageBrokerRequestHandler handler = 
newHandlerWithStreamingGroupByFlushThreshold("5000");
 
     Map<String, String> queryOptions = new HashMap<>();
@@ -128,7 +141,6 @@ public class MultiStageBrokerRequestHandlerTest {
   @Test
   public void testApplyBrokerDefaultQueryOptionsPerQueryOverrideWins()
       throws Exception {
-    // A per-query SET — including SET = 0 to disable — must take precedence 
over the broker default.
     MultiStageBrokerRequestHandler handler = 
newHandlerWithStreamingGroupByFlushThreshold("5000");
 
     Map<String, String> queryOptions = new HashMap<>();
@@ -147,7 +159,6 @@ public class MultiStageBrokerRequestHandlerTest {
   @Test
   public void testApplyBrokerDefaultQueryOptionsNoInjectionWhenConfigUnset()
       throws Exception {
-    // With the broker config unset (default -1), no option is injected.
     MultiStageBrokerRequestHandler handler = 
newHandlerWithStreamingGroupByFlushThreshold(null);
 
     Map<String, String> queryOptions = new HashMap<>();
@@ -180,13 +191,75 @@ public class MultiStageBrokerRequestHandlerTest {
         ThreadAccountantUtils.getNoOpAccountant(), null, 
mock(WorkerManager.class), mock(WorkerManager.class)) {
       @Override
       public void start() {
-        // Skip dispatcher.start() and Calcite warmupCompile — neither is 
needed for this test.
       }
 
       @Override
       public void shutDown() {
-        // Match start() — no dispatcher was started, so there is nothing to 
shut down.
       }
     };
   }
+
+  // Timeout guards against deadlock: if rewriteReduceStageForEmptyLeaves 
fails to inline
+  // all MailboxReceiveNodes, the reducer will block forever polling a mailbox 
with no sender.
+  @Test(timeOut = 2_000)
+  public void testAllLeafStagesEmptyReducerDoesNotWaitForChildStageMailbox() {
+    DispatchableSubPlan subPlan = _queryEnvironment.planQuery("SELECT COUNT(*) 
FROM a WHERE ts < 0 LIMIT 1");
+    Map<Integer, DispatchablePlanFragment> fragmentMap = new 
HashMap<>(subPlan.getQueryStageMap());
+    PinotDispatchPlanner.rewriteReduceStageForEmptyLeaves(fragmentMap);
+
+    QueryDispatcher.QueryResult queryResult = runReducer(fragmentMap, subPlan);
+    assertNull(queryResult.getProcessingException());
+    ResultTable resultTable = queryResult.getResultTable();
+    assertNotNull(resultTable);
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), 1);
+    assertEquals(rows.get(0)[0], 0L);
+  }
+
+  @Test(timeOut = 2_000)
+  public void testAllLeafStagesEmptyReducerRunsPushedDownAggregates() {
+    DispatchableSubPlan subPlan = _queryEnvironment.planQuery(
+        "SELECT COUNT(*), DISTINCTCOUNT(col1), DISTINCTCOUNT(col2), SUM(col3) 
FROM a WHERE ts < 0 LIMIT 1");
+    Map<Integer, DispatchablePlanFragment> fragmentMap = new 
HashMap<>(subPlan.getQueryStageMap());
+    PinotDispatchPlanner.rewriteReduceStageForEmptyLeaves(fragmentMap);
+
+    QueryDispatcher.QueryResult queryResult = runReducer(fragmentMap, subPlan);
+    assertNull(queryResult.getProcessingException());
+    ResultTable resultTable = queryResult.getResultTable();
+    assertNotNull(resultTable);
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), 1);
+    assertEquals(((Number) rows.get(0)[0]).longValue(), 0L);
+    assertEquals(((Number) rows.get(0)[1]).longValue(), 0L);
+    assertEquals(((Number) rows.get(0)[2]).longValue(), 0L);
+    assertNull(rows.get(0)[3]);
+  }
+
+  @Test(timeOut = 2_000)
+  public void testAllLeafStagesEmptyReducerWithJoin() {
+    DispatchableSubPlan subPlan =
+        _queryEnvironment.planQuery("SELECT COUNT(*) FROM a JOIN b ON a.col1 = 
b.col1 WHERE a.ts < 0");
+    Map<Integer, DispatchablePlanFragment> fragmentMap = new 
HashMap<>(subPlan.getQueryStageMap());
+    PinotDispatchPlanner.rewriteReduceStageForEmptyLeaves(fragmentMap);
+
+    QueryDispatcher.QueryResult queryResult = runReducer(fragmentMap, subPlan);
+    assertNull(queryResult.getProcessingException());
+    ResultTable resultTable = queryResult.getResultTable();
+    assertNotNull(resultTable);
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), 1);
+    assertEquals(rows.get(0)[0], 0L);
+  }
+
+  private static QueryDispatcher.QueryResult runReducer(Map<Integer, 
DispatchablePlanFragment> fragmentMap,
+      DispatchableSubPlan originalSubPlan) {
+    // Match production: drop orphan stages after rewrite inlines them into 
stage 0.
+    fragmentMap.keySet().retainAll(Set.of(0));
+    DispatchableSubPlan rewrittenPlan = new DispatchableSubPlan(
+        originalSubPlan.getQueryResultFields(), fragmentMap, 
originalSubPlan.getTableNames(),
+        originalSubPlan.getTableToUnavailableSegmentsMap(), 
originalSubPlan.getNumSegmentsPrunedByBroker(), true);
+    try (QueryThreadContext ignored = QueryThreadContext.openForMseTest()) {
+      return QueryDispatcher.runReducer(rewrittenPlan, Map.of(), 
Mockito.mock(MailboxService.class));
+    }
+  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 34bc3ef93bc..025820debb3 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -102,6 +102,16 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
     return SCHEMA_FILE_NAME;
   }
 
+  @Override
+  protected TableConfig createOfflineTableConfig() {
+    // Enable the TimeSegmentPruner so that useBrokerPruning can eliminate 
segments based on time
+    // filters. Without this, the broker has no pruner registered and cannot 
determine that
+    // DaysSinceEpoch < 0 matches zero segments — which is required for the 
short-circuit tests.
+    TableConfig tableConfig = super.createOfflineTableConfig();
+    tableConfig.setRoutingConfig(new RoutingConfig(null, List.of("time"), 
null, null));
+    return tableConfig;
+  }
+
   @BeforeClass
   public void setUp()
       throws Exception {
@@ -219,6 +229,126 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
     assertEquals(joinResult, expectedResult);
   }
 
+  @Test
+  public void testAllLeafStagesEmptyBrokerResponses()
+      throws Exception {
+    String table = "mytable";
+    assertAllLeafStagesEmptyRows("SELECT AirlineID, Carrier FROM " + table + " 
WHERE DaysSinceEpoch < 0",
+        List.of(), "LONG", "STRING");
+    assertAllLeafStagesEmptyRows("SELECT COUNT(*) FROM " + table + " WHERE 
DaysSinceEpoch < 0",
+        List.of(List.<Object>of(0)), "LONG");
+    assertAllLeafStagesEmptyRows("SELECT SUM(ActualElapsedTime) FROM " + table 
+ " WHERE DaysSinceEpoch < 0",
+        List.of(Collections.singletonList(null)), "LONG");
+    assertAllLeafStagesEmptyRows("SELECT COUNT(*) + 1 FROM " + table + " WHERE 
DaysSinceEpoch < 0",
+        List.of(List.<Object>of(1)), "LONG");
+    assertAllLeafStagesEmptyRows(
+        "SELECT COALESCE(SUM(ActualElapsedTime), 0) FROM " + table + " WHERE 
DaysSinceEpoch < 0",
+        List.of(List.<Object>of(0)), "LONG");
+    assertAllLeafStagesEmptyRows("SELECT COUNT(*) FROM " + table + " WHERE 
DaysSinceEpoch < 0 HAVING COUNT(*) > 0",
+        List.of(), "LONG");
+    assertAllLeafStagesEmptyRows(
+        "SELECT AirlineID, COUNT(*) FROM " + table + " WHERE DaysSinceEpoch < 
0 GROUP BY AirlineID",
+        List.of(), "LONG", "LONG");
+    // MIN/MAX return null on empty input (not +/-INFINITY)
+    assertAllLeafStagesEmptyRows("SELECT MIN(ActualElapsedTime) FROM " + table 
+ " WHERE DaysSinceEpoch < 0",
+        List.of(Collections.singletonList(null)), "INT");
+    assertAllLeafStagesEmptyRows("SELECT MAX(ActualElapsedTime) FROM " + table 
+ " WHERE DaysSinceEpoch < 0",
+        List.of(Collections.singletonList(null)), "INT");
+    // AVG returns null on empty input
+    assertAllLeafStagesEmptyRows("SELECT AVG(ActualElapsedTime) FROM " + table 
+ " WHERE DaysSinceEpoch < 0",
+        List.of(Collections.singletonList(null)), "DOUBLE");
+    // Multi-aggregate row alignment
+    assertAllLeafStagesEmptyRows(
+        "SELECT MIN(ActualElapsedTime), MAX(ActualElapsedTime), 
AVG(ActualElapsedTime), COUNT(*)"
+            + " FROM " + table + " WHERE DaysSinceEpoch < 0",
+        List.of(Arrays.asList(null, null, null, 0L)), "INT", "INT", "DOUBLE", 
"LONG");
+    // HAVING with IS NULL
+    assertAllLeafStagesEmptyRows(
+        "SELECT SUM(ActualElapsedTime) FROM " + table
+            + " WHERE DaysSinceEpoch < 0 HAVING SUM(ActualElapsedTime) IS 
NULL",
+        List.of(Collections.singletonList(null)), "LONG");
+    // Window function over empty input
+    assertAllLeafStagesEmptyRows(
+        "SELECT SUM(ActualElapsedTime) OVER () FROM " + table + " WHERE 
DaysSinceEpoch < 0",
+        List.of(), "LONG");
+  }
+
+  @Test
+  public void testReplicatedLeavesThatCanProduceRowsDoNotShortCircuit()
+      throws Exception {
+    assertDoesNotShortCircuitRows("SELECT d.dayId FROM " + DIM_TABLE
+            + " d LEFT JOIN (SELECT DayOfWeek FROM mytable WHERE 
DaysSinceEpoch < 0) f "
+            + "ON d.dayId = f.DayOfWeek ORDER BY d.dayId",
+        DIM_NUMBER_OF_RECORDS);
+    assertDoesNotShortCircuitRows("SELECT d.dayId FROM (SELECT DayOfWeek FROM 
mytable WHERE DaysSinceEpoch < 0) f "
+            + "RIGHT JOIN " + DIM_TABLE + " d ON f.DayOfWeek = d.dayId ORDER 
BY d.dayId",
+        DIM_NUMBER_OF_RECORDS);
+    assertDoesNotShortCircuitRows("SELECT d.dayId FROM (SELECT DayOfWeek FROM 
mytable WHERE DaysSinceEpoch < 0) f "
+            + "FULL JOIN " + DIM_TABLE + " d ON f.DayOfWeek = d.dayId ORDER BY 
d.dayId",
+        DIM_NUMBER_OF_RECORDS);
+    assertDoesNotShortCircuitSingleLong("SELECT COUNT(*) FROM "
+            + "(SELECT DayOfWeek FROM mytable WHERE DaysSinceEpoch < 0) f LEFT 
JOIN " + DIM_TABLE
+            + " d ON f.DayOfWeek = d.dayId",
+        0);
+    assertDoesNotShortCircuitSingleLong("SELECT COUNT(*) FROM (SELECT dayId 
FROM " + DIM_TABLE
+            + " UNION ALL SELECT DayOfWeek FROM mytable WHERE DaysSinceEpoch < 
0) u",
+        DIM_NUMBER_OF_RECORDS);
+  }
+
+  private JsonNode assertAllLeafStagesEmptyRows(String query, 
List<List<Object>> expectedRows, String... expectedTypes)
+      throws Exception {
+    JsonNode response = postQuery("SET useBrokerPruning = 'true'; " + query);
+    assertTrue(response.get("exceptions").isEmpty(), "Unexpected exceptions 
for query: " + query);
+    assertEquals(response.get("numServersQueried").asInt(), 0, "Query should 
not dispatch to servers: " + query);
+    assertEquals(response.get("numServersResponded").asInt(), 0, "Query should 
not dispatch to servers: " + query);
+
+    JsonNode resultTable = response.get("resultTable");
+    JsonNode rows = resultTable.get("rows");
+    assertEquals(rows.size(), expectedRows.size(), "Unexpected row count for 
query: " + query);
+    for (int rowId = 0; rowId < expectedRows.size(); rowId++) {
+      List<Object> expectedRow = expectedRows.get(rowId);
+      JsonNode actualRow = rows.get(rowId);
+      assertEquals(actualRow.size(), expectedRow.size(), "Unexpected column 
count for query: " + query);
+      for (int colId = 0; colId < expectedRow.size(); colId++) {
+        Object expectedValue = expectedRow.get(colId);
+        if (expectedValue == null) {
+          assertTrue(actualRow.get(colId).isNull(), "Expected null for query: 
" + query);
+        } else if (expectedValue instanceof Number) {
+          assertEquals(actualRow.get(colId).asLong(), ((Number) 
expectedValue).longValue(),
+              "Unexpected numeric value for query: " + query);
+        } else {
+          assertEquals(actualRow.get(colId).asText(), expectedValue, 
"Unexpected value for query: " + query);
+        }
+      }
+    }
+
+    JsonNode columnDataTypes = 
resultTable.get("dataSchema").get("columnDataTypes");
+    assertEquals(columnDataTypes.size(), expectedTypes.length, "Unexpected 
schema width for query: " + query);
+    for (int i = 0; i < expectedTypes.length; i++) {
+      assertEquals(columnDataTypes.get(i).asText(), expectedTypes[i], 
"Unexpected type for query: " + query);
+    }
+    return response;
+  }
+
+  private void assertDoesNotShortCircuitRows(String query, int 
expectedRowCount)
+      throws Exception {
+    JsonNode response = postQuery("SET useBrokerPruning = 'true'; " + query);
+    assertTrue(response.get("exceptions").isEmpty(), "Unexpected exceptions 
for query: " + query);
+    assertTrue(response.get("numServersQueried").asInt() > 0, "Query should 
dispatch to servers: " + query);
+    assertEquals(response.get("resultTable").get("rows").size(), 
expectedRowCount,
+        "Unexpected row count for query: " + query);
+  }
+
+  private void assertDoesNotShortCircuitSingleLong(String query, long 
expectedValue)
+      throws Exception {
+    JsonNode response = postQuery("SET useBrokerPruning = 'true'; " + query);
+    assertTrue(response.get("exceptions").isEmpty(), "Unexpected exceptions 
for query: " + query);
+    assertTrue(response.get("numServersQueried").asInt() > 0, "Query should 
dispatch to servers: " + query);
+    JsonNode rows = response.get("resultTable").get("rows");
+    assertEquals(rows.size(), 1, "Unexpected row count for query: " + query);
+    assertEquals(rows.get(0).get(0).asLong(), expectedValue, "Unexpected value 
for query: " + query);
+  }
+
   @Test
   @Override
   public void testGeneratedQueries()
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index 75e82dac475..967182ff7e4 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -61,6 +61,8 @@ public class DispatchablePlanContext {
   private final Map<Integer, DispatchablePlanMetadata> 
_dispatchablePlanMetadataMap = new HashMap<>();
   private final Map<Integer, PlanNode> _dispatchablePlanStageRootMap = new 
HashMap<>();
   private long _numSegmentsPrunedByBroker;
+  private int _leafStagesAssigned;
+  private int _leafStagesEmpty;
 
 
   public DispatchablePlanContext(WorkerManager workerManager, long requestId, 
PlannerContext plannerContext,
@@ -139,6 +141,24 @@ public class DispatchablePlanContext {
     _numSegmentsPrunedByBroker += count;
   }
 
+  public void recordLeafStageAssigned() {
+    _leafStagesAssigned++;
+  }
+
+  public void recordLeafStageEmpty() {
+    _leafStagesEmpty++;
+  }
+
+  /**
+   * Returns true when at least one non-replicated leaf stage was processed 
during worker
+   * assignment, and every such leaf stage ended up with zero workers (e.g. 
all segments
+   * pruned by broker, or the table has no segments). Replicated leaves (dim 
tables) are
+   * excluded because they return early in WorkerManager before reaching the 
tracking code.
+   */
+  public boolean isAllNonReplicatedLeafStagesEmpty() {
+    return _leafStagesAssigned > 0 && _leafStagesAssigned == _leafStagesEmpty;
+  }
+
   public Map<Integer, DispatchablePlanFragment> 
constructDispatchablePlanFragmentMap(PlanFragment subPlanRoot) {
     Map<Integer, DispatchablePlanFragment> dispatchablePlanFragmentMap = 
createDispatchablePlanFragmentMap(subPlanRoot);
     for (Map.Entry<Integer, DispatchablePlanMetadata> planMetadataEntry : 
_dispatchablePlanMetadataMap.entrySet()) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanFragment.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanFragment.java
index 3fa3973d3d9..4e3798f5a1f 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanFragment.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanFragment.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo;
 import org.apache.pinot.query.planner.PlanFragment;
+import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.routing.QueryServerInstance;
 import org.apache.pinot.query.routing.WorkerMetadata;
 
@@ -52,6 +53,20 @@ public class DispatchablePlanFragment {
     this(planFragment, new ArrayList<>(), new HashMap<>(), new HashMap<>());
   }
 
+  /**
+   * Returns a copy of {@code original} with its plan fragment root replaced 
by {@code newRoot}.
+   * Worker metadata and server-instance mapping are shallow-copied so the new 
fragment is
+   * independent of the original.
+   */
+  public static DispatchablePlanFragment copyWithRoot(DispatchablePlanFragment 
original, PlanNode newRoot) {
+    int fragmentId = original.getPlanFragment().getFragmentId();
+    return new DispatchablePlanFragment(
+        new PlanFragment(fragmentId, newRoot, List.of()),
+        new ArrayList<>(original.getWorkerMetadataList()),
+        new HashMap<>(original.getServerInstanceToWorkerIdMap()),
+        new HashMap<>(original.getCustomProperties()));
+  }
+
   public DispatchablePlanFragment(PlanFragment planFragment, 
List<WorkerMetadata> workerMetadataList,
       Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap, 
Map<String, String> customPropertyMap) {
     _planFragment = planFragment;
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
index 660e7c02a06..7e1ecb2fd80 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.planner.physical;
 
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -51,16 +52,25 @@ public class DispatchableSubPlan {
   private final Set<String> _tableNames;
   private final Map<String, Set<String>> _tableToUnavailableSegmentsMap;
   private final long _numSegmentsPrunedByBroker;
+  private final boolean _allLeafStagesEmpty;
 
   public DispatchableSubPlan(PairList<Integer, String> fields,
       Map<Integer, DispatchablePlanFragment> queryStageMap,
       Set<String> tableNames, Map<String, Set<String>> 
tableToUnavailableSegmentsMap,
       long numSegmentsPrunedByBroker) {
+    this(fields, queryStageMap, tableNames, tableToUnavailableSegmentsMap, 
numSegmentsPrunedByBroker, false);
+  }
+
+  public DispatchableSubPlan(PairList<Integer, String> fields,
+      Map<Integer, DispatchablePlanFragment> queryStageMap,
+      Set<String> tableNames, Map<String, Set<String>> 
tableToUnavailableSegmentsMap,
+      long numSegmentsPrunedByBroker, boolean allLeafStagesEmpty) {
     _queryResultFields = fields;
     _queryStageMap = queryStageMap;
     _tableNames = tableNames;
     _tableToUnavailableSegmentsMap = tableToUnavailableSegmentsMap;
     _numSegmentsPrunedByBroker = numSegmentsPrunedByBroker;
+    _allLeafStagesEmpty = allLeafStagesEmpty;
   }
 
   /**
@@ -68,7 +78,7 @@ public class DispatchableSubPlan {
    * @return stage plan map.
    */
   public Map<Integer, DispatchablePlanFragment> getQueryStageMap() {
-    return _queryStageMap;
+    return Collections.unmodifiableMap(_queryStageMap);
   }
 
   private static Comparator<DispatchablePlanFragment> byStageIdComparator() {
@@ -129,10 +139,17 @@ public class DispatchableSubPlan {
     return _numSegmentsPrunedByBroker;
   }
 
+  public boolean isAllLeafStagesEmpty() {
+    return _allLeafStagesEmpty;
+  }
+
   /**
    * Get the estimated total number of threads that will be spawned for this 
query (across all stages and servers).
    */
   public int getEstimatedNumQueryThreads() {
+    if (_allLeafStagesEmpty) {
+      return 0;
+    }
     int estimatedNumQueryThreads = 0;
     // Skip broker reduce root stage
     for (DispatchablePlanFragment stage : getQueryStagesWithoutRoot()) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
index a1414d48d64..5e21eea6988 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pinot.query.planner.physical;
 
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.config.provider.TableCache;
@@ -28,9 +31,14 @@ import org.apache.pinot.query.context.PlannerContext;
 import org.apache.pinot.query.planner.PlanFragment;
 import org.apache.pinot.query.planner.SubPlan;
 import 
org.apache.pinot.query.planner.physical.v2.PlanFragmentAndMailboxAssignment;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
 import org.apache.pinot.query.planner.validation.ArrayToMvValidationVisitor;
 import org.apache.pinot.query.routing.WorkerManager;
+import org.apache.pinot.query.routing.WorkerMetadata;
 
 
 public class PinotDispatchPlanner {
@@ -120,11 +128,93 @@ public class PinotDispatchPlanner {
 
   private static DispatchableSubPlan finalizeDispatchableSubPlan(PlanFragment 
subPlanRoot,
       DispatchablePlanContext dispatchablePlanContext) {
+    // TODO: Physical Optimizer path does not track empty leaf stages. To 
support short-circuit,
+    //  check if all leaf TableScanMetadata have empty workerIdToSegmentsMap.
+    boolean allLeafStagesEmpty = 
dispatchablePlanContext.isAllNonReplicatedLeafStagesEmpty();
+    if (allLeafStagesEmpty && 
hasNonEmptyReplicatedLeaf(dispatchablePlanContext.getDispatchablePlanMetadataMap()))
 {
+      allLeafStagesEmpty = false;
+    }
+    Map<Integer, DispatchablePlanFragment> fragmentMap =
+        
dispatchablePlanContext.constructDispatchablePlanFragmentMap(subPlanRoot);
+    if (allLeafStagesEmpty) {
+      rewriteReduceStageForEmptyLeaves(fragmentMap);
+      // Drop orphan stages so EXPLAIN PLAN and stats consumers don't see 
unreferenced entries.
+      fragmentMap.keySet().retainAll(Set.of(0));
+    }
     return new DispatchableSubPlan(dispatchablePlanContext.getResultFields(),
-        
dispatchablePlanContext.constructDispatchablePlanFragmentMap(subPlanRoot),
+        fragmentMap,
         dispatchablePlanContext.getTableNames(),
         
populateTableUnavailableSegments(dispatchablePlanContext.getDispatchablePlanMetadataMap()),
-        dispatchablePlanContext.getNumSegmentsPrunedByBroker());
+        dispatchablePlanContext.getNumSegmentsPrunedByBroker(),
+        allLeafStagesEmpty);
+  }
+
+  static boolean hasNonEmptyReplicatedLeaf(Map<Integer, 
DispatchablePlanMetadata> metadataMap) {
+    for (DispatchablePlanMetadata metadata : metadataMap.values()) {
+      Map<String, List<String>> replicatedSegments = 
metadata.getReplicatedSegments();
+      if (replicatedSegments == null) {
+        continue;
+      }
+      for (List<String> segments : replicatedSegments.values()) {
+        if (segments != null && !segments.isEmpty()) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  public static void rewriteReduceStageForEmptyLeaves(Map<Integer, 
DispatchablePlanFragment> fragmentMap) {
+    DispatchablePlanFragment reduceStage = fragmentMap.get(0);
+    List<WorkerMetadata> workerMetadataList = 
reduceStage.getWorkerMetadataList();
+    if (workerMetadataList.isEmpty()) {
+      return;
+    }
+    PlanNode inlinedRoot = 
inlineAllLeafStagesEmptyInputs(reduceStage.getPlanFragment().getFragmentRoot());
+    if (inlinedRoot != reduceStage.getPlanFragment().getFragmentRoot()) {
+      // Inlined nodes originally belonged to leaf stages (ID 1, 2, etc.) but 
now execute within
+      // the broker reduce stage (ID 0). PlanFragment's constructor asserts 
that the root's stageId
+      // matches the fragmentId, so we must update all node IDs before 
wrapping in a new fragment.
+      setStageIdRecursively(inlinedRoot, 0);
+      reduceStage = DispatchablePlanFragment.copyWithRoot(reduceStage, 
inlinedRoot);
+      fragmentMap.put(0, reduceStage);
+    }
+    WorkerMetadata workerMetadata = workerMetadataList.get(0);
+    reduceStage.setWorkerMetadataList(List.of(
+        new WorkerMetadata(workerMetadata.getWorkerId(), Map.of(), 
workerMetadata.getCustomProperties())));
+  }
+
+  private static PlanNode inlineAllLeafStagesEmptyInputs(PlanNode node) {
+    if (node instanceof TableScanNode) {
+      return new ValueNode(node.getStageId(), node.getDataSchema(), 
node.getNodeHint(), List.of(), List.of());
+    }
+    if (node instanceof MailboxReceiveNode) {
+      MailboxReceiveNode mailboxReceiveNode = (MailboxReceiveNode) node;
+      MailboxSendNode sender = mailboxReceiveNode.getSender();
+      List<PlanNode> senderInputs = sender.getInputs();
+      Preconditions.checkState(!senderInputs.isEmpty(),
+          "MailboxSendNode (stageId=%s) has no inputs", sender.getStageId());
+      return inlineAllLeafStagesEmptyInputs(senderInputs.get(0));
+    }
+    List<PlanNode> inputs = node.getInputs();
+    if (inputs.isEmpty()) {
+      return node;
+    }
+    boolean changed = false;
+    List<PlanNode> inlinedInputs = new ArrayList<>(inputs.size());
+    for (PlanNode input : inputs) {
+      PlanNode inlinedInput = inlineAllLeafStagesEmptyInputs(input);
+      inlinedInputs.add(inlinedInput);
+      changed |= inlinedInput != input;
+    }
+    return changed ? node.withInputs(inlinedInputs) : node;
+  }
+
+  private static void setStageIdRecursively(PlanNode node, int stageId) {
+    node.setStageId(stageId);
+    for (PlanNode input : node.getInputs()) {
+      setStageIdRecursively(input, stageId);
+    }
   }
 
   private static Map<String, Set<String>> populateTableUnavailableSegments(
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 6c10f2af2b5..c658062f1ed 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -494,7 +494,7 @@ public class WorkerManager {
       String partitionKey = 
tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY);
       if (partitionKey != null) {
         assignWorkersToPartitionedLeafFragment(metadata, context, 
partitionKey, tableOptions);
-        addLeafServersToContext(metadata, context);
+        updateContextForLeafStage(metadata, context);
         return;
       }
     }
@@ -504,15 +504,22 @@ public class WorkerManager {
     } else {
       assignWorkersToNonPartitionedLeafFragment(fragment, metadata, context);
     }
-    addLeafServersToContext(metadata, context);
+    updateContextForLeafStage(metadata, context);
   }
 
-  private void addLeafServersToContext(DispatchablePlanMetadata metadata, 
DispatchablePlanContext context) {
+  private void updateContextForLeafStage(DispatchablePlanMetadata metadata, 
DispatchablePlanContext context) {
     if (context.isUseLeafServerForIntermediateStage()) {
       Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = 
metadata.getWorkerIdToServerInstanceMap();
       assert workerIdToServerInstanceMap != null;
       
context.getLeafServerInstances().addAll(workerIdToServerInstanceMap.values());
     }
+    // Track empty leaf stage for short-circuit detection.
+    // The replicated path returns early above and is excluded: replicated 
leaves
+    // broadcast segments to all servers rather than populating 
workerIdToServerInstanceMap.
+    context.recordLeafStageAssigned();
+    if (metadata.getWorkerIdToServerInstanceMap().isEmpty()) {
+      context.recordLeafStageEmpty();
+    }
   }
 
   // --------------------------------------------------------------------------
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/DispatchablePlanContextPruningTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/DispatchablePlanContextPruningTest.java
new file mode 100644
index 00000000000..98ae6ecbaa9
--- /dev/null
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/DispatchablePlanContextPruningTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.calcite.runtime.PairList;
+import org.apache.pinot.query.QueryEnvironment;
+import org.apache.pinot.query.context.PlannerContext;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class DispatchablePlanContextPruningTest {
+
+  @Test
+  public void testNoLeafStagesRecorded() {
+    DispatchablePlanContext context = createMinimalContext();
+    assertFalse(context.isAllNonReplicatedLeafStagesEmpty(), "No leaf stages → 
false");
+  }
+
+  @Test
+  public void testOneLeafFullyPruned() {
+    DispatchablePlanContext context = createMinimalContext();
+    context.recordLeafStageAssigned();
+    context.recordLeafStageEmpty();
+    assertTrue(context.isAllNonReplicatedLeafStagesEmpty(), "1 assigned, 1 
pruned → true");
+  }
+
+  @Test
+  public void testTwoLeavesJoinBothPruned() {
+    DispatchablePlanContext context = createMinimalContext();
+    context.recordLeafStageAssigned();
+    context.recordLeafStageEmpty();
+    context.recordLeafStageAssigned();
+    context.recordLeafStageEmpty();
+    assertTrue(context.isAllNonReplicatedLeafStagesEmpty(), "2 assigned, 2 
pruned → true");
+  }
+
+  @Test
+  public void testTwoLeavesOnlyOnePruned() {
+    DispatchablePlanContext context = createMinimalContext();
+    context.recordLeafStageAssigned();
+    context.recordLeafStageEmpty();
+    context.recordLeafStageAssigned();
+    // second leaf NOT pruned
+    assertFalse(context.isAllNonReplicatedLeafStagesEmpty(), "2 assigned, 1 
pruned → false");
+  }
+
+  @Test
+  public void testOneLeafNotPruned() {
+    DispatchablePlanContext context = createMinimalContext();
+    context.recordLeafStageAssigned();
+    // not pruned
+    assertFalse(context.isAllNonReplicatedLeafStagesEmpty(), "1 assigned, 0 
pruned → false");
+  }
+
+  private static DispatchablePlanContext createMinimalContext() {
+    PlannerContext plannerContext = Mockito.mock(PlannerContext.class);
+    Mockito.when(plannerContext.getOptions()).thenReturn(Map.of());
+    QueryEnvironment.Config envConfig = 
Mockito.mock(QueryEnvironment.Config.class);
+    Mockito.when(plannerContext.getEnvConfig()).thenReturn(envConfig);
+    return new DispatchablePlanContext(null, 0, plannerContext,
+        PairList.of(0, "col"), Set.of("table"));
+  }
+}
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/DispatchableSubPlanTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/DispatchableSubPlanTest.java
new file mode 100644
index 00000000000..8f422c925fb
--- /dev/null
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/DispatchableSubPlanTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import org.apache.calcite.runtime.PairList;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.planner.PlanFragment;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+
+
+public class DispatchableSubPlanTest {
+
+  @Test
+  public void testIsAllLeafStagesEmptyTrue() {
+    DispatchableSubPlan plan = new DispatchableSubPlan(
+        PairList.of(0, "col1"), Collections.emptyMap(), Set.of("testTable"),
+        Collections.emptyMap(), 5, true);
+    assertTrue(plan.isAllLeafStagesEmpty());
+  }
+
+  @Test
+  public void testIsAllLeafStagesEmptyFalse() {
+    DispatchableSubPlan plan = new DispatchableSubPlan(
+        PairList.of(0, "col1"), Collections.emptyMap(), Set.of("testTable"),
+        Collections.emptyMap(), 3, false);
+    assertFalse(plan.isAllLeafStagesEmpty());
+  }
+
+  @Test
+  public void testIsAllLeafStagesEmptyDefaultFalse() {
+    // 5-arg constructor defaults to false
+    DispatchableSubPlan plan = new DispatchableSubPlan(
+        PairList.of(0, "col1"), Collections.emptyMap(), Set.of("testTable"),
+        Collections.emptyMap(), 5);
+    assertFalse(plan.isAllLeafStagesEmpty());
+  }
+
+  @Test
+  public void testIsAllLeafStagesEmptyNoTables() {
+    // Constant expression query (no tables) — flag can still be false
+    DispatchableSubPlan plan = new DispatchableSubPlan(
+        PairList.of(0, "col1"), Collections.emptyMap(), Collections.emptySet(),
+        Collections.emptyMap(), 0, false);
+    assertFalse(plan.isAllLeafStagesEmpty());
+  }
+
+  @Test
+  public void testCopyWithRootPreservesFragmentId() {
+    ValueNode oldRoot = new ValueNode(0, new DataSchema(new String[0], new 
ColumnDataType[0]),
+        PlanNode.NodeHint.EMPTY, List.of(), List.of());
+    PlanFragment fragment = new PlanFragment(0, oldRoot, List.of());
+    DispatchablePlanFragment original = new DispatchablePlanFragment(fragment);
+
+    ValueNode newRoot = new ValueNode(0, new DataSchema(new String[0], new 
ColumnDataType[0]),
+        PlanNode.NodeHint.EMPTY, List.of(), List.of());
+    DispatchablePlanFragment copy = 
DispatchablePlanFragment.copyWithRoot(original, newRoot);
+
+    org.testng.Assert.assertEquals(copy.getPlanFragment().getFragmentId(), 0);
+    assertSame(copy.getPlanFragment().getFragmentRoot(), newRoot);
+    assertSame(original.getPlanFragment().getFragmentRoot(), oldRoot);
+  }
+}
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/PinotDispatchPlannerTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/PinotDispatchPlannerTest.java
new file mode 100644
index 00000000000..a394d508b0e
--- /dev/null
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/PinotDispatchPlannerTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class PinotDispatchPlannerTest extends QueryEnvironmentTestBase {
+
+  @Test
+  public void testHasNonEmptyReplicatedLeafAllNull() {
+    DispatchablePlanMetadata metadata = new DispatchablePlanMetadata();
+    assertFalse(PinotDispatchPlanner.hasNonEmptyReplicatedLeaf(Map.of(0, 
metadata)));
+  }
+
+  @Test
+  public void testHasNonEmptyReplicatedLeafEmptyLists() {
+    DispatchablePlanMetadata metadata = new DispatchablePlanMetadata();
+    metadata.setReplicatedSegments(Map.of("t", List.of()));
+    assertFalse(PinotDispatchPlanner.hasNonEmptyReplicatedLeaf(Map.of(0, 
metadata)));
+  }
+
+  @Test
+  public void testHasNonEmptyReplicatedLeafNonEmpty() {
+    DispatchablePlanMetadata metadata = new DispatchablePlanMetadata();
+    metadata.setReplicatedSegments(Map.of("t", List.of("seg1")));
+    assertTrue(PinotDispatchPlanner.hasNonEmptyReplicatedLeaf(Map.of(0, 
metadata)));
+  }
+
+  @Test
+  public void testHasNonEmptyReplicatedLeafMixed() {
+    DispatchablePlanMetadata metadataNullSegments = new 
DispatchablePlanMetadata();
+    DispatchablePlanMetadata metadataNonEmpty = new DispatchablePlanMetadata();
+    metadataNonEmpty.setReplicatedSegments(Map.of("t", List.of("seg1")));
+    Map<Integer, DispatchablePlanMetadata> metadataMap = Map.of(0, 
metadataNullSegments, 1, metadataNonEmpty);
+    assertTrue(PinotDispatchPlanner.hasNonEmptyReplicatedLeaf(metadataMap));
+  }
+
+  @Test
+  public void testHasNonEmptyReplicatedLeafEmptyMap() {
+    assertFalse(PinotDispatchPlanner.hasNonEmptyReplicatedLeaf(Map.of()));
+  }
+
+  @Test
+  public void testRewriteReduceStageSetAllNodeStageIdsToZero() {
+    DispatchableSubPlan subPlan = _queryEnvironment.planQuery("SELECT COUNT(*) 
FROM a WHERE ts < 0");
+    Map<Integer, DispatchablePlanFragment> fragmentMap = new 
HashMap<>(subPlan.getQueryStageMap());
+    PinotDispatchPlanner.rewriteReduceStageForEmptyLeaves(fragmentMap);
+    PlanNode root = fragmentMap.get(0).getPlanFragment().getFragmentRoot();
+    assertAllStageIdsAreZero(root);
+  }
+
+  @Test
+  public void testRewriteReduceStageWithEmptyWorkerMetadataListIsNoOp() {
+    DispatchableSubPlan subPlan = _queryEnvironment.planQuery("SELECT COUNT(*) 
FROM a WHERE ts < 0");
+    Map<Integer, DispatchablePlanFragment> fragmentMap = new 
HashMap<>(subPlan.getQueryStageMap());
+    fragmentMap.get(0).getWorkerMetadataList().clear();
+    PinotDispatchPlanner.rewriteReduceStageForEmptyLeaves(fragmentMap);
+    assertTrue(fragmentMap.get(0).getWorkerMetadataList().isEmpty());
+  }
+
+  @Test
+  public void testRewriteReduceStageStripsMailboxInfos() {
+    DispatchableSubPlan subPlan = _queryEnvironment.planQuery("SELECT COUNT(*) 
FROM a WHERE ts < 0 LIMIT 1");
+    Map<Integer, DispatchablePlanFragment> fragmentMap = new 
HashMap<>(subPlan.getQueryStageMap());
+    
assertTrue(fragmentMap.get(0).getWorkerMetadataList().get(0).getMailboxInfosMap().containsKey(1));
+
+    PinotDispatchPlanner.rewriteReduceStageForEmptyLeaves(fragmentMap);
+    
assertTrue(fragmentMap.get(0).getWorkerMetadataList().get(0).getMailboxInfosMap().isEmpty());
+  }
+
+  @Test
+  public void testRewriteReduceStageWithJoinInlinesAllBranches() {
+    DispatchableSubPlan subPlan =
+        _queryEnvironment.planQuery("SELECT COUNT(*) FROM a JOIN b ON a.col1 = 
b.col1 WHERE a.ts < 0");
+    Map<Integer, DispatchablePlanFragment> fragmentMap = new 
HashMap<>(subPlan.getQueryStageMap());
+    PinotDispatchPlanner.rewriteReduceStageForEmptyLeaves(fragmentMap);
+    PlanNode root = fragmentMap.get(0).getPlanFragment().getFragmentRoot();
+    assertAllStageIdsAreZero(root);
+  }
+
+  private static void assertAllStageIdsAreZero(PlanNode node) {
+    assertEquals(node.getStageId(), 0);
+    for (PlanNode input : node.getInputs()) {
+      assertAllStageIdsAreZero(input);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to