This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 46883181101 [multistage] short-circuit in broker when all segments are
pruned or table is empty (#18538)
46883181101 is described below
commit 46883181101bdb487ff968491326d575ef3cd663
Author: dang-stripe <[email protected]>
AuthorDate: Mon May 25 16:01:36 2026 -0700
[multistage] short-circuit in broker when all segments are pruned or table
is empty (#18538)
* implement broker short-circuit when all segments are pruned
* move empty-leaf rewrite from broker into PinotDispatchPlanner
* address review feedback
* enable useBrokerPruning in short-circuit integration tests
* fix integration test by querying only offline table
* enable TimeSegmentPruner in integration test table config
---
.../MultiStageBrokerRequestHandler.java | 81 ++++++++-----
.../MultiStageBrokerRequestHandlerTest.java | 85 +++++++++++++-
.../tests/MultiStageEngineIntegrationTest.java | 130 +++++++++++++++++++++
.../planner/physical/DispatchablePlanContext.java | 20 ++++
.../planner/physical/DispatchablePlanFragment.java | 15 +++
.../planner/physical/DispatchableSubPlan.java | 19 ++-
.../planner/physical/PinotDispatchPlanner.java | 94 ++++++++++++++-
.../apache/pinot/query/routing/WorkerManager.java | 13 ++-
.../DispatchablePlanContextPruningTest.java | 85 ++++++++++++++
.../planner/physical/DispatchableSubPlanTest.java | 88 ++++++++++++++
.../planner/physical/PinotDispatchPlannerTest.java | 113 ++++++++++++++++++
11 files changed, 703 insertions(+), 40 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index a5ad9811d60..25c268a6bac 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -133,6 +133,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
private final WorkerManager _workerManager;
private final WorkerManager _multiClusterWorkerManager;
+ private final MailboxService _mailboxService;
private final QueryDispatcher _queryDispatcher;
private final boolean _explainAskingServerDefault;
private final MultiStageQueryThrottler _queryThrottler;
@@ -184,10 +185,10 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
boolean dispatchKeepAliveWithoutCalls = config.getProperty(
CommonConstants.MultiStageQueryRunner.KEY_OF_DISPATCH_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS,
CommonConstants.MultiStageQueryRunner.DEFAULT_OF_DISPATCH_CHANNEL_KEEP_ALIVE_WITHOUT_CALLS);
+ _mailboxService = new MailboxService(hostname, port, InstanceType.BROKER,
config, tlsConfig);
_queryDispatcher =
- new QueryDispatcher(new MailboxService(hostname, port,
InstanceType.BROKER, config, tlsConfig), failureDetector,
- tlsConfig, isQueryCancellationEnabled(), cancelTimeout,
dispatchKeepAliveTimeMs,
- dispatchKeepAliveTimeoutMs, dispatchKeepAliveWithoutCalls);
+ new QueryDispatcher(_mailboxService, failureDetector, tlsConfig,
isQueryCancellationEnabled(), cancelTimeout,
+ dispatchKeepAliveTimeMs, dispatchKeepAliveTimeoutMs,
dispatchKeepAliveWithoutCalls);
LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port:
{} with broker id: {}, timeout: {}ms, "
+ "query log max length: {}, query log max rate: {}, query
cancellation enabled: {}", hostname, port,
_brokerId, _brokerTimeoutMs, _queryLogger.getMaxQueryLengthToLog(),
_queryLogger.getLogRateLimit(),
@@ -650,6 +651,9 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS,
errorMessage);
}
+ // Short-circuit: if all leaf stages are empty (all segments pruned or
table has no data),
+ // run only the broker reduce stage locally. No server dispatch is
attempted.
+ boolean allLeafStagesEmpty = dispatchableSubPlan.isAllLeafStagesEmpty();
int estimatedNumQueryThreads =
dispatchableSubPlan.getEstimatedNumQueryThreads();
try {
// It's fine to block in this thread because we use a separate thread
pool from the main Jersey server to process
@@ -668,35 +672,55 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
return new BrokerResponseNative(QueryErrorCode.EXECUTION_TIMEOUT);
}
- int stageCount = dispatchableSubPlan.getQueryStageMap().size();
- int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream()
- .mapToInt(stage -> stage.getWorkerMetadataList().size())
- .sum();
-
try {
String clientRequestId =
extractClientRequestId(query.getSqlNodeAndOptions());
onQueryStart(requestId, clientRequestId, query.getTextQuery());
long executionStartTimeNs = System.nanoTime();
- _stagesStartedMeter.mark(stageCount);
- _opchainsStartedMeter.mark(opChainCount);
-
+ // All leaf stages empty means every segment was pruned (or the table
has no data) and no
+ // server dispatch is needed. The paths differ because the normal path
dispatches to servers,
+ // tracks stage/opchain meters, and must re-throw QueryException from
server responses.
QueryDispatcher.QueryResult queryResults;
- try {
- queryResults = _queryDispatcher.submitAndReduce(requestContext,
dispatchableSubPlan, timer.getRemainingTimeMs(),
- query.getOptions());
- } catch (QueryException e) {
- throw e;
- } catch (Throwable t) {
- QueryErrorCode queryErrorCode = QueryErrorCode.QUERY_EXECUTION;
- String consolidatedMessage =
ExceptionUtils.consolidateExceptionTraces(t);
- LOGGER.error("Caught exception executing request {}: {}, {}",
requestId, query, consolidatedMessage);
- requestContext.setErrorCode(queryErrorCode);
- return new BrokerResponseNative(queryErrorCode, consolidatedMessage);
- } finally {
- _stagesFinishedMeter.mark(stageCount);
- _opchainsCompletedMeter.mark(opChainCount);
- onQueryFinish(requestId);
+ if (allLeafStagesEmpty) {
+ try {
+ queryResults = QueryDispatcher.runReducer(dispatchableSubPlan,
query.getOptions(), _mailboxService);
+ } catch (QueryException e) {
+ // Re-throw typed errors (auth, validation, etc.) so they propagate
with their
+ // original error codes, matching the dispatch branch behavior.
+ throw e;
+ } catch (Throwable t) {
+ QueryErrorCode queryErrorCode = QueryErrorCode.QUERY_EXECUTION;
+ String consolidatedMessage =
ExceptionUtils.consolidateExceptionTraces(t);
+ LOGGER.error("Caught exception reducing all-leaf-empty request {}:
{}, {}", requestId, query,
+ consolidatedMessage);
+ requestContext.setErrorCode(queryErrorCode);
+ return new BrokerResponseNative(queryErrorCode, consolidatedMessage);
+ } finally {
+ onQueryFinish(requestId);
+ }
+ } else {
+ int stageCount = dispatchableSubPlan.getQueryStageMap().size();
+ int opChainCount =
dispatchableSubPlan.getQueryStageMap().values().stream()
+ .mapToInt(stage -> stage.getWorkerMetadataList().size())
+ .sum();
+ _stagesStartedMeter.mark(stageCount);
+ _opchainsStartedMeter.mark(opChainCount);
+ try {
+ queryResults = _queryDispatcher.submitAndReduce(requestContext,
dispatchableSubPlan,
+ timer.getRemainingTimeMs(), query.getOptions());
+ } catch (QueryException e) {
+ throw e;
+ } catch (Throwable t) {
+ QueryErrorCode queryErrorCode = QueryErrorCode.QUERY_EXECUTION;
+ String consolidatedMessage =
ExceptionUtils.consolidateExceptionTraces(t);
+ LOGGER.error("Caught exception executing request {}: {}, {}",
requestId, query, consolidatedMessage);
+ requestContext.setErrorCode(queryErrorCode);
+ return new BrokerResponseNative(queryErrorCode, consolidatedMessage);
+ } finally {
+ _stagesFinishedMeter.mark(stageCount);
+ _opchainsCompletedMeter.mark(opChainCount);
+ onQueryFinish(requestId);
+ }
}
BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
@@ -726,8 +750,9 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
// MSE cannot finish if a single queried server did not respond, so we
can use the same count for
// both the queried and responded stats. Minus one prevents the broker
from being included in the count
// (it will always be included because of the root of the query plan)
- brokerResponse.setNumServersQueried(servers.size() - 1);
- brokerResponse.setNumServersResponded(servers.size() - 1);
+ int numServersQueried = allLeafStagesEmpty ? 0 : servers.size() - 1;
+ brokerResponse.setNumServersQueried(numServersQueried);
+ brokerResponse.setNumServersResponded(numServersQueried);
// Attach unavailable segments (unless configured to ignore missing
segments)
int numUnavailableSegments = 0;
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
index b4b5a13b5fc..076c1f0592c 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
@@ -20,7 +20,9 @@ package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
@@ -32,27 +34,39 @@ import
org.apache.pinot.common.failuredetector.FailureDetector;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
+import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
+import org.apache.pinot.query.planner.physical.PinotDispatchPlanner;
import org.apache.pinot.query.routing.WorkerManager;
+import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.accounting.ThreadAccountantUtils;
import org.apache.pinot.spi.auth.broker.RequesterIdentity;
import org.apache.pinot.spi.env.PinotConfiguration;
import
org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
+import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
-public class MultiStageBrokerRequestHandlerTest {
+public class MultiStageBrokerRequestHandlerTest extends
QueryEnvironmentTestBase {
@Test
public void testOnQueryCompletionHookReceivesBrokerResponseForMse()
@@ -116,7 +130,6 @@ public class MultiStageBrokerRequestHandlerTest {
@Test
public void
testApplyBrokerDefaultQueryOptionsInjectsStreamingGroupByFlushThreshold()
throws Exception {
- // When the broker config is set, the option is injected for queries that
don't already specify it.
MultiStageBrokerRequestHandler handler =
newHandlerWithStreamingGroupByFlushThreshold("5000");
Map<String, String> queryOptions = new HashMap<>();
@@ -128,7 +141,6 @@ public class MultiStageBrokerRequestHandlerTest {
@Test
public void testApplyBrokerDefaultQueryOptionsPerQueryOverrideWins()
throws Exception {
- // A per-query SET — including SET = 0 to disable — must take precedence
over the broker default.
MultiStageBrokerRequestHandler handler =
newHandlerWithStreamingGroupByFlushThreshold("5000");
Map<String, String> queryOptions = new HashMap<>();
@@ -147,7 +159,6 @@ public class MultiStageBrokerRequestHandlerTest {
@Test
public void testApplyBrokerDefaultQueryOptionsNoInjectionWhenConfigUnset()
throws Exception {
- // With the broker config unset (default -1), no option is injected.
MultiStageBrokerRequestHandler handler =
newHandlerWithStreamingGroupByFlushThreshold(null);
Map<String, String> queryOptions = new HashMap<>();
@@ -180,13 +191,75 @@ public class MultiStageBrokerRequestHandlerTest {
ThreadAccountantUtils.getNoOpAccountant(), null,
mock(WorkerManager.class), mock(WorkerManager.class)) {
@Override
public void start() {
- // Skip dispatcher.start() and Calcite warmupCompile — neither is
needed for this test.
}
@Override
public void shutDown() {
- // Match start() — no dispatcher was started, so there is nothing to
shut down.
}
};
}
+
+ // Timeout guards against deadlock: if rewriteReduceStageForEmptyLeaves
fails to inline
+ // all MailboxReceiveNodes, the reducer will block forever polling a mailbox
with no sender.
+ @Test(timeOut = 2_000)
+ public void testAllLeafStagesEmptyReducerDoesNotWaitForChildStageMailbox() {
+ DispatchableSubPlan subPlan = _queryEnvironment.planQuery("SELECT COUNT(*)
FROM a WHERE ts < 0 LIMIT 1");
+ Map<Integer, DispatchablePlanFragment> fragmentMap = new
HashMap<>(subPlan.getQueryStageMap());
+ PinotDispatchPlanner.rewriteReduceStageForEmptyLeaves(fragmentMap);
+
+ QueryDispatcher.QueryResult queryResult = runReducer(fragmentMap, subPlan);
+ assertNull(queryResult.getProcessingException());
+ ResultTable resultTable = queryResult.getResultTable();
+ assertNotNull(resultTable);
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), 1);
+ assertEquals(rows.get(0)[0], 0L);
+ }
+
+ @Test(timeOut = 2_000)
+ public void testAllLeafStagesEmptyReducerRunsPushedDownAggregates() {
+ DispatchableSubPlan subPlan = _queryEnvironment.planQuery(
+ "SELECT COUNT(*), DISTINCTCOUNT(col1), DISTINCTCOUNT(col2), SUM(col3)
FROM a WHERE ts < 0 LIMIT 1");
+ Map<Integer, DispatchablePlanFragment> fragmentMap = new
HashMap<>(subPlan.getQueryStageMap());
+ PinotDispatchPlanner.rewriteReduceStageForEmptyLeaves(fragmentMap);
+
+ QueryDispatcher.QueryResult queryResult = runReducer(fragmentMap, subPlan);
+ assertNull(queryResult.getProcessingException());
+ ResultTable resultTable = queryResult.getResultTable();
+ assertNotNull(resultTable);
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), 1);
+ assertEquals(((Number) rows.get(0)[0]).longValue(), 0L);
+ assertEquals(((Number) rows.get(0)[1]).longValue(), 0L);
+ assertEquals(((Number) rows.get(0)[2]).longValue(), 0L);
+ assertNull(rows.get(0)[3]);
+ }
+
+ @Test(timeOut = 2_000)
+ public void testAllLeafStagesEmptyReducerWithJoin() {
+ DispatchableSubPlan subPlan =
+ _queryEnvironment.planQuery("SELECT COUNT(*) FROM a JOIN b ON a.col1 =
b.col1 WHERE a.ts < 0");
+ Map<Integer, DispatchablePlanFragment> fragmentMap = new
HashMap<>(subPlan.getQueryStageMap());
+ PinotDispatchPlanner.rewriteReduceStageForEmptyLeaves(fragmentMap);
+
+ QueryDispatcher.QueryResult queryResult = runReducer(fragmentMap, subPlan);
+ assertNull(queryResult.getProcessingException());
+ ResultTable resultTable = queryResult.getResultTable();
+ assertNotNull(resultTable);
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), 1);
+ assertEquals(rows.get(0)[0], 0L);
+ }
+
+ private static QueryDispatcher.QueryResult runReducer(Map<Integer,
DispatchablePlanFragment> fragmentMap,
+ DispatchableSubPlan originalSubPlan) {
+ // Match production: drop orphan stages after rewrite inlines them into
stage 0.
+ fragmentMap.keySet().retainAll(Set.of(0));
+ DispatchableSubPlan rewrittenPlan = new DispatchableSubPlan(
+ originalSubPlan.getQueryResultFields(), fragmentMap,
originalSubPlan.getTableNames(),
+ originalSubPlan.getTableToUnavailableSegmentsMap(),
originalSubPlan.getNumSegmentsPrunedByBroker(), true);
+ try (QueryThreadContext ignored = QueryThreadContext.openForMseTest()) {
+ return QueryDispatcher.runReducer(rewrittenPlan, Map.of(),
Mockito.mock(MailboxService.class));
+ }
+ }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 34bc3ef93bc..025820debb3 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -102,6 +102,16 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
return SCHEMA_FILE_NAME;
}
+ @Override
+ protected TableConfig createOfflineTableConfig() {
+ // Enable the TimeSegmentPruner so that useBrokerPruning can eliminate
segments based on time
+ // filters. Without this, the broker has no pruner registered and cannot
determine that
+ // DaysSinceEpoch < 0 matches zero segments — which is required for the
short-circuit tests.
+ TableConfig tableConfig = super.createOfflineTableConfig();
+ tableConfig.setRoutingConfig(new RoutingConfig(null, List.of("time"),
null, null));
+ return tableConfig;
+ }
+
@BeforeClass
public void setUp()
throws Exception {
@@ -219,6 +229,126 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTestS
assertEquals(joinResult, expectedResult);
}
+ @Test
+ public void testAllLeafStagesEmptyBrokerResponses()
+ throws Exception {
+ String table = "mytable";
+ assertAllLeafStagesEmptyRows("SELECT AirlineID, Carrier FROM " + table + "
WHERE DaysSinceEpoch < 0",
+ List.of(), "LONG", "STRING");
+ assertAllLeafStagesEmptyRows("SELECT COUNT(*) FROM " + table + " WHERE
DaysSinceEpoch < 0",
+ List.of(List.<Object>of(0)), "LONG");
+ assertAllLeafStagesEmptyRows("SELECT SUM(ActualElapsedTime) FROM " + table
+ " WHERE DaysSinceEpoch < 0",
+ List.of(Collections.singletonList(null)), "LONG");
+ assertAllLeafStagesEmptyRows("SELECT COUNT(*) + 1 FROM " + table + " WHERE
DaysSinceEpoch < 0",
+ List.of(List.<Object>of(1)), "LONG");
+ assertAllLeafStagesEmptyRows(
+ "SELECT COALESCE(SUM(ActualElapsedTime), 0) FROM " + table + " WHERE
DaysSinceEpoch < 0",
+ List.of(List.<Object>of(0)), "LONG");
+ assertAllLeafStagesEmptyRows("SELECT COUNT(*) FROM " + table + " WHERE
DaysSinceEpoch < 0 HAVING COUNT(*) > 0",
+ List.of(), "LONG");
+ assertAllLeafStagesEmptyRows(
+ "SELECT AirlineID, COUNT(*) FROM " + table + " WHERE DaysSinceEpoch <
0 GROUP BY AirlineID",
+ List.of(), "LONG", "LONG");
+ // MIN/MAX return null on empty input (not +/-INFINITY)
+ assertAllLeafStagesEmptyRows("SELECT MIN(ActualElapsedTime) FROM " + table
+ " WHERE DaysSinceEpoch < 0",
+ List.of(Collections.singletonList(null)), "INT");
+ assertAllLeafStagesEmptyRows("SELECT MAX(ActualElapsedTime) FROM " + table
+ " WHERE DaysSinceEpoch < 0",
+ List.of(Collections.singletonList(null)), "INT");
+ // AVG returns null on empty input
+ assertAllLeafStagesEmptyRows("SELECT AVG(ActualElapsedTime) FROM " + table
+ " WHERE DaysSinceEpoch < 0",
+ List.of(Collections.singletonList(null)), "DOUBLE");
+ // Multi-aggregate row alignment
+ assertAllLeafStagesEmptyRows(
+ "SELECT MIN(ActualElapsedTime), MAX(ActualElapsedTime),
AVG(ActualElapsedTime), COUNT(*)"
+ + " FROM " + table + " WHERE DaysSinceEpoch < 0",
+ List.of(Arrays.asList(null, null, null, 0L)), "INT", "INT", "DOUBLE",
"LONG");
+ // HAVING with IS NULL
+ assertAllLeafStagesEmptyRows(
+ "SELECT SUM(ActualElapsedTime) FROM " + table
+ + " WHERE DaysSinceEpoch < 0 HAVING SUM(ActualElapsedTime) IS
NULL",
+ List.of(Collections.singletonList(null)), "LONG");
+ // Window function over empty input
+ assertAllLeafStagesEmptyRows(
+ "SELECT SUM(ActualElapsedTime) OVER () FROM " + table + " WHERE
DaysSinceEpoch < 0",
+ List.of(), "LONG");
+ }
+
+ @Test
+ public void testReplicatedLeavesThatCanProduceRowsDoNotShortCircuit()
+ throws Exception {
+ assertDoesNotShortCircuitRows("SELECT d.dayId FROM " + DIM_TABLE
+ + " d LEFT JOIN (SELECT DayOfWeek FROM mytable WHERE
DaysSinceEpoch < 0) f "
+ + "ON d.dayId = f.DayOfWeek ORDER BY d.dayId",
+ DIM_NUMBER_OF_RECORDS);
+ assertDoesNotShortCircuitRows("SELECT d.dayId FROM (SELECT DayOfWeek FROM
mytable WHERE DaysSinceEpoch < 0) f "
+ + "RIGHT JOIN " + DIM_TABLE + " d ON f.DayOfWeek = d.dayId ORDER
BY d.dayId",
+ DIM_NUMBER_OF_RECORDS);
+ assertDoesNotShortCircuitRows("SELECT d.dayId FROM (SELECT DayOfWeek FROM
mytable WHERE DaysSinceEpoch < 0) f "
+ + "FULL JOIN " + DIM_TABLE + " d ON f.DayOfWeek = d.dayId ORDER BY
d.dayId",
+ DIM_NUMBER_OF_RECORDS);
+ assertDoesNotShortCircuitSingleLong("SELECT COUNT(*) FROM "
+ + "(SELECT DayOfWeek FROM mytable WHERE DaysSinceEpoch < 0) f LEFT
JOIN " + DIM_TABLE
+ + " d ON f.DayOfWeek = d.dayId",
+ 0);
+ assertDoesNotShortCircuitSingleLong("SELECT COUNT(*) FROM (SELECT dayId
FROM " + DIM_TABLE
+ + " UNION ALL SELECT DayOfWeek FROM mytable WHERE DaysSinceEpoch <
0) u",
+ DIM_NUMBER_OF_RECORDS);
+ }
+
+ private JsonNode assertAllLeafStagesEmptyRows(String query,
List<List<Object>> expectedRows, String... expectedTypes)
+ throws Exception {
+ JsonNode response = postQuery("SET useBrokerPruning = 'true'; " + query);
+ assertTrue(response.get("exceptions").isEmpty(), "Unexpected exceptions
for query: " + query);
+ assertEquals(response.get("numServersQueried").asInt(), 0, "Query should
not dispatch to servers: " + query);
+ assertEquals(response.get("numServersResponded").asInt(), 0, "Query should
not dispatch to servers: " + query);
+
+ JsonNode resultTable = response.get("resultTable");
+ JsonNode rows = resultTable.get("rows");
+ assertEquals(rows.size(), expectedRows.size(), "Unexpected row count for
query: " + query);
+ for (int rowId = 0; rowId < expectedRows.size(); rowId++) {
+ List<Object> expectedRow = expectedRows.get(rowId);
+ JsonNode actualRow = rows.get(rowId);
+ assertEquals(actualRow.size(), expectedRow.size(), "Unexpected column
count for query: " + query);
+ for (int colId = 0; colId < expectedRow.size(); colId++) {
+ Object expectedValue = expectedRow.get(colId);
+ if (expectedValue == null) {
+ assertTrue(actualRow.get(colId).isNull(), "Expected null for query:
" + query);
+ } else if (expectedValue instanceof Number) {
+ assertEquals(actualRow.get(colId).asLong(), ((Number)
expectedValue).longValue(),
+ "Unexpected numeric value for query: " + query);
+ } else {
+ assertEquals(actualRow.get(colId).asText(), expectedValue,
"Unexpected value for query: " + query);
+ }
+ }
+ }
+
+ JsonNode columnDataTypes =
resultTable.get("dataSchema").get("columnDataTypes");
+ assertEquals(columnDataTypes.size(), expectedTypes.length, "Unexpected
schema width for query: " + query);
+ for (int i = 0; i < expectedTypes.length; i++) {
+ assertEquals(columnDataTypes.get(i).asText(), expectedTypes[i],
"Unexpected type for query: " + query);
+ }
+ return response;
+ }
+
+ private void assertDoesNotShortCircuitRows(String query, int
expectedRowCount)
+ throws Exception {
+ JsonNode response = postQuery("SET useBrokerPruning = 'true'; " + query);
+ assertTrue(response.get("exceptions").isEmpty(), "Unexpected exceptions
for query: " + query);
+ assertTrue(response.get("numServersQueried").asInt() > 0, "Query should
dispatch to servers: " + query);
+ assertEquals(response.get("resultTable").get("rows").size(),
expectedRowCount,
+ "Unexpected row count for query: " + query);
+ }
+
+ private void assertDoesNotShortCircuitSingleLong(String query, long
expectedValue)
+ throws Exception {
+ JsonNode response = postQuery("SET useBrokerPruning = 'true'; " + query);
+ assertTrue(response.get("exceptions").isEmpty(), "Unexpected exceptions
for query: " + query);
+ assertTrue(response.get("numServersQueried").asInt() > 0, "Query should
dispatch to servers: " + query);
+ JsonNode rows = response.get("resultTable").get("rows");
+ assertEquals(rows.size(), 1, "Unexpected row count for query: " + query);
+ assertEquals(rows.get(0).get(0).asLong(), expectedValue, "Unexpected value
for query: " + query);
+ }
+
@Test
@Override
public void testGeneratedQueries()
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index 75e82dac475..967182ff7e4 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -61,6 +61,8 @@ public class DispatchablePlanContext {
private final Map<Integer, DispatchablePlanMetadata>
_dispatchablePlanMetadataMap = new HashMap<>();
private final Map<Integer, PlanNode> _dispatchablePlanStageRootMap = new
HashMap<>();
private long _numSegmentsPrunedByBroker;
+ private int _leafStagesAssigned;
+ private int _leafStagesEmpty;
public DispatchablePlanContext(WorkerManager workerManager, long requestId,
PlannerContext plannerContext,
@@ -139,6 +141,24 @@ public class DispatchablePlanContext {
_numSegmentsPrunedByBroker += count;
}
+ public void recordLeafStageAssigned() {
+ _leafStagesAssigned++;
+ }
+
+ public void recordLeafStageEmpty() {
+ _leafStagesEmpty++;
+ }
+
+ /**
+ * Returns true when at least one non-replicated leaf stage was processed
during worker
+ * assignment, and every such leaf stage ended up with zero workers (e.g.
all segments
+ * pruned by broker, or the table has no segments). Replicated leaves (dim
tables) are
+ * excluded because they return early in WorkerManager before reaching the
tracking code.
+ */
+ public boolean isAllNonReplicatedLeafStagesEmpty() {
+ return _leafStagesAssigned > 0 && _leafStagesAssigned == _leafStagesEmpty;
+ }
+
public Map<Integer, DispatchablePlanFragment>
constructDispatchablePlanFragmentMap(PlanFragment subPlanRoot) {
Map<Integer, DispatchablePlanFragment> dispatchablePlanFragmentMap =
createDispatchablePlanFragmentMap(subPlanRoot);
for (Map.Entry<Integer, DispatchablePlanMetadata> planMetadataEntry :
_dispatchablePlanMetadataMap.entrySet()) {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanFragment.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanFragment.java
index 3fa3973d3d9..4e3798f5a1f 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanFragment.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanFragment.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.query.planner.PlanFragment;
+import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.WorkerMetadata;
@@ -52,6 +53,20 @@ public class DispatchablePlanFragment {
this(planFragment, new ArrayList<>(), new HashMap<>(), new HashMap<>());
}
+ /**
+ * Returns a copy of {@code original} with its plan fragment root replaced
by {@code newRoot}.
+ * Worker metadata and server-instance mapping are shallow-copied so the new
fragment is
+ * independent of the original.
+ */
+ public static DispatchablePlanFragment copyWithRoot(DispatchablePlanFragment
original, PlanNode newRoot) {
+ int fragmentId = original.getPlanFragment().getFragmentId();
+ return new DispatchablePlanFragment(
+ new PlanFragment(fragmentId, newRoot, List.of()),
+ new ArrayList<>(original.getWorkerMetadataList()),
+ new HashMap<>(original.getServerInstanceToWorkerIdMap()),
+ new HashMap<>(original.getCustomProperties()));
+ }
+
public DispatchablePlanFragment(PlanFragment planFragment,
List<WorkerMetadata> workerMetadataList,
Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap,
Map<String, String> customPropertyMap) {
_planFragment = planFragment;
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
index 660e7c02a06..7e1ecb2fd80 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.planner.physical;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -51,16 +52,25 @@ public class DispatchableSubPlan {
private final Set<String> _tableNames;
private final Map<String, Set<String>> _tableToUnavailableSegmentsMap;
private final long _numSegmentsPrunedByBroker;
+ private final boolean _allLeafStagesEmpty;
public DispatchableSubPlan(PairList<Integer, String> fields,
Map<Integer, DispatchablePlanFragment> queryStageMap,
Set<String> tableNames, Map<String, Set<String>>
tableToUnavailableSegmentsMap,
long numSegmentsPrunedByBroker) {
+ this(fields, queryStageMap, tableNames, tableToUnavailableSegmentsMap,
numSegmentsPrunedByBroker, false);
+ }
+
+ public DispatchableSubPlan(PairList<Integer, String> fields,
+ Map<Integer, DispatchablePlanFragment> queryStageMap,
+ Set<String> tableNames, Map<String, Set<String>>
tableToUnavailableSegmentsMap,
+ long numSegmentsPrunedByBroker, boolean allLeafStagesEmpty) {
_queryResultFields = fields;
_queryStageMap = queryStageMap;
_tableNames = tableNames;
_tableToUnavailableSegmentsMap = tableToUnavailableSegmentsMap;
_numSegmentsPrunedByBroker = numSegmentsPrunedByBroker;
+ _allLeafStagesEmpty = allLeafStagesEmpty;
}
/**
@@ -68,7 +78,7 @@ public class DispatchableSubPlan {
* @return stage plan map.
*/
public Map<Integer, DispatchablePlanFragment> getQueryStageMap() {
- return _queryStageMap;
+ return Collections.unmodifiableMap(_queryStageMap);
}
private static Comparator<DispatchablePlanFragment> byStageIdComparator() {
@@ -129,10 +139,17 @@ public class DispatchableSubPlan {
return _numSegmentsPrunedByBroker;
}
+ public boolean isAllLeafStagesEmpty() {
+ return _allLeafStagesEmpty;
+ }
+
/**
* Get the estimated total number of threads that will be spawned for this
query (across all stages and servers).
*/
public int getEstimatedNumQueryThreads() {
+ if (_allLeafStagesEmpty) {
+ return 0;
+ }
int estimatedNumQueryThreads = 0;
// Skip broker reduce root stage
for (DispatchablePlanFragment stage : getQueryStagesWithoutRoot()) {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
index a1414d48d64..5e21eea6988 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
@@ -18,8 +18,11 @@
*/
package org.apache.pinot.query.planner.physical;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.config.provider.TableCache;
@@ -28,9 +31,14 @@ import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.SubPlan;
import
org.apache.pinot.query.planner.physical.v2.PlanFragmentAndMailboxAssignment;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
import org.apache.pinot.query.planner.validation.ArrayToMvValidationVisitor;
import org.apache.pinot.query.routing.WorkerManager;
+import org.apache.pinot.query.routing.WorkerMetadata;
public class PinotDispatchPlanner {
@@ -120,11 +128,93 @@ public class PinotDispatchPlanner {
private static DispatchableSubPlan finalizeDispatchableSubPlan(PlanFragment
subPlanRoot,
DispatchablePlanContext dispatchablePlanContext) {
+ // TODO: Physical Optimizer path does not track empty leaf stages. To support short-circuit,
+ // check if all leaf TableScanMetadata have empty workerIdToSegmentsMap.
+ boolean allLeafStagesEmpty =
dispatchablePlanContext.isAllNonReplicatedLeafStagesEmpty();
+ if (allLeafStagesEmpty &&
hasNonEmptyReplicatedLeaf(dispatchablePlanContext.getDispatchablePlanMetadataMap()))
{
+ allLeafStagesEmpty = false;
+ }
+ Map<Integer, DispatchablePlanFragment> fragmentMap =
+
dispatchablePlanContext.constructDispatchablePlanFragmentMap(subPlanRoot);
+ if (allLeafStagesEmpty) {
+ rewriteReduceStageForEmptyLeaves(fragmentMap);
+ // Drop orphan stages so EXPLAIN PLAN and stats consumers don't see unreferenced entries.
+ fragmentMap.keySet().retainAll(Set.of(0));
+ }
return new DispatchableSubPlan(dispatchablePlanContext.getResultFields(),
-
dispatchablePlanContext.constructDispatchablePlanFragmentMap(subPlanRoot),
+ fragmentMap,
dispatchablePlanContext.getTableNames(),
populateTableUnavailableSegments(dispatchablePlanContext.getDispatchablePlanMetadataMap()),
- dispatchablePlanContext.getNumSegmentsPrunedByBroker());
+ dispatchablePlanContext.getNumSegmentsPrunedByBroker(),
+ allLeafStagesEmpty);
+ }
+
+ static boolean hasNonEmptyReplicatedLeaf(Map<Integer,
DispatchablePlanMetadata> metadataMap) {
+ for (DispatchablePlanMetadata metadata : metadataMap.values()) {
+ Map<String, List<String>> replicatedSegments =
metadata.getReplicatedSegments();
+ if (replicatedSegments == null) {
+ continue;
+ }
+ for (List<String> segments : replicatedSegments.values()) {
+ if (segments != null && !segments.isEmpty()) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public static void rewriteReduceStageForEmptyLeaves(Map<Integer,
DispatchablePlanFragment> fragmentMap) {
+ DispatchablePlanFragment reduceStage = fragmentMap.get(0);
+ List<WorkerMetadata> workerMetadataList =
reduceStage.getWorkerMetadataList();
+ if (workerMetadataList.isEmpty()) {
+ return;
+ }
+ PlanNode inlinedRoot =
inlineAllLeafStagesEmptyInputs(reduceStage.getPlanFragment().getFragmentRoot());
+ if (inlinedRoot != reduceStage.getPlanFragment().getFragmentRoot()) {
+ // Inlined nodes originally belonged to leaf stages (ID 1, 2, etc.) but now execute within
+ // the broker reduce stage (ID 0). PlanFragment's constructor asserts that the root's stageId
+ // matches the fragmentId, so we must update all node IDs before wrapping in a new fragment.
+ setStageIdRecursively(inlinedRoot, 0);
+ reduceStage = DispatchablePlanFragment.copyWithRoot(reduceStage,
inlinedRoot);
+ fragmentMap.put(0, reduceStage);
+ }
+ WorkerMetadata workerMetadata = workerMetadataList.get(0);
+ reduceStage.setWorkerMetadataList(List.of(
+ new WorkerMetadata(workerMetadata.getWorkerId(), Map.of(),
workerMetadata.getCustomProperties())));
+ }
+
+ private static PlanNode inlineAllLeafStagesEmptyInputs(PlanNode node) {
+ if (node instanceof TableScanNode) {
+ return new ValueNode(node.getStageId(), node.getDataSchema(),
node.getNodeHint(), List.of(), List.of());
+ }
+ if (node instanceof MailboxReceiveNode) {
+ MailboxReceiveNode mailboxReceiveNode = (MailboxReceiveNode) node;
+ MailboxSendNode sender = mailboxReceiveNode.getSender();
+ List<PlanNode> senderInputs = sender.getInputs();
+ Preconditions.checkState(!senderInputs.isEmpty(),
+ "MailboxSendNode (stageId=%s) has no inputs", sender.getStageId());
+ return inlineAllLeafStagesEmptyInputs(senderInputs.get(0));
+ }
+ List<PlanNode> inputs = node.getInputs();
+ if (inputs.isEmpty()) {
+ return node;
+ }
+ boolean changed = false;
+ List<PlanNode> inlinedInputs = new ArrayList<>(inputs.size());
+ for (PlanNode input : inputs) {
+ PlanNode inlinedInput = inlineAllLeafStagesEmptyInputs(input);
+ inlinedInputs.add(inlinedInput);
+ changed |= inlinedInput != input;
+ }
+ return changed ? node.withInputs(inlinedInputs) : node;
+ }
+
+ private static void setStageIdRecursively(PlanNode node, int stageId) {
+ node.setStageId(stageId);
+ for (PlanNode input : node.getInputs()) {
+ setStageIdRecursively(input, stageId);
+ }
}
private static Map<String, Set<String>> populateTableUnavailableSegments(
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 6c10f2af2b5..c658062f1ed 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -494,7 +494,7 @@ public class WorkerManager {
String partitionKey =
tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY);
if (partitionKey != null) {
assignWorkersToPartitionedLeafFragment(metadata, context,
partitionKey, tableOptions);
- addLeafServersToContext(metadata, context);
+ updateContextForLeafStage(metadata, context);
return;
}
}
@@ -504,15 +504,22 @@ public class WorkerManager {
} else {
assignWorkersToNonPartitionedLeafFragment(fragment, metadata, context);
}
- addLeafServersToContext(metadata, context);
+ updateContextForLeafStage(metadata, context);
}
- private void addLeafServersToContext(DispatchablePlanMetadata metadata,
DispatchablePlanContext context) {
+ private void updateContextForLeafStage(DispatchablePlanMetadata metadata,
DispatchablePlanContext context) {
if (context.isUseLeafServerForIntermediateStage()) {
Map<Integer, QueryServerInstance> workerIdToServerInstanceMap =
metadata.getWorkerIdToServerInstanceMap();
assert workerIdToServerInstanceMap != null;
context.getLeafServerInstances().addAll(workerIdToServerInstanceMap.values());
}
+ // Track empty leaf stage for short-circuit detection.
+ // The replicated path returns early above and is excluded: replicated leaves
+ // broadcast segments to all servers rather than populating workerIdToServerInstanceMap.
+ context.recordLeafStageAssigned();
+ if (metadata.getWorkerIdToServerInstanceMap().isEmpty()) {
+ context.recordLeafStageEmpty();
+ }
}
// --------------------------------------------------------------------------
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/DispatchablePlanContextPruningTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/DispatchablePlanContextPruningTest.java
new file mode 100644
index 00000000000..98ae6ecbaa9
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/DispatchablePlanContextPruningTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.calcite.runtime.PairList;
+import org.apache.pinot.query.QueryEnvironment;
+import org.apache.pinot.query.context.PlannerContext;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class DispatchablePlanContextPruningTest {
+
+ @Test
+ public void testNoLeafStagesRecorded() {
+ DispatchablePlanContext context = createMinimalContext();
+ assertFalse(context.isAllNonReplicatedLeafStagesEmpty(), "No leaf stages →
false");
+ }
+
+ @Test
+ public void testOneLeafFullyPruned() {
+ DispatchablePlanContext context = createMinimalContext();
+ context.recordLeafStageAssigned();
+ context.recordLeafStageEmpty();
+ assertTrue(context.isAllNonReplicatedLeafStagesEmpty(), "1 assigned, 1
pruned → true");
+ }
+
+ @Test
+ public void testTwoLeavesJoinBothPruned() {
+ DispatchablePlanContext context = createMinimalContext();
+ context.recordLeafStageAssigned();
+ context.recordLeafStageEmpty();
+ context.recordLeafStageAssigned();
+ context.recordLeafStageEmpty();
+ assertTrue(context.isAllNonReplicatedLeafStagesEmpty(), "2 assigned, 2
pruned → true");
+ }
+
+ @Test
+ public void testTwoLeavesOnlyOnePruned() {
+ DispatchablePlanContext context = createMinimalContext();
+ context.recordLeafStageAssigned();
+ context.recordLeafStageEmpty();
+ context.recordLeafStageAssigned();
+ // second leaf NOT pruned
+ assertFalse(context.isAllNonReplicatedLeafStagesEmpty(), "2 assigned, 1
pruned → false");
+ }
+
+ @Test
+ public void testOneLeafNotPruned() {
+ DispatchablePlanContext context = createMinimalContext();
+ context.recordLeafStageAssigned();
+ // not pruned
+ assertFalse(context.isAllNonReplicatedLeafStagesEmpty(), "1 assigned, 0
pruned → false");
+ }
+
+ private static DispatchablePlanContext createMinimalContext() {
+ PlannerContext plannerContext = Mockito.mock(PlannerContext.class);
+ Mockito.when(plannerContext.getOptions()).thenReturn(Map.of());
+ QueryEnvironment.Config envConfig =
Mockito.mock(QueryEnvironment.Config.class);
+ Mockito.when(plannerContext.getEnvConfig()).thenReturn(envConfig);
+ return new DispatchablePlanContext(null, 0, plannerContext,
+ PairList.of(0, "col"), Set.of("table"));
+ }
+}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/DispatchableSubPlanTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/DispatchableSubPlanTest.java
new file mode 100644
index 00000000000..8f422c925fb
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/DispatchableSubPlanTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import org.apache.calcite.runtime.PairList;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.planner.PlanFragment;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+
+
+public class DispatchableSubPlanTest {
+
+ @Test
+ public void testIsAllLeafStagesEmptyTrue() {
+ DispatchableSubPlan plan = new DispatchableSubPlan(
+ PairList.of(0, "col1"), Collections.emptyMap(), Set.of("testTable"),
+ Collections.emptyMap(), 5, true);
+ assertTrue(plan.isAllLeafStagesEmpty());
+ }
+
+ @Test
+ public void testIsAllLeafStagesEmptyFalse() {
+ DispatchableSubPlan plan = new DispatchableSubPlan(
+ PairList.of(0, "col1"), Collections.emptyMap(), Set.of("testTable"),
+ Collections.emptyMap(), 3, false);
+ assertFalse(plan.isAllLeafStagesEmpty());
+ }
+
+ @Test
+ public void testIsAllLeafStagesEmptyDefaultFalse() {
+ // 5-arg constructor defaults to false
+ DispatchableSubPlan plan = new DispatchableSubPlan(
+ PairList.of(0, "col1"), Collections.emptyMap(), Set.of("testTable"),
+ Collections.emptyMap(), 5);
+ assertFalse(plan.isAllLeafStagesEmpty());
+ }
+
+ @Test
+ public void testIsAllLeafStagesEmptyNoTables() {
+ // Constant expression query (no tables) — flag can still be false
+ DispatchableSubPlan plan = new DispatchableSubPlan(
+ PairList.of(0, "col1"), Collections.emptyMap(), Collections.emptySet(),
+ Collections.emptyMap(), 0, false);
+ assertFalse(plan.isAllLeafStagesEmpty());
+ }
+
+ @Test
+ public void testCopyWithRootPreservesFragmentId() {
+ ValueNode oldRoot = new ValueNode(0, new DataSchema(new String[0], new
ColumnDataType[0]),
+ PlanNode.NodeHint.EMPTY, List.of(), List.of());
+ PlanFragment fragment = new PlanFragment(0, oldRoot, List.of());
+ DispatchablePlanFragment original = new DispatchablePlanFragment(fragment);
+
+ ValueNode newRoot = new ValueNode(0, new DataSchema(new String[0], new
ColumnDataType[0]),
+ PlanNode.NodeHint.EMPTY, List.of(), List.of());
+ DispatchablePlanFragment copy =
DispatchablePlanFragment.copyWithRoot(original, newRoot);
+
+ org.testng.Assert.assertEquals(copy.getPlanFragment().getFragmentId(), 0);
+ assertSame(copy.getPlanFragment().getFragmentRoot(), newRoot);
+ assertSame(original.getPlanFragment().getFragmentRoot(), oldRoot);
+ }
+}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/PinotDispatchPlannerTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/PinotDispatchPlannerTest.java
new file mode 100644
index 00000000000..a394d508b0e
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/PinotDispatchPlannerTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class PinotDispatchPlannerTest extends QueryEnvironmentTestBase {
+
+ @Test
+ public void testHasNonEmptyReplicatedLeafAllNull() {
+ DispatchablePlanMetadata metadata = new DispatchablePlanMetadata();
+ assertFalse(PinotDispatchPlanner.hasNonEmptyReplicatedLeaf(Map.of(0,
metadata)));
+ }
+
+ @Test
+ public void testHasNonEmptyReplicatedLeafEmptyLists() {
+ DispatchablePlanMetadata metadata = new DispatchablePlanMetadata();
+ metadata.setReplicatedSegments(Map.of("t", List.of()));
+ assertFalse(PinotDispatchPlanner.hasNonEmptyReplicatedLeaf(Map.of(0,
metadata)));
+ }
+
+ @Test
+ public void testHasNonEmptyReplicatedLeafNonEmpty() {
+ DispatchablePlanMetadata metadata = new DispatchablePlanMetadata();
+ metadata.setReplicatedSegments(Map.of("t", List.of("seg1")));
+ assertTrue(PinotDispatchPlanner.hasNonEmptyReplicatedLeaf(Map.of(0,
metadata)));
+ }
+
+ @Test
+ public void testHasNonEmptyReplicatedLeafMixed() {
+ DispatchablePlanMetadata metadataNullSegments = new
DispatchablePlanMetadata();
+ DispatchablePlanMetadata metadataNonEmpty = new DispatchablePlanMetadata();
+ metadataNonEmpty.setReplicatedSegments(Map.of("t", List.of("seg1")));
+ Map<Integer, DispatchablePlanMetadata> metadataMap = Map.of(0,
metadataNullSegments, 1, metadataNonEmpty);
+ assertTrue(PinotDispatchPlanner.hasNonEmptyReplicatedLeaf(metadataMap));
+ }
+
+ @Test
+ public void testHasNonEmptyReplicatedLeafEmptyMap() {
+ assertFalse(PinotDispatchPlanner.hasNonEmptyReplicatedLeaf(Map.of()));
+ }
+
+ @Test
+ public void testRewriteReduceStageSetAllNodeStageIdsToZero() {
+ DispatchableSubPlan subPlan = _queryEnvironment.planQuery("SELECT COUNT(*)
FROM a WHERE ts < 0");
+ Map<Integer, DispatchablePlanFragment> fragmentMap = new
HashMap<>(subPlan.getQueryStageMap());
+ PinotDispatchPlanner.rewriteReduceStageForEmptyLeaves(fragmentMap);
+ PlanNode root = fragmentMap.get(0).getPlanFragment().getFragmentRoot();
+ assertAllStageIdsAreZero(root);
+ }
+
+ @Test
+ public void testRewriteReduceStageWithEmptyWorkerMetadataListIsNoOp() {
+ DispatchableSubPlan subPlan = _queryEnvironment.planQuery("SELECT COUNT(*)
FROM a WHERE ts < 0");
+ Map<Integer, DispatchablePlanFragment> fragmentMap = new
HashMap<>(subPlan.getQueryStageMap());
+ fragmentMap.get(0).getWorkerMetadataList().clear();
+ PinotDispatchPlanner.rewriteReduceStageForEmptyLeaves(fragmentMap);
+ assertTrue(fragmentMap.get(0).getWorkerMetadataList().isEmpty());
+ }
+
+ @Test
+ public void testRewriteReduceStageStripsMailboxInfos() {
+ DispatchableSubPlan subPlan = _queryEnvironment.planQuery("SELECT COUNT(*)
FROM a WHERE ts < 0 LIMIT 1");
+ Map<Integer, DispatchablePlanFragment> fragmentMap = new
HashMap<>(subPlan.getQueryStageMap());
+
assertTrue(fragmentMap.get(0).getWorkerMetadataList().get(0).getMailboxInfosMap().containsKey(1));
+
+ PinotDispatchPlanner.rewriteReduceStageForEmptyLeaves(fragmentMap);
+
assertTrue(fragmentMap.get(0).getWorkerMetadataList().get(0).getMailboxInfosMap().isEmpty());
+ }
+
+ @Test
+ public void testRewriteReduceStageWithJoinInlinesAllBranches() {
+ DispatchableSubPlan subPlan =
+ _queryEnvironment.planQuery("SELECT COUNT(*) FROM a JOIN b ON a.col1 =
b.col1 WHERE a.ts < 0");
+ Map<Integer, DispatchablePlanFragment> fragmentMap = new
HashMap<>(subPlan.getQueryStageMap());
+ PinotDispatchPlanner.rewriteReduceStageForEmptyLeaves(fragmentMap);
+ PlanNode root = fragmentMap.get(0).getPlanFragment().getFragmentRoot();
+ assertAllStageIdsAreZero(root);
+ }
+
+ private static void assertAllStageIdsAreZero(PlanNode node) {
+ assertEquals(node.getStageId(), 0);
+ for (PlanNode input : node.getInputs()) {
+ assertAllStageIdsAreZero(input);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]