This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e50229a64 global index virtualID (#10665)
4e50229a64 is described below

commit 4e50229a643e18ae04157465157ff04f219b9592
Author: Rong Rong <[email protected]>
AuthorDate: Fri Apr 21 15:38:43 2023 -0700

    global index virtualID (#10665)
---
 .../apache/pinot/query/routing/WorkerManager.java  | 34 ++++++++++------------
 .../apache/pinot/query/QueryCompilationTest.java   |  4 +--
 .../apache/pinot/query/runtime/QueryRunner.java    |  9 +++---
 .../runtime/executor/RoundRobinSchedulerTest.java  |  2 +-
 4 files changed, 24 insertions(+), 25 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 1e12c961dd..dbebcd7692 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -99,11 +99,12 @@ public class WorkerManager {
               "Entry for server {} and table type: {} already exist!", 
serverEntry.getKey(), tableType);
         }
       }
-      stageMetadata.setServerInstances(new ArrayList<>(
-          serverInstanceToSegmentsMap.keySet()
-              .stream()
-              .map(server -> new VirtualServer(server, 0)) // the leaf stage 
only has one server, so always use 0 here
-              .collect(Collectors.toList())));
+      int globalIdx = 0;
+      List<VirtualServer> serverInstanceList = new ArrayList<>();
+      for (ServerInstance serverInstance : 
serverInstanceToSegmentsMap.keySet()) {
+        serverInstanceList.add(new VirtualServer(serverInstance, globalIdx++));
+      }
+      stageMetadata.setServerInstances(serverInstanceList);
       
stageMetadata.setServerInstanceToSegmentsMap(serverInstanceToSegmentsMap);
     } else if (PlannerUtils.isRootStage(stageId)) {
       // --- ROOT STAGE / BROKER REDUCE STAGE ---
@@ -136,30 +137,27 @@ public class WorkerManager {
       boolean requiresSingletonInstance, Map<String, String> options) {
     int stageParallelism = Integer.parseInt(
         
options.getOrDefault(CommonConstants.Broker.Request.QueryOptionKey.STAGE_PARALLELISM,
 "1"));
-
-    List<VirtualServer> serverInstances = new ArrayList<>();
-    int idx = 0;
-    int matchingIdx = -1;
+    List<ServerInstance> serverInstances = new ArrayList<>(servers);
+    List<VirtualServer> virtualServerList = new ArrayList<>();
     if (requiresSingletonInstance) {
-      matchingIdx = RANDOM.nextInt(servers.size());
-    }
-    for (ServerInstance server : servers) {
-      if (matchingIdx == -1 || idx == matchingIdx) {
+      // require singleton should return a single global worker ID with 0;
+      ServerInstance serverInstance = 
serverInstances.get(RANDOM.nextInt(serverInstances.size()));
+      virtualServerList.add(new VirtualServer(serverInstance, 0));
+    } else {
+      int globalIdx = 0;
+      for (ServerInstance server : servers) {
         String hostname = server.getHostname();
         if (server.getQueryServicePort() > 0 && server.getQueryMailboxPort() > 0
             && 
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)
             && 
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE)
             && 
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE)) {
           for (int virtualId = 0; virtualId < stageParallelism; virtualId++) {
-            if (matchingIdx == -1 || virtualId == 0) {
-              serverInstances.add(new VirtualServer(server, virtualId));
+              virtualServerList.add(new VirtualServer(server, globalIdx++));
             }
           }
         }
-      }
-      idx++;
     }
-    return serverInstances;
+    return virtualServerList;
   }
 
   /**
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index c198d03a03..4177e0b0b2 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -120,13 +120,13 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
         // table scan stages; for tableA it should have 2 hosts, for tableB it 
should have only 1
         Assert.assertEquals(
             
e.getValue().getServerInstances().stream().map(VirtualServer::toString).collect(Collectors.toList()),
-            tables.get(0).equals("a") ? 
ImmutableList.of("0@Server_localhost_2", "0@Server_localhost_1")
+            tables.get(0).equals("a") ? 
ImmutableList.of("0@Server_localhost_2", "1@Server_localhost_1")
                 : ImmutableList.of("0@Server_localhost_1"));
       } else if (!PlannerUtils.isRootStage(e.getKey())) {
         // join stage should have both servers used.
         Assert.assertEquals(
             
e.getValue().getServerInstances().stream().map(VirtualServer::toString).collect(Collectors.toSet()),
-            ImmutableSet.of("0@Server_localhost_1", "0@Server_localhost_2"));
+            ImmutableSet.of("1@Server_localhost_1", "0@Server_localhost_2"));
       } else {
         // reduce stage should have the reducer instance.
         Assert.assertEquals(
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index a1cfafd8ee..5a72fa054f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -168,7 +168,7 @@ public class QueryRunner {
         
Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE,
 "false"));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
     if (isLeafStage(distributedStagePlan)) {
-      runLeafStage(distributedStagePlan, requestMetadataMap, deadlineMs, 
requestId);
+      runLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, 
deadlineMs, requestId);
     } else {
       StageNode stageRoot = distributedStagePlan.getStageRoot();
       OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
@@ -192,7 +192,7 @@ public class QueryRunner {
   }
 
   private void runLeafStage(DistributedStagePlan distributedStagePlan, 
Map<String, String> requestMetadataMap,
-      long deadlineMs, long requestId) {
+      long timeoutMs, long deadlineMs, long requestId) {
     // TODO: make server query request return via mailbox, this is a hack to 
gather the non-streaming data table
     // and package it here for return. But we should really use a 
MailboxSendOperator directly put into the
     // server executor.
@@ -217,8 +217,9 @@ public class QueryRunner {
               + (System.currentTimeMillis() - leafStageStartMillis) + " ms");
       MailboxSendNode sendNode = (MailboxSendNode) 
distributedStagePlan.getStageRoot();
       OpChainExecutionContext opChainExecutionContext =
-          new OpChainExecutionContext(_mailboxService, requestId, 
sendNode.getStageId(), _rootServer, deadlineMs,
-              deadlineMs, distributedStagePlan.getMetadataMap(), 
isTraceEnabled);
+          new OpChainExecutionContext(_mailboxService, requestId, 
sendNode.getStageId(),
+              new VirtualServerAddress(distributedStagePlan.getServer()), 
timeoutMs, deadlineMs,
+              distributedStagePlan.getMetadataMap(), isTraceEnabled);
       MultiStageOperator leafStageOperator =
           new LeafStageTransferableBlockOperator(opChainExecutionContext, 
serverQueryResults, sendNode.getDataSchema());
       mailboxSendOperator = new MailboxSendOperator(opChainExecutionContext, 
leafStageOperator,
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 1f8eb9e254..37f2b2517c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -39,7 +39,7 @@ public class RoundRobinSchedulerTest {
   private static final int DEFAULT_SENDER_STAGE_ID = 0;
   private static final int DEFAULT_RECEIVER_STAGE_ID = 1;
   private static final int DEFAULT_VIRTUAL_SERVER_ID = 1;
-  private static final int DEFAULT_POLL_TIMEOUT_MS = 0;
+  private static final int DEFAULT_POLL_TIMEOUT_MS = 1;
   private static final int DEFAULT_RELEASE_TIMEOUT_MS = 10;
   private static final long DEFAULT_REQUEST_ID = 123;
   private static final String DEFAULT_SENDER_SERIALIZED = "0@foo:2";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to