This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 99cde4d164 [MSE] For constant expression query, solve it with a single random server (#16083)
99cde4d164 is described below
commit 99cde4d16433a3ded02a5a6a6c3b230d048bc8c0
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Jun 12 11:08:48 2025 -0600
[MSE] For constant expression query, solve it with a single random server (#16083)
---
.../MultiNodesOfflineClusterIntegrationTest.java | 12 +++
.../apache/pinot/query/routing/WorkerManager.java | 97 +++++++++++++---------
2 files changed, 69 insertions(+), 40 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
index e73f47fe0b..91761b31c8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
@@ -288,6 +288,18 @@ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterInteg
assertEquals(row.get(1).doubleValue(), 725560.0 / 444);
}
+ @Test
+ public void testConstantExpressionQuery()
+ throws Exception {
+ setUseMultiStageQueryEngine(true);
+
+ JsonNode result = postQuery("SELECT 1");
+ assertEquals(result.get("numServersQueried").intValue(), 1);
+
+ result = postQuery("SELECT DaysSinceEpoch, AVG(CRSArrTime) FROM mytable
WHERE false GROUP BY 1 ORDER BY 2 DESC");
+ assertEquals(result.get("numServersQueried").intValue(), 1);
+ }
+
// Disabled because segments might not be server partitioned with multiple
servers
@Test(enabled = false)
@Override
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 4a28f11632..c35799d69a 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -22,9 +22,11 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -191,6 +193,25 @@ public class WorkerManager {
Map<Integer, DispatchablePlanMetadata> metadataMap =
context.getDispatchablePlanMetadataMap();
DispatchablePlanMetadata metadata =
metadataMap.get(fragment.getFragmentId());
+ if (context.getTableNames().isEmpty()) {
+ // For constant expression query (no table is accessed), assign it to a
random enabled server.
+ // TODO: Consider short-circuiting it and directly calculating the
result on broker.
+
+ Collection<ServerInstance> serverInstances =
_routingManager.getEnabledServerInstanceMap().values();
+ int numServers = serverInstances.size();
+ if (numServers == 0) {
+ LOGGER.error("[RequestId: {}] No server instance found for constant
expression query", context.getRequestId());
+ throw new IllegalStateException("No server instance found for constant
expression query");
+ }
+ int index = RANDOM.nextInt(numServers);
+ Iterator<ServerInstance> iterator = serverInstances.iterator();
+ for (int i = 0; i < index; i++) {
+ iterator.next();
+ }
+ metadata.setWorkerIdToServerInstanceMap(Map.of(0, new
QueryServerInstance(iterator.next())));
+ return;
+ }
+
if (isPrePartitionAssignment(children, metadataMap)) {
// If all the children are pre-partitioned the same way, use local
exchange.
DispatchablePlanMetadata firstChildMetadata =
metadataMap.get(children.get(0).getFragmentId());
@@ -200,8 +221,8 @@ public class WorkerManager {
}
if (metadata.isRequiresSingletonInstance()) {
- // When singleton instance is required, return a single worker with ID 0.
- List<ServerInstance> serverInstances = assignServerInstances(context);
+ // When singleton instance is required, assign it to a random candidate
server.
+ List<ServerInstance> serverInstances = getCandidateServers(context);
metadata.setWorkerIdToServerInstanceMap(
Map.of(0, new
QueryServerInstance(serverInstances.get(RANDOM.nextInt(serverInstances.size())))));
return;
@@ -221,7 +242,7 @@ public class WorkerManager {
// If there is no local exchange, assign workers to the servers hosting
the tables
List<ServerInstance> serverInstances = null;
if (workerIdToServerInstanceMap == null) {
- serverInstances = assignServerInstances(context);
+ serverInstances = getCandidateServers(context);
int stageParallelism = Integer.parseInt(
context.getPlannerContext().getOptions().getOrDefault(QueryOptionKey.STAGE_PARALLELISM,
"1"));
workerIdToServerInstanceMap =
Maps.newHashMapWithExpectedSize(serverInstances.size() * stageParallelism);
@@ -319,48 +340,44 @@ public class WorkerManager {
/**
* Returns the servers serving any segment of the tables in the query.
*/
- private List<ServerInstance> assignServerInstances(DispatchablePlanContext
context) {
+ private List<ServerInstance> getCandidateServers(DispatchablePlanContext
context) {
List<ServerInstance> serverInstances;
Set<String> tableNames = context.getTableNames();
+ assert tableNames != null;
Map<String, ServerInstance> enabledServerInstanceMap =
_routingManager.getEnabledServerInstanceMap();
- if (tableNames.isEmpty()) {
- // TODO: Short circuit it when no table needs to be scanned
- // This could be the case from queries that don't actually fetch values
from the tables. In such cases the
- // routing need not be tenant aware.
- // Eg: SELECT 1 AS one FROM select_having_expression_test_test_having
HAVING 1 > 2;
- serverInstances = new ArrayList<>(enabledServerInstanceMap.values());
- } else {
- Set<String> servers = new HashSet<>();
- for (String tableName : tableNames) {
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
- if (tableType == null) {
- Set<String> offlineTableServers =
_routingManager.getServingInstances(
-
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName));
- if (offlineTableServers != null) {
- servers.addAll(offlineTableServers);
- }
- Set<String> realtimeTableServers =
_routingManager.getServingInstances(
-
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName));
- if (realtimeTableServers != null) {
- servers.addAll(realtimeTableServers);
- }
- } else {
- Set<String> tableServers =
_routingManager.getServingInstances(tableName);
- if (tableServers != null) {
- servers.addAll(tableServers);
- }
+ Set<String> servers = new HashSet<>();
+ for (String tableName : tableNames) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
+ if (tableType == null) {
+ Set<String> offlineTableServers = _routingManager.getServingInstances(
+
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName));
+ if (offlineTableServers != null) {
+ servers.addAll(offlineTableServers);
+ }
+ Set<String> realtimeTableServers = _routingManager.getServingInstances(
+
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName));
+ if (realtimeTableServers != null) {
+ servers.addAll(realtimeTableServers);
}
- }
- if (servers.isEmpty()) {
- // fall back to use all enabled servers if no server is found for the
tables
- serverInstances = new ArrayList<>(enabledServerInstanceMap.values());
} else {
- serverInstances = new ArrayList<>(servers.size());
- for (String server : servers) {
- ServerInstance serverInstance = enabledServerInstanceMap.get(server);
- if (serverInstance != null) {
- serverInstances.add(serverInstance);
- }
+ Set<String> tableServers =
_routingManager.getServingInstances(tableName);
+ if (tableServers != null) {
+ servers.addAll(tableServers);
+ }
+ }
+ }
+ if (servers.isEmpty()) {
+ // Fall back to use all enabled servers if no server is found for the
tables.
+ // TODO: Revisit if we should throw an exception instead.
+ LOGGER.warn("[RequestId: {}] No server instance found for intermediate
stage for tables: {}, "
+ + "falling back to all enabled servers", context.getRequestId(),
tableNames);
+ serverInstances = new ArrayList<>(enabledServerInstanceMap.values());
+ } else {
+ serverInstances = new ArrayList<>(servers.size());
+ for (String server : servers) {
+ ServerInstance serverInstance = enabledServerInstanceMap.get(server);
+ if (serverInstance != null) {
+ serverInstances.add(serverInstance);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]