This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9105a06ce8 [multistage] use virtual servers instead of ServerInstance 
(#10157)
9105a06ce8 is described below

commit 9105a06ce8d7c0e1b0988c6883f620801d07bbea
Author: Almog Gavra <[email protected]>
AuthorDate: Tue Jan 24 17:36:50 2023 -0800

    [multistage] use virtual servers instead of ServerInstance (#10157)
---
 .../query/planner/ExplainPlanStageVisitor.java     |  21 ++--
 .../apache/pinot/query/planner/StageMetadata.java  |  11 +-
 .../colocated/GreedyShuffleRewriteVisitor.java     |   8 +-
 .../apache/pinot/query/routing/VirtualServer.java  |  92 +++++++++++++++++
 .../pinot/query/routing/VirtualServerAddress.java  |  43 ++++++--
 .../apache/pinot/query/routing/WorkerManager.java  |  16 ++-
 .../apache/pinot/query/QueryCompilationTest.java   |  18 ++--
 .../pinot/query/mailbox/JsonMailboxIdentifier.java |  19 ++--
 .../pinot/query/mailbox/MailboxIdentifier.java     |   7 +-
 .../java/org/apache/pinot/query/mailbox/Utils.java |   5 +-
 .../apache/pinot/query/runtime/QueryRunner.java    |  18 ++--
 .../runtime/operator/MailboxReceiveOperator.java   |  26 ++---
 .../runtime/operator/MailboxSendOperator.java      |  26 ++---
 .../query/runtime/plan/DistributedStagePlan.java   |  16 +--
 .../query/runtime/plan/PhysicalPlanVisitor.java    |   8 +-
 .../query/runtime/plan/PlanRequestContext.java     |  17 ++--
 .../runtime/plan/ServerRequestPlanVisitor.java     |   5 +-
 .../runtime/plan/serde/QueryPlanSerDeUtils.java    |  20 ++--
 .../plan/server/ServerPlanRequestContext.java      |   5 +-
 .../pinot/query/service/QueryDispatcher.java       |  19 ++--
 .../query/mailbox/GrpcMailboxServiceTest.java      |   9 +-
 .../query/mailbox/InMemoryMailboxServiceTest.java  |   5 +-
 .../mailbox/MultiplexingMailboxServiceTest.java    |   5 +-
 .../pinot/query/runtime/QueryRunnerTest.java       |  10 +-
 .../pinot/query/runtime/QueryRunnerTestBase.java   |   8 +-
 .../runtime/executor/RoundRobinSchedulerTest.java  |   4 +-
 .../operator/MailboxReceiveOperatorTest.java       | 113 ++++++++++++---------
 .../runtime/operator/MailboxSendOperatorTest.java  |  14 +--
 .../operator/exchange/BlockExchangeTest.java       |   5 +-
 .../operator/exchange/BroadcastExchangeTest.java   |   4 +-
 .../operator/exchange/HashExchangeTest.java        |   5 +-
 .../operator/exchange/RandomExchangeTest.java      |   4 +-
 .../operator/exchange/SingletonExchangeTest.java   |   2 +-
 .../pinot/query/service/QueryServerTest.java       |   3 +-
 34 files changed, 374 insertions(+), 217 deletions(-)

diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
index e0345590dc..ccc71858cd 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
@@ -34,6 +34,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.StageNodeVisitor;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.routing.VirtualServer;
 
 
 /**
@@ -59,7 +60,7 @@ public class ExplainPlanStageVisitor implements 
StageNodeVisitor<StringBuilder,
     }
 
     // the root of a query plan always only has a single node
-    ServerInstance rootServer = 
queryPlan.getStageMetadataMap().get(0).getServerInstances().get(0);
+    VirtualServer rootServer = 
queryPlan.getStageMetadataMap().get(0).getServerInstances().get(0);
     return explainFrom(queryPlan, queryPlan.getQueryStageMap().get(0), 
rootServer);
   }
 
@@ -75,7 +76,7 @@ public class ExplainPlanStageVisitor implements 
StageNodeVisitor<StringBuilder,
    *
    * @return a query plan associated with
    */
-  public static String explainFrom(QueryPlan queryPlan, StageNode node, 
ServerInstance rootServer) {
+  public static String explainFrom(QueryPlan queryPlan, StageNode node, 
VirtualServer rootServer) {
     final ExplainPlanStageVisitor visitor = new 
ExplainPlanStageVisitor(queryPlan);
     return node
         .visit(visitor, new Context(rootServer, "", "", new StringBuilder()))
@@ -133,9 +134,9 @@ public class ExplainPlanStageVisitor implements 
StageNodeVisitor<StringBuilder,
     StageMetadata metadata = 
_queryPlan.getStageMetadataMap().get(senderStageId);
     Map<ServerInstance, Map<String, List<String>>> segments = 
metadata.getServerInstanceToSegmentsMap();
 
-    Iterator<ServerInstance> iterator = 
metadata.getServerInstances().iterator();
+    Iterator<VirtualServer> iterator = 
metadata.getServerInstances().iterator();
     while (iterator.hasNext()) {
-      ServerInstance serverInstance = iterator.next();
+      VirtualServer serverInstance = iterator.next();
       if (segments.containsKey(serverInstance)) {
         // always print out leaf stages
         sender.visit(this, context.next(iterator.hasNext(), serverInstance));
@@ -164,10 +165,10 @@ public class ExplainPlanStageVisitor implements 
StageNodeVisitor<StringBuilder,
     appendInfo(node, context);
 
     int receiverStageId = node.getReceiverStageId();
-    List<ServerInstance> servers = 
_queryPlan.getStageMetadataMap().get(receiverStageId).getServerInstances();
+    List<VirtualServer> servers = 
_queryPlan.getStageMetadataMap().get(receiverStageId).getServerInstances();
     context._builder.append("->");
     String receivers = servers.stream()
-        .map(s -> s.getHostname() + ':' + s.getPort())
+        .map(VirtualServer::toString)
         .map(s -> "[" + receiverStageId + "]@" + s)
         .collect(Collectors.joining(",", "{", "}"));
     return context._builder.append(receivers);
@@ -190,7 +191,7 @@ public class ExplainPlanStageVisitor implements 
StageNodeVisitor<StringBuilder,
         .append(_queryPlan.getStageMetadataMap()
             .get(node.getStageId())
             .getServerInstanceToSegmentsMap()
-            .get(context._host))
+            .get(context._host.getServer()))
         .append('\n');
   }
 
@@ -200,19 +201,19 @@ public class ExplainPlanStageVisitor implements 
StageNodeVisitor<StringBuilder,
   }
 
   static class Context {
-    final ServerInstance _host;
+    final VirtualServer _host;
     final String _prefix;
     final String _childPrefix;
     final StringBuilder _builder;
 
-    Context(ServerInstance host, String prefix, String childPrefix, 
StringBuilder builder) {
+    Context(VirtualServer host, String prefix, String childPrefix, 
StringBuilder builder) {
       _host = host;
       _prefix = prefix;
       _childPrefix = childPrefix;
       _builder = builder;
     }
 
-    Context next(boolean hasMoreChildren, ServerInstance host) {
+    Context next(boolean hasMoreChildren, VirtualServer host) {
       return new Context(
           host,
           hasMoreChildren ? _childPrefix + "├── " : _childPrefix + "└── ",
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
index 2f21a64c27..225599e098 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.routing.VirtualServer;
 
 
 /**
@@ -43,9 +44,11 @@ public class StageMetadata implements Serializable {
   private List<String> _scannedTables;
 
   // used for assigning server/worker nodes.
-  private List<ServerInstance> _serverInstances;
+  private List<VirtualServer> _serverInstances;
 
-  // used for table scan stage.
+  // used for table scan stage - we use ServerInstance instead of VirtualServer
+  // here because all virtual servers that share a server instance will have 
the
+  // same segments on them
   private Map<ServerInstance, Map<String, List<String>>> 
_serverInstanceToSegmentsMap;
 
   // time boundary info
@@ -82,11 +85,11 @@ public class StageMetadata implements Serializable {
     _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap;
   }
 
-  public List<ServerInstance> getServerInstances() {
+  public List<VirtualServer> getServerInstances() {
     return _serverInstances;
   }
 
-  public void setServerInstances(List<ServerInstance> serverInstances) {
+  public void setServerInstances(List<VirtualServer> serverInstances) {
     _serverInstances = serverInstances;
   }
 
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
index 95eff8ac91..b37e250f14 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
@@ -28,7 +28,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.config.provider.TableCache;
-import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.logical.RexExpression;
@@ -45,6 +44,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.StageNodeVisitor;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.routing.VirtualServer;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -298,9 +298,9 @@ public class GreedyShuffleRewriteVisitor
    * 2. Servers assigned to the join-stage are a superset of S.
    */
   private boolean canServerAssignmentAllowShuffleSkip(int currentStageId, int 
leftStageId, int rightStageId) {
-    Set<ServerInstance> leftServerInstances = new 
HashSet<>(_stageMetadataMap.get(leftStageId).getServerInstances());
-    List<ServerInstance> rightServerInstances = 
_stageMetadataMap.get(rightStageId).getServerInstances();
-    List<ServerInstance> currentServerInstances = 
_stageMetadataMap.get(currentStageId).getServerInstances();
+    Set<VirtualServer> leftServerInstances = new 
HashSet<>(_stageMetadataMap.get(leftStageId).getServerInstances());
+    List<VirtualServer> rightServerInstances = 
_stageMetadataMap.get(rightStageId).getServerInstances();
+    List<VirtualServer> currentServerInstances = 
_stageMetadataMap.get(currentStageId).getServerInstances();
     return leftServerInstances.containsAll(rightServerInstances)
         && leftServerInstances.size() == rightServerInstances.size()
         && currentServerInstances.containsAll(leftServerInstances);
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/VirtualServer.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/VirtualServer.java
new file mode 100644
index 0000000000..c50aee41e7
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/VirtualServer.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.query.routing;
+
+import java.util.Objects;
+import org.apache.pinot.core.transport.ServerInstance;
+
+
+/**
+ * {@code VirtualServer} is a {@link ServerInstance} associated with a
+ * unique virtualization identifier which allows the multistage query
+ * engine to collocate multiple virtual servers on a single physical
+ * instance, enabling higher levels of parallelism and partitioning
+ * the query input.
+ */
+public class VirtualServer {
+
+  private final ServerInstance _server;
+  private final int _virtualId;
+
+  public VirtualServer(ServerInstance server, int virtualId) {
+    _server = server;
+    _virtualId = virtualId;
+  }
+
+  public ServerInstance getServer() {
+    return _server;
+  }
+
+  public int getVirtualId() {
+    return _virtualId;
+  }
+
+  public String getHostname() {
+    return _server.getHostname();
+  }
+
+  public int getPort() {
+    return _server.getPort();
+  }
+
+  public int getQueryMailboxPort() {
+    return _server.getQueryMailboxPort();
+  }
+
+  public int getQueryServicePort() {
+    return _server.getQueryServicePort();
+  }
+
+  public int getGrpcPort() {
+    return _server.getGrpcPort();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    VirtualServer that = (VirtualServer) o;
+    return _virtualId == that._virtualId && Objects.equals(_server, 
that._server);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(_server, _virtualId);
+  }
+
+  @Override
+  public String toString() {
+    return _virtualId + "@" + _server.getInstanceId();
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ServerAddress.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/VirtualServerAddress.java
similarity index 53%
rename from 
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ServerAddress.java
rename to 
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/VirtualServerAddress.java
index cda2b332fe..99074f7191 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ServerAddress.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/VirtualServerAddress.java
@@ -17,19 +17,35 @@
  * under the License.
  */
 
-package org.apache.pinot.query.mailbox;
+package org.apache.pinot.query.routing;
 
 import java.util.Objects;
 
 
-public class ServerAddress {
+/**
+ * Represents the address of a {@link VirtualServer} containing
+ * both the ID of the specific virtualized server and the physical
+ * internet address in id@hostname:port format.
+ *
+ * <p>This is needed in addition to {@code VirtualServer} because there
+ * are some parts of the code that don't have enough information to
+ * construct the full {@code VirtualServer} and only require the
+ * hostname, port and virtualId.</p>
+ */
+public class VirtualServerAddress {
 
   private final String _hostname;
   private final int _port;
+  private final int _virtualId;
 
-  public ServerAddress(String hostname, int port) {
+  public VirtualServerAddress(String hostname, int port, int virtualId) {
     _hostname = hostname;
     _port = port;
+    _virtualId = virtualId;
+  }
+
+  public VirtualServerAddress(VirtualServer server) {
+    this(server.getHostname(), server.getQueryMailboxPort(), 
server.getVirtualId());
   }
 
   /**
@@ -39,9 +55,10 @@ public class ServerAddress {
    * @param address the serialized string
    * @return the deserialized form
    */
-  public static ServerAddress parse(String address) {
-    String[] split = address.split(":");
-    return new ServerAddress(split[0], Integer.parseInt(split[1]));
+  public static VirtualServerAddress parse(String address) {
+    String[] split = address.split("@");
+    String[] hostSplit = split[1].split(":");
+    return new VirtualServerAddress(hostSplit[0], 
Integer.parseInt(hostSplit[1]), Integer.parseInt(split[0]));
   }
 
   /**
@@ -58,6 +75,10 @@ public class ServerAddress {
     return _port;
   }
 
+  public int virtualId() {
+    return _virtualId;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -66,17 +87,19 @@ public class ServerAddress {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    ServerAddress that = (ServerAddress) o;
-    return _port == that._port && Objects.equals(_hostname, that._hostname);
+    VirtualServerAddress that = (VirtualServerAddress) o;
+    return _port == that._port
+        && _virtualId == that._virtualId
+        && Objects.equals(_hostname, that._hostname);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(_hostname, _port);
+    return Objects.hash(_hostname, _port, _virtualId);
   }
 
   @Override
   public String toString() {
-    return _hostname + ":" + _port;
+    return _virtualId + "@" + _hostname + ":" + _port;
   }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 36c0f2bd81..0e3750dc5d 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.RoutingTable;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
@@ -93,26 +94,31 @@ public class WorkerManager {
               "Entry for server {} and table type: {} already exist!", 
serverEntry.getKey(), tableType);
         }
       }
-      stageMetadata.setServerInstances(new 
ArrayList<>(serverInstanceToSegmentsMap.keySet()));
+      stageMetadata.setServerInstances(new ArrayList<>(
+          serverInstanceToSegmentsMap.keySet()
+              .stream()
+              .map(server -> new VirtualServer(server, 0)) // for now, only 
use single virtual server
+              .collect(Collectors.toList())));
       
stageMetadata.setServerInstanceToSegmentsMap(serverInstanceToSegmentsMap);
     } else if (PlannerUtils.isRootStage(stageId)) {
       // ROOT stage doesn't have a QueryServer as it is strictly only reducing 
results.
       // here we simply assign the worker instance with identical 
server/mailbox port number.
-      stageMetadata.setServerInstances(Lists.newArrayList(new 
WorkerInstance(_hostName, _port, _port, _port, _port)));
+      stageMetadata.setServerInstances(Lists.newArrayList(
+          new VirtualServer(new WorkerInstance(_hostName, _port, _port, _port, 
_port), 0)));
     } else {
       
stageMetadata.setServerInstances(filterServers(_routingManager.getEnabledServerInstanceMap().values()));
     }
   }
 
-  private static List<ServerInstance> filterServers(Collection<ServerInstance> 
servers) {
-    List<ServerInstance> serverInstances = new ArrayList<>();
+  private static List<VirtualServer> filterServers(Collection<ServerInstance> 
servers) {
+    List<VirtualServer> serverInstances = new ArrayList<>();
     for (ServerInstance server : servers) {
       String hostname = server.getHostname();
       if (server.getQueryServicePort() > 0 && server.getQueryMailboxPort() > 0
           && 
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)
           && 
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE)
           && 
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE)) {
-        serverInstances.add(server);
+        serverInstances.add(new VirtualServer(server, 0));
       }
     }
     return serverInstances;
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 845533485a..fed57b3406 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.PlannerUtils;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.StageMetadata;
@@ -39,6 +38,7 @@ import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
 import org.apache.pinot.query.planner.stage.ProjectNode;
 import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.routing.VirtualServer;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -119,19 +119,19 @@ public class QueryCompilationTest extends 
QueryEnvironmentTestBase {
       if (tables.size() == 1) {
         // table scan stages; for tableA it should have 2 hosts, for tableB it 
should have only 1
         Assert.assertEquals(
-            
e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
-            tables.get(0).equals("a") ? ImmutableList.of("Server_localhost_2", 
"Server_localhost_1")
-                : ImmutableList.of("Server_localhost_1"));
+            
e.getValue().getServerInstances().stream().map(VirtualServer::toString).collect(Collectors.toList()),
+            tables.get(0).equals("a") ? 
ImmutableList.of("0@Server_localhost_2", "0@Server_localhost_1")
+                : ImmutableList.of("0@Server_localhost_1"));
       } else if (!PlannerUtils.isRootStage(e.getKey())) {
         // join stage should have both servers used.
         Assert.assertEquals(
-            
e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toSet()),
-            ImmutableSet.of("Server_localhost_1", "Server_localhost_2"));
+            
e.getValue().getServerInstances().stream().map(VirtualServer::toString).collect(Collectors.toSet()),
+            ImmutableSet.of("0@Server_localhost_1", "0@Server_localhost_2"));
       } else {
         // reduce stage should have the reducer instance.
         Assert.assertEquals(
-            
e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toSet()),
-            ImmutableSet.of("Server_localhost_3"));
+            
e.getValue().getServerInstances().stream().map(VirtualServer::toString).collect(Collectors.toSet()),
+            ImmutableSet.of("0@Server_localhost_3"));
       }
     }
   }
@@ -165,7 +165,7 @@ public class QueryCompilationTest extends 
QueryEnvironmentTestBase {
         .filter(stageMetadata -> stageMetadata.getScannedTables().size() != 
0).collect(Collectors.toList());
     Assert.assertEquals(tableScanMetadataList.size(), 1);
     
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().size(), 
1);
-    
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().get(0).toString(),
 "Server_localhost_2");
+    
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().get(0).toString(),
 "0@Server_localhost_2");
 
     query = "SELECT * FROM d";
     queryPlan = _queryEnvironment.planQuery(query);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/JsonMailboxIdentifier.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/JsonMailboxIdentifier.java
index 7068b06ab6..4f0b6284ad 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/JsonMailboxIdentifier.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/JsonMailboxIdentifier.java
@@ -20,12 +20,15 @@ package org.apache.pinot.query.mailbox;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.Objects;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 
 
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class JsonMailboxIdentifier implements MailboxIdentifier {
 
   private static final ObjectMapper MAPPER = new ObjectMapper();
@@ -34,8 +37,8 @@ public class JsonMailboxIdentifier implements 
MailboxIdentifier {
   private final String _from;
   private final String _to;
 
-  private final ServerAddress _fromAddress;
-  private final ServerAddress _toAddress;
+  private final VirtualServerAddress _fromAddress;
+  private final VirtualServerAddress _toAddress;
 
   @JsonCreator
   public JsonMailboxIdentifier(
@@ -46,14 +49,14 @@ public class JsonMailboxIdentifier implements 
MailboxIdentifier {
     _jobId = jobId;
     _from = from;
     _to = to;
-    _fromAddress = ServerAddress.parse(_from);
-    _toAddress = ServerAddress.parse(_to);
+    _fromAddress = VirtualServerAddress.parse(_from);
+    _toAddress = VirtualServerAddress.parse(_to);
   }
 
   public JsonMailboxIdentifier(
       String jobId,
-      ServerAddress from,
-      ServerAddress to
+      VirtualServerAddress from,
+      VirtualServerAddress to
   ) {
     _jobId = jobId;
     _from = from.toString();
@@ -83,7 +86,7 @@ public class JsonMailboxIdentifier implements 
MailboxIdentifier {
 
   @JsonIgnore
   @Override
-  public ServerAddress getFromHost() {
+  public VirtualServerAddress getFromHost() {
     return _fromAddress;
   }
 
@@ -93,7 +96,7 @@ public class JsonMailboxIdentifier implements 
MailboxIdentifier {
 
   @JsonIgnore
   @Override
-  public ServerAddress getToHost() {
+  public VirtualServerAddress getToHost() {
     return _toAddress;
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
index 77647d3db2..d1d1b44f35 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import org.apache.pinot.query.routing.VirtualServerAddress;
+
+
 /**
  * {@link MailboxIdentifier} uniquely identify the mailbox that pairs a sender 
and a receiver.
  *
@@ -34,12 +37,12 @@ public interface MailboxIdentifier {
   /**
    * @return the sender address
    */
-  ServerAddress getFromHost();
+  VirtualServerAddress getFromHost();
 
   /**
    * @return the destination address
    */
-  ServerAddress getToHost();
+  VirtualServerAddress getToHost();
 
   /**
    * Checks whether sender and receiver are in the same JVM.
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
index 60ae614a2f..8aedbb1a1f 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import org.apache.pinot.query.routing.VirtualServerAddress;
+
+
 public final class Utils {
 
   private Utils() {
@@ -26,7 +29,7 @@ public final class Utils {
 
   public static String constructChannelId(String mailboxId) {
     MailboxIdentifier mailboxIdentifier = toMailboxIdentifier(mailboxId);
-    ServerAddress dest = mailboxIdentifier.getToHost();
+    VirtualServerAddress dest = mailboxIdentifier.getToHost();
     return dest.toString();
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index ad0d138bb4..c2826e5851 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -38,12 +38,13 @@ import 
org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
-import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.MultiplexingMailboxService;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.MailboxSendNode;
 import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
@@ -82,6 +83,7 @@ public class QueryRunner {
   private MailboxService<TransferableBlock> _mailboxService;
   private String _hostname;
   private int _port;
+  private VirtualServerAddress _rootServer;
   private OpChainSchedulerService _scheduler;
 
   /**
@@ -94,6 +96,8 @@ public class QueryRunner {
     _hostname = 
instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? 
instanceName.substring(
         CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
     _port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 
QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
+    // always use 0 for root server ID as all data is processed by one node at 
the global root
+    _rootServer = new VirtualServerAddress(_hostname, _port, 0);
     _helixManager = helixManager;
     try {
       long releaseMs = 
config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS,
@@ -150,7 +154,7 @@ public class QueryRunner {
       MailboxSendOperator mailboxSendOperator = new 
MailboxSendOperator(_mailboxService,
           new LeafStageTransferableBlockOperator(serverQueryResults, 
sendNode.getDataSchema(), requestId,
               sendNode.getStageId()), 
receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(),
-          sendNode.getPartitionKeySelector(), _hostname, _port, 
serverQueryRequests.get(0).getRequestId(),
+          sendNode.getPartitionKeySelector(), _rootServer, 
serverQueryRequests.get(0).getRequestId(),
           sendNode.getStageId());
       int blockCounter = 0;
       while 
(!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
@@ -161,8 +165,8 @@ public class QueryRunner {
       long timeoutMs = 
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
       StageNode stageRoot = distributedStagePlan.getStageRoot();
       OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
-          new PlanRequestContext(_mailboxService, requestId, 
stageRoot.getStageId(), timeoutMs, _hostname, _port,
-              distributedStagePlan.getMetadataMap()));
+          new PlanRequestContext(_mailboxService, requestId, 
stageRoot.getStageId(), timeoutMs,
+              new VirtualServerAddress(distributedStagePlan.getServer()), 
distributedStagePlan.getMetadataMap()));
       _scheduler.register(rootOperator);
     }
   }
@@ -175,7 +179,7 @@ public class QueryRunner {
         "Server request for V2 engine should only have 1 scan table per 
request.");
     String rawTableName = stageMetadata.getScannedTables().get(0);
     Map<String, List<String>> tableToSegmentListMap =
-        
stageMetadata.getServerInstanceToSegmentsMap().get(distributedStagePlan.getServerInstance());
+        
stageMetadata.getServerInstanceToSegmentsMap().get(distributedStagePlan.getServer().getServer());
     List<ServerPlanRequestContext> requests = new ArrayList<>();
     for (Map.Entry<String, List<String>> tableEntry : 
tableToSegmentListMap.entrySet()) {
       String tableType = tableEntry.getKey();
@@ -219,9 +223,9 @@ public class QueryRunner {
 
   private boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
     int stageId = distributedStagePlan.getStageId();
-    ServerInstance serverInstance = distributedStagePlan.getServerInstance();
+    VirtualServer serverInstance = distributedStagePlan.getServer();
     StageMetadata stageMetadata = 
distributedStagePlan.getMetadataMap().get(stageId);
-    Map<String, List<String>> segments = 
stageMetadata.getServerInstanceToSegmentsMap().get(serverInstance);
+    Map<String, List<String>> segments = 
stageMetadata.getServerInstanceToSegmentsMap().get(serverInstance.getServer());
     return segments != null && segments.size() > 0;
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 8fb1d145c1..ad0dcdbde5 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -29,12 +29,12 @@ import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.core.operator.BaseOperator;
-import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
-import org.apache.pinot.query.mailbox.ServerAddress;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.service.QueryConfig;
@@ -68,18 +68,18 @@ public class MailboxReceiveOperator extends 
MultiStageOperator {
   private TransferableBlock _upstreamErrorBlock;
   private OperatorStats _operatorStats;
 
-  private static MailboxIdentifier toMailboxId(ServerInstance fromInstance, 
long jobId, long stageId,
-      String receiveHostName, int receivePort) {
+  private static MailboxIdentifier toMailboxId(VirtualServer sender, long 
jobId, long stageId,
+      VirtualServerAddress receiver) {
     return new JsonMailboxIdentifier(
         String.format("%s_%s", jobId, stageId),
-        new ServerAddress(fromInstance.getHostname(), 
fromInstance.getQueryMailboxPort()),
-        new ServerAddress(receiveHostName, receivePort));
+        new VirtualServerAddress(sender),
+        receiver);
   }
 
   // TODO: Move deadlineInNanoSeconds to OperatorContext.
   public MailboxReceiveOperator(MailboxService<TransferableBlock> 
mailboxService,
-      List<ServerInstance> sendingStageInstances, RelDistribution.Type 
exchangeType, String receiveHostName,
-      int receivePort, long jobId, int stageId, Long timeoutMs) {
+      List<VirtualServer> sendingStageInstances, RelDistribution.Type 
exchangeType, VirtualServerAddress receiver,
+      long jobId, int stageId, Long timeoutMs) {
     _mailboxService = mailboxService;
     Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
         "Exchange/Distribution type: " + exchangeType + " is not supported!");
@@ -88,8 +88,8 @@ public class MailboxReceiveOperator extends 
MultiStageOperator {
 
     _exchangeType = exchangeType;
     if (_exchangeType == RelDistribution.Type.SINGLETON) {
-      ServerInstance singletonInstance = null;
-      for (ServerInstance serverInstance : sendingStageInstances) {
+      VirtualServer singletonInstance = null;
+      for (VirtualServer serverInstance : sendingStageInstances) {
         if (serverInstance.getHostname().equals(_mailboxService.getHostname())
             && serverInstance.getQueryMailboxPort() == 
_mailboxService.getMailboxPort()) {
           Preconditions.checkState(singletonInstance == null, "multiple 
instance found for singleton exchange type!");
@@ -103,12 +103,12 @@ public class MailboxReceiveOperator extends 
MultiStageOperator {
         _sendingMailbox = Collections.emptyList();
       } else {
         _sendingMailbox =
-            Collections.singletonList(toMailboxId(singletonInstance, jobId, 
stageId, receiveHostName, receivePort));
+            Collections.singletonList(toMailboxId(singletonInstance, jobId, 
stageId, receiver));
       }
     } else {
       _sendingMailbox = new ArrayList<>(sendingStageInstances.size());
-      for (ServerInstance instance : sendingStageInstances) {
-        _sendingMailbox.add(toMailboxId(instance, jobId, stageId, 
receiveHostName, receivePort));
+      for (VirtualServer instance : sendingStageInstances) {
+        _sendingMailbox.add(toMailboxId(instance, jobId, stageId, receiver));
       }
     }
     _upstreamErrorBlock = null;
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 047f5fd5b2..32ac1247c5 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -28,12 +28,12 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.ServerAddress;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.BlockSplitter;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -65,20 +65,20 @@ public class MailboxSendOperator extends MultiStageOperator 
{
 
   @VisibleForTesting
   interface MailboxIdGenerator {
-    MailboxIdentifier generate(ServerInstance server);
+    MailboxIdentifier generate(VirtualServer server);
   }
 
   public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
-      MultiStageOperator dataTableBlockBaseOperator, List<ServerInstance> 
receivingStageInstances,
-      RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> 
keySelector, String hostName, int port,
-      long jobId, int stageId) {
+      MultiStageOperator dataTableBlockBaseOperator, List<VirtualServer> 
receivingStageInstances,
+      RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> 
keySelector,
+      VirtualServerAddress sendingServer, long jobId, int stageId) {
     this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, 
exchangeType, keySelector,
-        server -> toMailboxId(server, jobId, stageId, hostName, port), 
BlockExchange::getExchange, jobId, stageId);
+        server -> toMailboxId(server, jobId, stageId, sendingServer), 
BlockExchange::getExchange, jobId, stageId);
   }
 
   @VisibleForTesting
   MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
-      MultiStageOperator dataTableBlockBaseOperator, List<ServerInstance> 
receivingStageInstances,
+      MultiStageOperator dataTableBlockBaseOperator, List<VirtualServer> 
receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> 
keySelector,
       MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory 
blockExchangeFactory, long jobId, int stageId) {
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
@@ -86,8 +86,8 @@ public class MailboxSendOperator extends MultiStageOperator {
     List<MailboxIdentifier> receivingMailboxes;
     if (exchangeType == RelDistribution.Type.SINGLETON) {
       // TODO: this logic should be moved into SingletonExchange
-      ServerInstance singletonInstance = null;
-      for (ServerInstance serverInstance : receivingStageInstances) {
+      VirtualServer singletonInstance = null;
+      for (VirtualServer serverInstance : receivingStageInstances) {
         if (serverInstance.getHostname().equals(mailboxService.getHostname())
             && serverInstance.getQueryMailboxPort() == 
mailboxService.getMailboxPort()) {
           Preconditions.checkState(singletonInstance == null, "multiple 
instance found for singleton exchange type!");
@@ -161,10 +161,10 @@ public class MailboxSendOperator extends 
MultiStageOperator {
   }
 
   private static JsonMailboxIdentifier toMailboxId(
-      ServerInstance destination, long jobId, int stageId, String sender, int 
senderPort) {
+      VirtualServer destination, long jobId, int stageId, VirtualServerAddress 
sender) {
     return new JsonMailboxIdentifier(
         String.format("%s_%s", jobId, stageId),
-        new ServerAddress(sender, senderPort),
-        new ServerAddress(destination.getHostname(), 
destination.getQueryMailboxPort()));
+        sender,
+        new VirtualServerAddress(destination));
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
index 828a969dd0..c508d746ca 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
@@ -20,9 +20,9 @@ package org.apache.pinot.query.runtime.plan;
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.routing.VirtualServer;
 
 
 /**
@@ -33,7 +33,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
  */
 public class DistributedStagePlan {
   private int _stageId;
-  private ServerInstance _serverInstance;
+  private VirtualServer _server;
   private StageNode _stageRoot;
   private Map<Integer, StageMetadata> _metadataMap;
 
@@ -42,10 +42,10 @@ public class DistributedStagePlan {
     _metadataMap = new HashMap<>();
   }
 
-  public DistributedStagePlan(int stageId, ServerInstance serverInstance, 
StageNode stageRoot,
+  public DistributedStagePlan(int stageId, VirtualServer server, StageNode 
stageRoot,
       Map<Integer, StageMetadata> metadataMap) {
     _stageId = stageId;
-    _serverInstance = serverInstance;
+    _server = server;
     _stageRoot = stageRoot;
     _metadataMap = metadataMap;
   }
@@ -54,8 +54,8 @@ public class DistributedStagePlan {
     return _stageId;
   }
 
-  public ServerInstance getServerInstance() {
-    return _serverInstance;
+  public VirtualServer getServer() {
+    return _server;
   }
 
   public StageNode getStageRoot() {
@@ -66,8 +66,8 @@ public class DistributedStagePlan {
     return _metadataMap;
   }
 
-  public void setServerInstance(ServerInstance serverInstance) {
-    _serverInstance = serverInstance;
+  public void setServer(VirtualServer serverInstance) {
+    _server = serverInstance;
   }
 
   public void setStageRoot(StageNode stageRoot) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index d675efea62..e4d54a57a8 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.query.runtime.plan;
 
 import java.util.List;
-import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.AggregateNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
@@ -32,6 +31,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.StageNodeVisitor;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.routing.VirtualServer;
 import org.apache.pinot.query.runtime.operator.AggregateOperator;
 import org.apache.pinot.query.runtime.operator.FilterOperator;
 import org.apache.pinot.query.runtime.operator.HashJoinOperator;
@@ -62,10 +62,10 @@ public class PhysicalPlanVisitor implements 
StageNodeVisitor<MultiStageOperator,
 
   @Override
   public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, 
PlanRequestContext context) {
-    List<ServerInstance> sendingInstances = 
context.getMetadataMap().get(node.getSenderStageId()).getServerInstances();
+    List<VirtualServer> sendingInstances = 
context.getMetadataMap().get(node.getSenderStageId()).getServerInstances();
     MailboxReceiveOperator mailboxReceiveOperator =
         new MailboxReceiveOperator(context.getMailboxService(), 
sendingInstances,
-            node.getExchangeType(), context.getHostName(), context.getPort(),
+            node.getExchangeType(), context.getServer(),
             context.getRequestId(), node.getSenderStageId(), 
context.getTimeoutMs());
     context.addReceivingMailboxes(mailboxReceiveOperator.getSendingMailbox());
     return mailboxReceiveOperator;
@@ -77,7 +77,7 @@ public class PhysicalPlanVisitor implements 
StageNodeVisitor<MultiStageOperator,
     StageMetadata receivingStageMetadata = 
context.getMetadataMap().get(node.getReceiverStageId());
     return new MailboxSendOperator(context.getMailboxService(), nextOperator,
         receivingStageMetadata.getServerInstances(), node.getExchangeType(), 
node.getPartitionKeySelector(),
-        context.getHostName(), context.getPort(), context.getRequestId(), 
node.getStageId());
+        context.getServer(), context.getRequestId(), node.getStageId());
   }
 
   @Override
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
index a52e0fa0be..db5b0e028a 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 
 
@@ -33,20 +34,18 @@ public class PlanRequestContext {
   protected final long _requestId;
   protected final int _stageId;
   private final long _timeoutMs;
-  protected final String _hostName;
-  protected final int _port;
+  protected final VirtualServerAddress _server;
   protected final Map<Integer, StageMetadata> _metadataMap;
   protected final List<MailboxIdentifier> _receivingMailboxes = new 
ArrayList<>();
 
 
   public PlanRequestContext(MailboxService<TransferableBlock> mailboxService, 
long requestId, int stageId,
-      long timeoutMs, String hostName, int port, Map<Integer, StageMetadata> 
metadataMap) {
+      long timeoutMs, VirtualServerAddress server, Map<Integer, StageMetadata> 
metadataMap) {
     _mailboxService = mailboxService;
     _requestId = requestId;
     _stageId = stageId;
     _timeoutMs = timeoutMs;
-    _hostName = hostName;
-    _port = port;
+    _server = server;
     _metadataMap = metadataMap;
   }
 
@@ -62,12 +61,8 @@ public class PlanRequestContext {
     return _timeoutMs;
   }
 
-  public String getHostName() {
-    return _hostName;
-  }
-
-  public int getPort() {
-    return _port;
+  public VirtualServerAddress getServer() {
+    return _server;
   }
 
   public Map<Integer, StageMetadata> getMetadataMap() {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index 071a9325bd..85440e1aed 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -48,6 +48,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.StageNodeVisitor;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
 import org.apache.pinot.query.service.QueryConfig;
@@ -104,8 +105,8 @@ public class ServerRequestPlanVisitor implements 
StageNodeVisitor<Void, ServerPl
     pinotQuery.setExplain(false);
     ServerPlanRequestContext context =
         new ServerPlanRequestContext(mailboxService, requestId, 
stagePlan.getStageId(), timeoutMs,
-            stagePlan.getServerInstance().getHostname(), 
stagePlan.getServerInstance().getPort(),
-            stagePlan.getMetadataMap(), pinotQuery, tableType, 
timeBoundaryInfo);
+            new VirtualServerAddress(stagePlan.getServer()), 
stagePlan.getMetadataMap(), pinotQuery, tableType,
+            timeBoundaryInfo);
 
     // visit the plan and create query physical plan.
     ServerRequestPlanVisitor.walkStageNode(stagePlan.getStageRoot(), context);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index 776e9f4e22..918d459312 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -28,6 +28,7 @@ import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.AbstractStageNode;
 import org.apache.pinot.query.planner.stage.StageNodeSerDeUtils;
+import org.apache.pinot.query.routing.VirtualServer;
 import org.apache.pinot.query.routing.WorkerInstance;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 
@@ -43,7 +44,7 @@ public class QueryPlanSerDeUtils {
 
   public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) {
     DistributedStagePlan distributedStagePlan = new 
DistributedStagePlan(stagePlan.getStageId());
-    
distributedStagePlan.setServerInstance(stringToInstance(stagePlan.getInstanceId()));
+    
distributedStagePlan.setServer(stringToInstance(stagePlan.getInstanceId()));
     
distributedStagePlan.setStageRoot(StageNodeSerDeUtils.deserializeStageNode(stagePlan.getStageRoot()));
     Map<Integer, Worker.StageMetadata> metadataMap = 
stagePlan.getStageMetadataMap();
     
distributedStagePlan.getMetadataMap().putAll(protoMapToStageMetadataMap(metadataMap));
@@ -53,19 +54,19 @@ public class QueryPlanSerDeUtils {
   public static Worker.StagePlan serialize(DistributedStagePlan 
distributedStagePlan) {
     return Worker.StagePlan.newBuilder()
         .setStageId(distributedStagePlan.getStageId())
-        
.setInstanceId(instanceToString(distributedStagePlan.getServerInstance()))
+        .setInstanceId(instanceToString(distributedStagePlan.getServer()))
         
.setStageRoot(StageNodeSerDeUtils.serializeStageNode((AbstractStageNode) 
distributedStagePlan.getStageRoot()))
         
.putAllStageMetadata(stageMetadataMapToProtoMap(distributedStagePlan.getMetadataMap())).build();
   }
 
-  public static ServerInstance stringToInstance(String serverInstanceString) {
+  public static VirtualServer stringToInstance(String serverInstanceString) {
     String[] s = StringUtils.split(serverInstanceString, '_');
     // Skipped netty and grpc port as they are not used in worker instance.
-    return new WorkerInstance(s[0], Integer.parseInt(s[1]), 
Integer.parseInt(s[2]), Integer.parseInt(s[3]),
-        Integer.parseInt(s[4]));
+    return new VirtualServer(new WorkerInstance(s[0], Integer.parseInt(s[1]), 
Integer.parseInt(s[2]),
+        Integer.parseInt(s[3]), Integer.parseInt(s[4])), 0);
   }
 
-  public static String instanceToString(ServerInstance serverInstance) {
+  public static String instanceToString(VirtualServer serverInstance) {
     return StringUtils.join(serverInstance.getHostname(), '_', 
serverInstance.getPort(), '_',
         serverInstance.getGrpcPort(), '_', 
serverInstance.getQueryServicePort(), '_',
         serverInstance.getQueryMailboxPort());
@@ -94,7 +95,8 @@ public class QueryPlanSerDeUtils {
           : 
instanceEntry.getValue().getTableTypeToSegmentListMap().entrySet()) {
         tableToSegmentMap.put(tableEntry.getKey(), 
tableEntry.getValue().getSegmentsList());
       }
-      
stageMetadata.getServerInstanceToSegmentsMap().put(stringToInstance(instanceEntry.getKey()),
 tableToSegmentMap);
+      stageMetadata.getServerInstanceToSegmentsMap()
+          .put(stringToInstance(instanceEntry.getKey()).getServer(), 
tableToSegmentMap);
     }
     // time boundary info
     if (!workerStageMetadata.getTimeColumn().isEmpty()) {
@@ -117,7 +119,7 @@ public class QueryPlanSerDeUtils {
     // scanned table
     builder.addAllDataSources(stageMetadata.getScannedTables());
     // server instance to table-segments mapping
-    for (ServerInstance serverInstance : stageMetadata.getServerInstances()) {
+    for (VirtualServer serverInstance : stageMetadata.getServerInstances()) {
       builder.addInstances(instanceToString(serverInstance));
     }
     for (Map.Entry<ServerInstance, Map<String, List<String>>> instanceEntry
@@ -127,7 +129,7 @@ public class QueryPlanSerDeUtils {
         tableToSegmentMap.put(tableEntry.getKey(),
             
Worker.SegmentList.newBuilder().addAllSegments(tableEntry.getValue()).build());
       }
-      builder.putInstanceToSegmentMap(instanceToString(instanceEntry.getKey()),
+      builder.putInstanceToSegmentMap(instanceToString(new 
VirtualServer(instanceEntry.getKey(), 0)),
           
Worker.SegmentMap.newBuilder().putAllTableTypeToSegmentList(tableToSegmentMap).build());
     }
     // time boundary info
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 3ce5bd3551..35142fe0cf 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -24,6 +24,7 @@ import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.plan.PlanRequestContext;
 import org.apache.pinot.spi.config.table.TableType;
@@ -41,9 +42,9 @@ public class ServerPlanRequestContext extends 
PlanRequestContext {
   protected InstanceRequest _instanceRequest;
 
   public ServerPlanRequestContext(MailboxService<TransferableBlock> 
mailboxService, long requestId, int stageId,
-      long timeoutMs, String hostName, int port, Map<Integer, StageMetadata> 
metadataMap, PinotQuery pinotQuery,
+      long timeoutMs, VirtualServerAddress server, Map<Integer, StageMetadata> 
metadataMap, PinotQuery pinotQuery,
       TableType tableType, TimeBoundaryInfo timeBoundaryInfo) {
-    super(mailboxService, requestId, stageId, timeoutMs, hostName, port, 
metadataMap);
+    super(mailboxService, requestId, stageId, timeoutMs, server, metadataMap);
     _pinotQuery = pinotQuery;
     _tableType = tableType;
     _timeBoundaryInfo = timeBoundaryInfo;
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index f0eb564327..1000f42869 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -35,11 +35,12 @@ import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
@@ -70,8 +71,8 @@ public class QueryDispatcher {
     MailboxReceiveNode reduceNode = (MailboxReceiveNode) 
queryPlan.getQueryStageMap().get(reduceStageId);
     MailboxReceiveOperator mailboxReceiveOperator = 
createReduceStageOperator(mailboxService,
         
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
 requestId,
-        reduceNode.getSenderStageId(), reduceNode.getDataSchema(), 
mailboxService.getHostname(),
-        mailboxService.getMailboxPort(), timeoutMs);
+        reduceNode.getSenderStageId(), reduceNode.getDataSchema(),
+        new VirtualServerAddress(mailboxService.getHostname(), 
mailboxService.getMailboxPort(), 0), timeoutMs);
     List<DataBlock> resultDataBlocks = 
reduceMailboxReceive(mailboxReceiveOperator, timeoutMs);
     mailboxReceiveOperator.toExplainString();
     long toResultTableStartTime = System.currentTimeMillis();
@@ -92,8 +93,8 @@ public class QueryDispatcher {
       if (queryPlan.getQueryStageMap().get(stageId) instanceof 
MailboxReceiveNode) {
         reduceStageId = stageId;
       } else {
-        List<ServerInstance> serverInstances = 
stage.getValue().getServerInstances();
-        for (ServerInstance serverInstance : serverInstances) {
+        List<VirtualServer> serverInstances = 
stage.getValue().getServerInstances();
+        for (VirtualServer serverInstance : serverInstances) {
           String host = serverInstance.getHostname();
           int servicePort = serverInstance.getQueryServicePort();
           DispatchClient client = getOrCreateDispatchClient(host, servicePort);
@@ -120,7 +121,7 @@ public class QueryDispatcher {
   }
 
   public static DistributedStagePlan constructDistributedStagePlan(QueryPlan 
queryPlan, int stageId,
-      ServerInstance serverInstance) {
+      VirtualServer serverInstance) {
     return new DistributedStagePlan(stageId, serverInstance, 
queryPlan.getQueryStageMap().get(stageId),
         queryPlan.getStageMetadataMap());
   }
@@ -195,12 +196,12 @@ public class QueryDispatcher {
 
   @VisibleForTesting
   public static MailboxReceiveOperator 
createReduceStageOperator(MailboxService<TransferableBlock> mailboxService,
-      List<ServerInstance> sendingInstances, long jobId, int stageId, 
DataSchema dataSchema, String hostname,
-      int port, long timeoutMs) {
+      List<VirtualServer> sendingInstances, long jobId, int stageId, 
DataSchema dataSchema, VirtualServerAddress server,
+      long timeoutMs) {
     // timeout is set for reduce stage
     MailboxReceiveOperator mailboxReceiveOperator =
         new MailboxReceiveOperator(mailboxService, sendingInstances,
-            RelDistribution.Type.RANDOM_DISTRIBUTED, hostname, port, jobId, 
stageId, timeoutMs);
+            RelDistribution.Type.RANDOM_DISTRIBUTED, server, jobId, stageId, 
timeoutMs);
     return mailboxReceiveOperator;
   }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index 70b7763ca9..4e04d5c899 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -27,6 +27,7 @@ import java.util.function.Consumer;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.query.testutils.QueryTestUtils;
@@ -76,8 +77,8 @@ public class GrpcMailboxServiceTest {
     // Given:
     JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
         "happypath",
-        new ServerAddress("localhost", _mailboxService1.getMailboxPort()),
-        new ServerAddress("localhost", _mailboxService2.getMailboxPort()));
+        new VirtualServerAddress("localhost", 
_mailboxService1.getMailboxPort(), 0),
+        new VirtualServerAddress("localhost", 
_mailboxService2.getMailboxPort(), 0));
     SendingMailbox<TransferableBlock> sendingMailbox = 
_mailboxService1.getSendingMailbox(mailboxId);
     ReceivingMailbox<TransferableBlock> receivingMailbox = 
_mailboxService2.getReceivingMailbox(mailboxId);
     CountDownLatch gotData = new CountDownLatch(1);
@@ -108,8 +109,8 @@ public class GrpcMailboxServiceTest {
     // Given:
     JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
         "exception",
-        new ServerAddress("localhost", _mailboxService1.getMailboxPort()),
-        new ServerAddress("localhost", _mailboxService2.getMailboxPort()));
+        new VirtualServerAddress("localhost", 
_mailboxService1.getMailboxPort(), 0),
+        new VirtualServerAddress("localhost", 
_mailboxService2.getMailboxPort(), 0));
     SendingMailbox<TransferableBlock> sendingMailbox = 
_mailboxService1.getSendingMailbox(mailboxId);
     ReceivingMailbox<TransferableBlock> receivingMailbox = 
_mailboxService2.getReceivingMailbox(mailboxId);
     CountDownLatch gotData = new CountDownLatch(1);
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
index a1eae8e1ff..0b7438b330 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -39,7 +40,7 @@ public class InMemoryMailboxServiceTest {
       throws Exception {
     InMemoryMailboxService mailboxService = new 
InMemoryMailboxService("localhost", 0, ignored -> { });
     final JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
-        "happyPathJob", new ServerAddress("localhost", 0), new 
ServerAddress("localhost", 0));
+        "happyPathJob", new VirtualServerAddress("localhost", 0, 0), new 
VirtualServerAddress("localhost", 0, 0));
     InMemoryReceivingMailbox receivingMailbox = (InMemoryReceivingMailbox) 
mailboxService.getReceivingMailbox(
         mailboxId);
     InMemorySendingMailbox sendingMailbox = (InMemorySendingMailbox) 
mailboxService.getSendingMailbox(mailboxId);
@@ -75,7 +76,7 @@ public class InMemoryMailboxServiceTest {
   public void testNonLocalMailboxId() {
     InMemoryMailboxService mailboxService = new 
InMemoryMailboxService("localhost", 0, ignored -> { });
     final JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
-        "happyPathJob", new ServerAddress("localhost", 0), new 
ServerAddress("localhost", 1));
+        "happyPathJob", new VirtualServerAddress("localhost", 0, 0), new 
VirtualServerAddress("localhost", 1, 0));
 
     // Test getReceivingMailbox
     try {
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
index 110aeb9d74..66ed5be7d9 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -25,9 +26,9 @@ import org.testng.annotations.Test;
 
 public class MultiplexingMailboxServiceTest {
   private static final JsonMailboxIdentifier LOCAL_MAILBOX_ID = new 
JsonMailboxIdentifier(
-      "localJobId", new ServerAddress("localhost", 0), new 
ServerAddress("localhost", 0));
+      "localJobId", new VirtualServerAddress("localhost", 0, 0), new 
VirtualServerAddress("localhost", 0, 0));
   private static final JsonMailboxIdentifier NON_LOCAL_MAILBOX_ID = new 
JsonMailboxIdentifier(
-      "localJobId", new ServerAddress("localhost", 0), new 
ServerAddress("localhost", 1));
+      "localJobId", new VirtualServerAddress("localhost", 0, 0), new 
VirtualServerAddress("localhost", 1, 0));
 
   @Test
   public void testHappyPath() {
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index abb7d65e4d..093fffa381 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -28,12 +28,13 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
-import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.QueryEnvironmentTestBase;
 import org.apache.pinot.query.QueryServerEnclosure;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerInstance;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -189,13 +190,14 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
         mailboxReceiveOperator = 
QueryDispatcher.createReduceStageOperator(_mailboxService,
             
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
             
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)),
-            reduceNode.getSenderStageId(), reduceNode.getDataSchema(), 
"localhost", _reducerGrpcPort,
+            reduceNode.getSenderStageId(), reduceNode.getDataSchema(),
+            new VirtualServerAddress("localhost", _reducerGrpcPort, 0),
             
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)));
       } else {
-        for (ServerInstance serverInstance : 
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
+        for (VirtualServer serverInstance : 
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
           DistributedStagePlan distributedStagePlan =
               QueryDispatcher.constructDistributedStagePlan(queryPlan, 
stageId, serverInstance);
-          _servers.get(serverInstance).processQuery(distributedStagePlan, 
requestMetadataMap);
+          
_servers.get(serverInstance.getServer()).processQuery(distributedStagePlan, 
requestMetadataMap);
         }
       }
     }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 090f4ded6f..b921c8fdd8 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -47,6 +47,8 @@ import org.apache.pinot.query.QueryTestSet;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.service.QueryConfig;
@@ -92,13 +94,13 @@ public abstract class QueryRunnerTestBase extends 
QueryTestSet {
         mailboxReceiveOperator = 
QueryDispatcher.createReduceStageOperator(_mailboxService,
             
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
             
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)), 
reduceNode.getSenderStageId(),
-            reduceNode.getDataSchema(), "localhost", _reducerGrpcPort,
+            reduceNode.getDataSchema(), new VirtualServerAddress("localhost", 
_reducerGrpcPort, 0),
             
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)));
       } else {
-        for (ServerInstance serverInstance : 
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
+        for (VirtualServer serverInstance : 
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
           DistributedStagePlan distributedStagePlan =
               QueryDispatcher.constructDistributedStagePlan(queryPlan, 
stageId, serverInstance);
-          _servers.get(serverInstance).processQuery(distributedStagePlan, 
requestMetadataMap);
+          
_servers.get(serverInstance.getServer()).processQuery(distributedStagePlan, 
requestMetadataMap);
         }
       }
     }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 0435b50daa..36bee05aae 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -34,8 +34,8 @@ import org.testng.annotations.Test;
 
 public class RoundRobinSchedulerTest {
 
-  private static final MailboxIdentifier MAILBOX_1 = new 
JsonMailboxIdentifier("1_1", "foo:2", "bar:3");
-  private static final MailboxIdentifier MAILBOX_2 = new 
JsonMailboxIdentifier("1_2", "foo:2", "bar:3");
+  private static final MailboxIdentifier MAILBOX_1 = new 
JsonMailboxIdentifier("1_1", "0@foo:2", "0@bar:3");
+  private static final MailboxIdentifier MAILBOX_2 = new 
JsonMailboxIdentifier("1_2", "0@foo:2", "0@bar:3");
 
   @Mock
   private MultiStageOperator _operator;
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index ce6777df14..b1c6b5cc79 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -25,11 +25,11 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
-import org.apache.pinot.query.mailbox.ServerAddress;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.mockito.Mock;
@@ -56,9 +56,11 @@ public class MailboxReceiveOperatorTest {
   @Mock
   private MailboxService<TransferableBlock> _mailboxService;
   @Mock
-  private ServerInstance _server1;
+  private VirtualServer _server1;
   @Mock
-  private ServerInstance _server2;
+  private VirtualServer _server2;
+
+  private final VirtualServerAddress _testAddr = new 
VirtualServerAddress("test", 123, 0);
 
   @BeforeMethod
   public void setUp() {
@@ -76,7 +78,7 @@ public class MailboxReceiveOperatorTest {
       throws InterruptedException {
     // shorter timeoutMs should result in error.
     MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(_mailboxService, new ArrayList<>(), 
RelDistribution.Type.SINGLETON, "test", 123, 456,
+        new MailboxReceiveOperator(_mailboxService, new ArrayList<>(), 
RelDistribution.Type.SINGLETON, _testAddr, 456,
             789, 10L);
     Thread.sleep(200L);
     TransferableBlock mailbox = receiveOp.nextBlock();
@@ -86,13 +88,13 @@ public class MailboxReceiveOperatorTest {
 
     // longer timeout or default timeout (10s) doesn't result in error.
     receiveOp =
-        new MailboxReceiveOperator(_mailboxService, new ArrayList<>(), 
RelDistribution.Type.SINGLETON, "test", 123, 456,
+        new MailboxReceiveOperator(_mailboxService, new ArrayList<>(), 
RelDistribution.Type.SINGLETON, _testAddr, 456,
             789, 2000L);
     Thread.sleep(200L);
     mailbox = receiveOp.nextBlock();
     Assert.assertFalse(mailbox.isErrorBlock());
     receiveOp =
-        new MailboxReceiveOperator(_mailboxService, new ArrayList<>(), 
RelDistribution.Type.SINGLETON, "test", 123, 456,
+        new MailboxReceiveOperator(_mailboxService, new ArrayList<>(), 
RelDistribution.Type.SINGLETON, _testAddr, 456,
             789, null);
     Thread.sleep(200L);
     mailbox = receiveOp.nextBlock();
@@ -113,7 +115,7 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
 
     MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.SINGLETON, "test", 123, 456, 789, null);
+        RelDistribution.Type.SINGLETON, _testAddr, 456, 789, null);
   }
 
   @Test(expectedExceptions = IllegalStateException.class, 
expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
@@ -128,7 +130,7 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
 
     MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.RANGE_DISTRIBUTED, "test", 123, 456, 789, null);
+        RelDistribution.Type.RANGE_DISTRIBUTED, _testAddr, 456, 789, null);
   }
 
   @Test
@@ -150,9 +152,10 @@ public class MailboxReceiveOperatorTest {
     int stageId = 0;
     int toPort = 8888;
     String toHost = "toHost";
+    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 
0);
 
     MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+        RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, null);
 
     // Receive end of stream block directly when there is no match.
     Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
@@ -177,15 +180,16 @@ public class MailboxReceiveOperatorTest {
     int stageId = 0;
     int toPort = 8888;
     String toHost = "toHost";
+    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 
0);
 
     JsonMailboxIdentifier expectedMailboxId =
         new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-            new ServerAddress(serverHost, server2port),
-            new ServerAddress(toHost, toPort));
+            new VirtualServerAddress(serverHost, server2port, 0),
+            toAddress);
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(true);
     MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+        RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, null);
     // Receive end of stream block directly when mailbox is close.
     Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
   }
@@ -210,17 +214,18 @@ public class MailboxReceiveOperatorTest {
     int stageId = 0;
     int toPort = 8888;
     String toHost = "toHost";
+    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 
0);
 
     JsonMailboxIdentifier expectedMailboxId =
         new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-            new ServerAddress(serverHost, server2port),
-            new ServerAddress(toHost, toPort));
+            new VirtualServerAddress(serverHost, server2port, 0),
+            toAddress);
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     // Receive null mailbox during timeout.
     Mockito.when(_mailbox.receive()).thenReturn(null);
     MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+        RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, null);
     // Receive NoOpBlock.
     Assert.assertTrue(receiveOp.nextBlock().isNoOpBlock());
   }
@@ -245,16 +250,17 @@ public class MailboxReceiveOperatorTest {
     int stageId = 0;
     int toPort = 8888;
     String toHost = "toHost";
+    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 
0);
 
     JsonMailboxIdentifier expectedMailboxId =
         new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-            new ServerAddress(serverHost, server2port),
-            new ServerAddress(toHost, toPort));
+            new VirtualServerAddress(serverHost, server2port, 0),
+            toAddress);
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     
Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
     MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+        RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, null);
     // Receive EosBloc.
     Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
   }
@@ -279,18 +285,19 @@ public class MailboxReceiveOperatorTest {
     int stageId = 0;
     int toPort = 8888;
     String toHost = "toHost";
+    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 
0);
 
     JsonMailboxIdentifier expectedMailboxId =
         new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-            new ServerAddress(serverHost, server2port),
-            new ServerAddress(toHost, toPort));
+            new VirtualServerAddress(serverHost, server2port, 0),
+            toAddress);
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Object[] expRow = new Object[]{1, 1};
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new 
DataSchema.ColumnDataType[]{INT, INT});
     
Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(inSchema, 
expRow));
     MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+        RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     List<Object[]> resultRows = receivedBlock.getContainer();
     Assert.assertEquals(resultRows.size(), 1);
@@ -317,17 +324,18 @@ public class MailboxReceiveOperatorTest {
     int stageId = 0;
     int toPort = 8888;
     String toHost = "toHost";
+    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 
0);
 
     JsonMailboxIdentifier expectedMailboxId =
         new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-            new ServerAddress(serverHost, server2port),
-            new ServerAddress(toHost, toPort));
+            new VirtualServerAddress(serverHost, server2port, 0),
+            toAddress);
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Exception e = new Exception("errorBlock");
     
Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(e));
     MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+        RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     Assert.assertTrue(receivedBlock.isErrorBlock());
     MetadataBlock error = (MetadataBlock) receivedBlock.getDataBlock();
@@ -351,25 +359,26 @@ public class MailboxReceiveOperatorTest {
     int stageId = 0;
     int toPort = 8888;
     String toHost = "toHost";
+    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 
0);
 
     JsonMailboxIdentifier expectedMailboxId1 =
         new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-            new ServerAddress(server1Host, server1Port),
-            new ServerAddress(toHost, toPort));
+            new VirtualServerAddress(server1Host, server1Port, 0),
+            toAddress);
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(true);
 
     JsonMailboxIdentifier expectedMailboxId2 =
         new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-            new ServerAddress(server2Host, server2Port),
-            new ServerAddress(toHost, toPort));
+            new VirtualServerAddress(server2Host, server2Port, 0),
+            toAddress);
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     Object[] expRow = new Object[]{1, 1};
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new 
DataSchema.ColumnDataType[]{INT, INT});
     
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, 
expRow));
     MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, 
null);
+        RelDistribution.Type.HASH_DISTRIBUTED, toAddress, jobId, stageId, 
null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     List<Object[]> resultRows = receivedBlock.getContainer();
     Assert.assertEquals(resultRows.size(), 1);
@@ -393,26 +402,27 @@ public class MailboxReceiveOperatorTest {
     int stageId = 0;
     int toPort = 8888;
     String toHost = "toHost";
+    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 
0);
 
     JsonMailboxIdentifier expectedMailboxId1 =
         new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-            new ServerAddress(server1Host, server1Port),
-            new ServerAddress(toHost, toPort));
+            new VirtualServerAddress(server1Host, server1Port, 0),
+            toAddress);
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Mockito.when(_mailbox.receive()).thenReturn(null);
 
     JsonMailboxIdentifier expectedMailboxId2 =
         new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-            new ServerAddress(server2Host, server2Port),
-            new ServerAddress(toHost, toPort));
+            new VirtualServerAddress(server2Host, server2Port, 0),
+            toAddress);
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     Object[] expRow = new Object[]{1, 1};
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new 
DataSchema.ColumnDataType[]{INT, INT});
     
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, 
expRow));
     MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, 
null);
+        RelDistribution.Type.HASH_DISTRIBUTED, toAddress, jobId, stageId, 
null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     List<Object[]> resultRows = receivedBlock.getContainer();
     Assert.assertEquals(resultRows.size(), 1);
@@ -436,12 +446,13 @@ public class MailboxReceiveOperatorTest {
     int stageId = 0;
     int toPort = 8888;
     String toHost = "toHost";
+    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 
0);
 
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new 
DataSchema.ColumnDataType[]{INT, INT});
     JsonMailboxIdentifier expectedMailboxId1 =
         new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-            new ServerAddress(server1Host, server1Port),
-            new ServerAddress(toHost, toPort));
+            new VirtualServerAddress(server1Host, server1Port, 0),
+            toAddress);
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Object[] expRow1 = new Object[]{1, 1};
@@ -453,13 +464,13 @@ public class MailboxReceiveOperatorTest {
     Object[] expRow3 = new Object[]{3, 3};
     JsonMailboxIdentifier expectedMailboxId2 =
         new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-            new ServerAddress(server2Host, server2Port),
-            new ServerAddress(toHost, toPort));
+            new VirtualServerAddress(server2Host, server2Port, 0),
+            toAddress);
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, 
expRow3));
     MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, 
null);
+        RelDistribution.Type.HASH_DISTRIBUTED, toAddress, jobId, stageId, 
null);
     // Receive first block from first server.
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     List<Object[]> resultRows = receivedBlock.getContainer();
@@ -495,12 +506,13 @@ public class MailboxReceiveOperatorTest {
     int stageId = 0;
     int toPort = 8888;
     String toHost = "toHost";
+    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 
0);
 
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new 
DataSchema.ColumnDataType[]{INT, INT});
     JsonMailboxIdentifier expectedMailboxId1 =
         new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-            new ServerAddress(server1Host, server1Port),
-            new ServerAddress(toHost, toPort));
+            new VirtualServerAddress(server1Host, server1Port, 0),
+            toAddress);
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Mockito.when(_mailbox.receive())
@@ -509,13 +521,13 @@ public class MailboxReceiveOperatorTest {
     Object[] expRow3 = new Object[]{3, 3};
     JsonMailboxIdentifier expectedMailboxId2 =
         new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-            new ServerAddress(server2Host, server2Port),
-            new ServerAddress(toHost, toPort));
+            new VirtualServerAddress(server2Host, server2Port, 0),
+            toAddress);
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, 
expRow3));
     MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, 
null);
+        RelDistribution.Type.HASH_DISTRIBUTED, toAddress, jobId, stageId, 
null);
     // Receive error block from first server.
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     Assert.assertTrue(receivedBlock.isErrorBlock());
@@ -540,12 +552,13 @@ public class MailboxReceiveOperatorTest {
     int stageId = 0;
     int toPort = 8888;
     String toHost = "toHost";
+    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 
0);
 
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new 
DataSchema.ColumnDataType[]{INT, INT});
     JsonMailboxIdentifier expectedMailboxId1 =
         new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-            new ServerAddress(server1Host, server1Port),
-            new ServerAddress(toHost, toPort));
+            new VirtualServerAddress(server1Host, server1Port, 0),
+            toAddress);
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Mockito.when(_mailbox.receive()).thenThrow(new Exception("mailboxError"));
@@ -553,13 +566,13 @@ public class MailboxReceiveOperatorTest {
     Object[] expRow3 = new Object[]{3, 3};
     JsonMailboxIdentifier expectedMailboxId2 =
         new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-            new ServerAddress(server2Host, server2Port),
-            new ServerAddress(toHost, toPort));
+            new VirtualServerAddress(server2Host, server2Port, 0),
+            toAddress);
     
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, 
expRow3));
     MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, 
null);
+        RelDistribution.Type.HASH_DISTRIBUTED, toAddress, jobId, stageId, 
null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     Assert.assertTrue(receivedBlock.isErrorBlock(), "server-1 should have 
returned an error-block");
   }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index b616efcf91..4bc112d933 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -23,10 +23,10 @@ import java.util.Arrays;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.routing.VirtualServer;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
@@ -48,7 +48,7 @@ public class MailboxSendOperatorTest {
   @Mock
   private MailboxService<TransferableBlock> _mailboxService;
   @Mock
-  private ServerInstance _server;
+  private VirtualServer _server;
   @Mock
   private KeySelector<Object[], Object[]> _selector;
   @Mock
@@ -74,7 +74,7 @@ public class MailboxSendOperatorTest {
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(
         _mailboxService, _input, ImmutableList.of(_server), 
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
-        server -> new JsonMailboxIdentifier("123", "from:1", "to:2"), 
_exchangeFactory, 1, 2);
+        server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2"), 
_exchangeFactory, 1, 2);
     Mockito.when(_input.nextBlock())
         .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
 
@@ -91,7 +91,7 @@ public class MailboxSendOperatorTest {
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(
         _mailboxService, _input, ImmutableList.of(_server), 
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
-        server -> new JsonMailboxIdentifier("123", "from:1", "to:2"), 
_exchangeFactory, 1, 2);
+        server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2"), 
_exchangeFactory, 1, 2);
     TransferableBlock errorBlock = 
TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"));
     Mockito.when(_input.nextBlock())
         .thenReturn(errorBlock);
@@ -109,7 +109,7 @@ public class MailboxSendOperatorTest {
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(
         _mailboxService, _input, ImmutableList.of(_server), 
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
-        server -> new JsonMailboxIdentifier("123", "from:1", "to:2"), 
_exchangeFactory, 1, 2);
+        server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2"), 
_exchangeFactory, 1, 2);
     Mockito.when(_input.nextBlock())
         .thenThrow(new RuntimeException("foo!"));
     ArgumentCaptor<TransferableBlock> captor = 
ArgumentCaptor.forClass(TransferableBlock.class);
@@ -128,7 +128,7 @@ public class MailboxSendOperatorTest {
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(
         _mailboxService, _input, ImmutableList.of(_server), 
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
-        server -> new JsonMailboxIdentifier("123", "from:1", "to:2"), 
_exchangeFactory, 1, 2);
+        server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2"), 
_exchangeFactory, 1, 2);
     TransferableBlock eosBlock = 
TransferableBlockUtils.getEndOfStreamTransferableBlock();
     Mockito.when(_input.nextBlock())
         .thenReturn(eosBlock);
@@ -146,7 +146,7 @@ public class MailboxSendOperatorTest {
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(
         _mailboxService, _input, ImmutableList.of(_server), 
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
-        server -> new JsonMailboxIdentifier("123", "from:1", "to:2"), 
_exchangeFactory, 1, 2);
+        server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2"), 
_exchangeFactory, 1, 2);
     TransferableBlock dataBlock = block(new DataSchema(new String[]{}, new 
DataSchema.ColumnDataType[]{}));
     Mockito.when(_input.nextBlock())
         .thenReturn(dataBlock)
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
index d2cb71a04c..43663c5e0a 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
@@ -44,9 +44,8 @@ import org.testng.annotations.Test;
 
 
 public class BlockExchangeTest {
-
-  private static final MailboxIdentifier MAILBOX_1 = new 
JsonMailboxIdentifier("1", "host:1", "host:1");
-  private static final MailboxIdentifier MAILBOX_2 = new 
JsonMailboxIdentifier("1", "host:1", "host:2");
+  private static final MailboxIdentifier MAILBOX_1 = new 
JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
+  private static final MailboxIdentifier MAILBOX_2 = new 
JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
 
   private AutoCloseable _mocks;
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
index c99ea643c6..eccd31fe6a 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
@@ -34,8 +34,8 @@ import org.testng.annotations.Test;
 
 
 public class BroadcastExchangeTest {
-  private static final MailboxIdentifier MAILBOX_1 = new 
JsonMailboxIdentifier("1", "host:1", "host:1");
-  private static final MailboxIdentifier MAILBOX_2 = new 
JsonMailboxIdentifier("1", "host:1", "host:2");
+  private static final MailboxIdentifier MAILBOX_1 = new 
JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
+  private static final MailboxIdentifier MAILBOX_2 = new 
JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
 
   private AutoCloseable _mocks;
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
index 4cd1d71075..31ff4a70f3 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
@@ -37,9 +37,8 @@ import org.testng.annotations.Test;
 
 
 public class HashExchangeTest {
-
-  private static final MailboxIdentifier MAILBOX_1 = new 
JsonMailboxIdentifier("1", "host:1", "host:1");
-  private static final MailboxIdentifier MAILBOX_2 = new 
JsonMailboxIdentifier("1", "host:1", "host:2");
+  private static final MailboxIdentifier MAILBOX_1 = new 
JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
+  private static final MailboxIdentifier MAILBOX_2 = new 
JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
 
   private AutoCloseable _mocks;
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
index 19e3db1711..c93080824c 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
@@ -33,8 +33,8 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class RandomExchangeTest {
-  private static final MailboxIdentifier MAILBOX_1 = new 
JsonMailboxIdentifier("1", "host:1", "host:1");
-  private static final MailboxIdentifier MAILBOX_2 = new 
JsonMailboxIdentifier("1", "host:1", "host:2");
+  private static final MailboxIdentifier MAILBOX_1 = new 
JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
+  private static final MailboxIdentifier MAILBOX_2 = new 
JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
 
   private AutoCloseable _mocks;
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
index e38e533a9a..6b9dcd53f1 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
@@ -34,7 +34,7 @@ import org.testng.annotations.Test;
 
 
 public class SingletonExchangeTest {
-  private static final MailboxIdentifier MAILBOX_1 = new 
JsonMailboxIdentifier("1", "host:1", "host:1");
+  private static final MailboxIdentifier MAILBOX_1 = new 
JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
 
   private AutoCloseable _mocks;
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index f7c7f06f0f..5473b4703d 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.query.QueryTestSet;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.routing.VirtualServer;
 import org.apache.pinot.query.routing.WorkerInstance;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
@@ -166,7 +167,7 @@ public class QueryServerTest extends QueryTestSet {
   }
 
   private Worker.QueryRequest getQueryRequest(QueryPlan queryPlan, int 
stageId) {
-    ServerInstance serverInstance = 
queryPlan.getStageMetadataMap().get(stageId).getServerInstances().get(0);
+    VirtualServer serverInstance = 
queryPlan.getStageMetadataMap().get(stageId).getServerInstances().get(0);
 
     return 
Worker.QueryRequest.newBuilder().setStagePlan(QueryPlanSerDeUtils.serialize(
             QueryDispatcher.constructDistributedStagePlan(queryPlan, stageId, 
serverInstance)))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to