This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 37fe83751c8 Fix bug with 'useLeafServerForIntermediateStage' when used
with empty tables (#17634)
37fe83751c8 is described below
commit 37fe83751c8a43897d81a035a93e3b1772d12aed
Author: Yash Mayya <[email protected]>
AuthorDate: Wed Feb 4 16:07:29 2026 -0800
Fix bug with 'useLeafServerForIntermediateStage' when used with empty
tables (#17634)
---
.../planner/physical/DispatchablePlanContext.java | 3 +
.../apache/pinot/query/routing/WorkerManager.java | 18 +-
.../pinot/query/routing/WorkerManagerTest.java | 195 +++++++++++++++++++++
3 files changed, 214 insertions(+), 2 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index 61f2bcd9ed6..fe89e51a151 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.calcite.runtime.PairList;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.query.context.PlannerContext;
@@ -52,7 +53,9 @@ public class DispatchablePlanContext {
private final PairList<Integer, String> _resultFields;
private final Set<String> _tableNames;
+ @Nullable
private final Set<String> _nonLookupTables;
+ @Nullable
private final Set<QueryServerInstance> _leafServerInstances;
private final Map<Integer, DispatchablePlanMetadata> _dispatchablePlanMetadataMap = new HashMap<>();
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 34867ef2aff..7446bb2e88f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -361,8 +361,22 @@ public class WorkerManager {
List<QueryServerInstance> candidateServers;
if (context.isUseLeafServerForIntermediateStage()) {
Set<QueryServerInstance> leafServerInstances = context.getLeafServerInstances();
- assert !leafServerInstances.isEmpty();
- candidateServers = new ArrayList<>(leafServerInstances);
+ if (leafServerInstances.isEmpty()) {
+ // Fall back to use all enabled servers if no leaf server is found (e.g., when querying an empty table).
+ LOGGER.warn("[RequestId: {}] No leaf server found with useLeafServerForIntermediateStage enabled, "
+ + "falling back to all enabled servers", context.getRequestId());
+ Map<String, ServerInstance> enabledServerInstanceMap = _routingManager.getEnabledServerInstanceMap();
+ candidateServers = new ArrayList<>(enabledServerInstanceMap.size());
+ for (ServerInstance serverInstance : enabledServerInstanceMap.values()) {
+ candidateServers.add(new QueryServerInstance(serverInstance));
+ }
+ if (candidateServers.isEmpty()) {
+ LOGGER.error("[RequestId: {}] No server instance found for intermediate stage", context.getRequestId());
+ throw new IllegalStateException("No server instance found for intermediate stage");
+ }
+ } else {
+ candidateServers = new ArrayList<>(leafServerInstances);
+ }
} else {
candidateServers = getCandidateServersPerTables(context);
}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/WorkerManagerTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/WorkerManagerTest.java
new file mode 100644
index 00000000000..45c072a1ada
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/WorkerManagerTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.routing;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TablePartitionInfo;
+import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo;
+import org.apache.pinot.core.routing.timeboundary.TimeBoundaryInfo;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.QueryEnvironment;
+import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Tests for {@link WorkerManager}.
+ */
+public class WorkerManagerTest {
+
+ private static Schema.SchemaBuilder getSchemaBuilder(String schemaName) {
+ return new Schema.SchemaBuilder()
+ .addSingleValueDimension("col1", FieldSpec.DataType.STRING, "")
+ .addSingleValueDimension("col2", FieldSpec.DataType.STRING, "")
+ .addMetric("col3", FieldSpec.DataType.INT, 0)
+ .setSchemaName(schemaName);
+ }
+
+ private static ServerInstance getServerInstance(String hostname, int port) {
+ String server = String.format("%s%s_%d", CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE, hostname, port);
+ InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(server);
+ ZNRecord znRecord = instanceConfig.getRecord();
+ Map<String, String> simpleFields = znRecord.getSimpleFields();
+ simpleFields.put(CommonConstants.Helix.Instance.GRPC_PORT_KEY, String.valueOf(port));
+ simpleFields.put(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY, String.valueOf(port));
+ simpleFields.put(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY, String.valueOf(port));
+ return new ServerInstance(instanceConfig);
+ }
+
+ /**
+ * Tests that when useLeafServerForIntermediateStage is enabled and querying an empty table
+ * (which results in no leaf servers), the query planner falls back to using all enabled servers
+ * instead of failing.
+ *
+ * This test simulates the scenario where a table exists with routing but has no segments,
+ * resulting in an empty RoutingTable (no server instances with segments).
+ */
+ @Test
+ public void testSingletonWorkerWithEmptyTableAndUseLeafServerEnabled() {
+ Schema emptyTableSchema = getSchemaBuilder("emptyTable").build();
+
+ // Create server instances
+ ServerInstance server1 = getServerInstance("localhost", 1);
+ ServerInstance server2 = getServerInstance("localhost", 2);
+ Map<String, ServerInstance> serverInstanceMap = new HashMap<>();
+ serverInstanceMap.put(server1.getInstanceId(), server1);
+ serverInstanceMap.put(server2.getInstanceId(), server2);
+
+ // Create a routing table with no segments (empty table scenario)
+ RoutingTable emptyRoutingTable = new RoutingTable(Collections.emptyMap(), List.of(), 0);
+
+ // Create mock routing manager
+ RoutingManager routingManager = new EmptyTableRoutingManager(serverInstanceMap, emptyRoutingTable);
+
+ // Create mock table cache
+ Map<String, String> tableNameMap = new HashMap<>();
+ tableNameMap.put("emptyTable_OFFLINE", "emptyTable_OFFLINE");
+ tableNameMap.put("emptyTable", "emptyTable");
+
+ TableCache tableCache = mock(TableCache.class);
+ when(tableCache.getTableNameMap()).thenReturn(tableNameMap);
+ when(tableCache.getActualTableName(anyString())).thenAnswer(inv -> tableNameMap.get(inv.getArgument(0)));
+ when(tableCache.getSchema(anyString())).thenReturn(emptyTableSchema);
+ when(tableCache.getTableConfig("emptyTable_OFFLINE"))
+ .thenReturn(mock(org.apache.pinot.spi.config.table.TableConfig.class));
+
+ WorkerManager workerManager = new WorkerManager("Broker_localhost", "localhost", 3, routingManager);
+ QueryEnvironment queryEnvironment = new QueryEnvironment(CommonConstants.DEFAULT_DATABASE, tableCache,
+ workerManager);
+
+ // This query requires a singleton worker (due to LIMIT) and uses useLeafServerForIntermediateStage
+ // When querying an empty table, there are no leaf servers, so we need to fall back to enabled servers
+ String query = "SET useLeafServerForIntermediateStage=true; SELECT * FROM emptyTable LIMIT 10";
+
+ // This should not throw "bound must be positive" error anymore
+ @SuppressWarnings("deprecation")
+ DispatchableSubPlan dispatchableSubPlan = queryEnvironment.planQuery(query);
+ assertNotNull(dispatchableSubPlan);
+ }
+
+ /**
+ * A custom RoutingManager implementation that simulates a table with routing but no segments.
+ * This is used to test the empty leaf server fallback logic.
+ */
+ private static class EmptyTableRoutingManager implements RoutingManager {
+ private final Map<String, ServerInstance> _serverInstanceMap;
+ private final RoutingTable _emptyRoutingTable;
+
+ public EmptyTableRoutingManager(Map<String, ServerInstance> serverInstanceMap, RoutingTable emptyRoutingTable) {
+ _serverInstanceMap = serverInstanceMap;
+ _emptyRoutingTable = emptyRoutingTable;
+ }
+
+ @Override
+ public Map<String, ServerInstance> getEnabledServerInstanceMap() {
+ return _serverInstanceMap;
+ }
+
+ @Nullable
+ @Override
+ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId) {
+ return _emptyRoutingTable;
+ }
+
+ @Nullable
+ @Override
+ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, String tableNameWithType, long requestId) {
+ return _emptyRoutingTable;
+ }
+
+ @Nullable
+ @Override
+ public List<String> getSegments(BrokerRequest brokerRequest) {
+ return List.of();
+ }
+
+ @Override
+ public boolean routingExists(String tableNameWithType) {
+ return true;
+ }
+
+ @Nullable
+ @Override
+ public TimeBoundaryInfo getTimeBoundaryInfo(String offlineTableName) {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public TablePartitionInfo getTablePartitionInfo(String tableNameWithType) {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public TablePartitionReplicatedServersInfo getTablePartitionReplicatedServersInfo(String tableNameWithType) {
+ return null;
+ }
+
+ @Override
+ public Set<String> getServingInstances(String tableNameWithType) {
+ return new HashSet<>(_serverInstanceMap.keySet());
+ }
+
+ @Override
+ public boolean isTableDisabled(String tableNameWithType) {
+ return false;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]