This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4e50229a64 global index virtualID (#10665)
4e50229a64 is described below
commit 4e50229a643e18ae04157465157ff04f219b9592
Author: Rong Rong <[email protected]>
AuthorDate: Fri Apr 21 15:38:43 2023 -0700
global index virtualID (#10665)
---
.../apache/pinot/query/routing/WorkerManager.java | 34 ++++++++++------------
.../apache/pinot/query/QueryCompilationTest.java | 4 +--
.../apache/pinot/query/runtime/QueryRunner.java | 9 +++---
.../runtime/executor/RoundRobinSchedulerTest.java | 2 +-
4 files changed, 24 insertions(+), 25 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 1e12c961dd..dbebcd7692 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -99,11 +99,12 @@ public class WorkerManager {
"Entry for server {} and table type: {} already exist!",
serverEntry.getKey(), tableType);
}
}
- stageMetadata.setServerInstances(new ArrayList<>(
- serverInstanceToSegmentsMap.keySet()
- .stream()
- .map(server -> new VirtualServer(server, 0)) // the leaf stage only has one server, so always use 0 here
- .collect(Collectors.toList())));
+ int globalIdx = 0;
+ List<VirtualServer> serverInstanceList = new ArrayList<>();
+ for (ServerInstance serverInstance :
serverInstanceToSegmentsMap.keySet()) {
+ serverInstanceList.add(new VirtualServer(serverInstance, globalIdx++));
+ }
+ stageMetadata.setServerInstances(serverInstanceList);
stageMetadata.setServerInstanceToSegmentsMap(serverInstanceToSegmentsMap);
} else if (PlannerUtils.isRootStage(stageId)) {
// --- ROOT STAGE / BROKER REDUCE STAGE ---
@@ -136,30 +137,27 @@ public class WorkerManager {
boolean requiresSingletonInstance, Map<String, String> options) {
int stageParallelism = Integer.parseInt(
options.getOrDefault(CommonConstants.Broker.Request.QueryOptionKey.STAGE_PARALLELISM,
"1"));
-
- List<VirtualServer> serverInstances = new ArrayList<>();
- int idx = 0;
- int matchingIdx = -1;
+ List<ServerInstance> serverInstances = new ArrayList<>(servers);
+ List<VirtualServer> virtualServerList = new ArrayList<>();
if (requiresSingletonInstance) {
- matchingIdx = RANDOM.nextInt(servers.size());
- }
- for (ServerInstance server : servers) {
- if (matchingIdx == -1 || idx == matchingIdx) {
+ // require singleton should return a single global worker ID with 0;
+ ServerInstance serverInstance =
serverInstances.get(RANDOM.nextInt(serverInstances.size()));
+ virtualServerList.add(new VirtualServer(serverInstance, 0));
+ } else {
+ int globalIdx = 0;
+ for (ServerInstance server : servers) {
String hostname = server.getHostname();
if (server.getQueryServicePort() > 0 && server.getQueryMailboxPort() > 0
&&
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)
&&
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE)
&&
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE)) {
for (int virtualId = 0; virtualId < stageParallelism; virtualId++) {
- if (matchingIdx == -1 || virtualId == 0) {
- serverInstances.add(new VirtualServer(server, virtualId));
+ virtualServerList.add(new VirtualServer(server, globalIdx++));
}
}
}
- }
- idx++;
}
- return serverInstances;
+ return virtualServerList;
}
/**
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index c198d03a03..4177e0b0b2 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -120,13 +120,13 @@ public class QueryCompilationTest extends
QueryEnvironmentTestBase {
// table scan stages; for tableA it should have 2 hosts, for tableB it should have only 1
Assert.assertEquals(
e.getValue().getServerInstances().stream().map(VirtualServer::toString).collect(Collectors.toList()),
- tables.get(0).equals("a") ?
ImmutableList.of("0@Server_localhost_2", "0@Server_localhost_1")
+ tables.get(0).equals("a") ?
ImmutableList.of("0@Server_localhost_2", "1@Server_localhost_1")
: ImmutableList.of("0@Server_localhost_1"));
} else if (!PlannerUtils.isRootStage(e.getKey())) {
// join stage should have both servers used.
Assert.assertEquals(
e.getValue().getServerInstances().stream().map(VirtualServer::toString).collect(Collectors.toSet()),
- ImmutableSet.of("0@Server_localhost_1", "0@Server_localhost_2"));
+ ImmutableSet.of("1@Server_localhost_1", "0@Server_localhost_2"));
} else {
// reduce stage should have the reducer instance.
Assert.assertEquals(
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index a1cfafd8ee..5a72fa054f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -168,7 +168,7 @@ public class QueryRunner {
Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE,
"false"));
long deadlineMs = System.currentTimeMillis() + timeoutMs;
if (isLeafStage(distributedStagePlan)) {
- runLeafStage(distributedStagePlan, requestMetadataMap, deadlineMs,
requestId);
+ runLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs,
deadlineMs, requestId);
} else {
StageNode stageRoot = distributedStagePlan.getStageRoot();
OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
@@ -192,7 +192,7 @@ public class QueryRunner {
}
private void runLeafStage(DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadataMap,
- long deadlineMs, long requestId) {
+ long timeoutMs, long deadlineMs, long requestId) {
// TODO: make server query request return via mailbox, this is a hack to gather the non-streaming data table
// and package it here for return. But we should really use a MailboxSendOperator directly put into the
// server executor.
@@ -217,8 +217,9 @@ public class QueryRunner {
+ (System.currentTimeMillis() - leafStageStartMillis) + " ms");
MailboxSendNode sendNode = (MailboxSendNode)
distributedStagePlan.getStageRoot();
OpChainExecutionContext opChainExecutionContext =
- new OpChainExecutionContext(_mailboxService, requestId,
sendNode.getStageId(), _rootServer, deadlineMs,
- deadlineMs, distributedStagePlan.getMetadataMap(),
isTraceEnabled);
+ new OpChainExecutionContext(_mailboxService, requestId,
sendNode.getStageId(),
+ new VirtualServerAddress(distributedStagePlan.getServer()),
timeoutMs, deadlineMs,
+ distributedStagePlan.getMetadataMap(), isTraceEnabled);
MultiStageOperator leafStageOperator =
new LeafStageTransferableBlockOperator(opChainExecutionContext,
serverQueryResults, sendNode.getDataSchema());
mailboxSendOperator = new MailboxSendOperator(opChainExecutionContext,
leafStageOperator,
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 1f8eb9e254..37f2b2517c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -39,7 +39,7 @@ public class RoundRobinSchedulerTest {
private static final int DEFAULT_SENDER_STAGE_ID = 0;
private static final int DEFAULT_RECEIVER_STAGE_ID = 1;
private static final int DEFAULT_VIRTUAL_SERVER_ID = 1;
- private static final int DEFAULT_POLL_TIMEOUT_MS = 0;
+ private static final int DEFAULT_POLL_TIMEOUT_MS = 1;
private static final int DEFAULT_RELEASE_TIMEOUT_MS = 10;
private static final long DEFAULT_REQUEST_ID = 123;
private static final String DEFAULT_SENDER_SERIALIZED = "0@foo:2";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]