This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9105a06ce8 [multistage] use virtual servers instead of ServerInstance
(#10157)
9105a06ce8 is described below
commit 9105a06ce8d7c0e1b0988c6883f620801d07bbea
Author: Almog Gavra <[email protected]>
AuthorDate: Tue Jan 24 17:36:50 2023 -0800
[multistage] use virtual servers instead of ServerInstance (#10157)
---
.../query/planner/ExplainPlanStageVisitor.java | 21 ++--
.../apache/pinot/query/planner/StageMetadata.java | 11 +-
.../colocated/GreedyShuffleRewriteVisitor.java | 8 +-
.../apache/pinot/query/routing/VirtualServer.java | 92 +++++++++++++++++
.../pinot/query/routing/VirtualServerAddress.java | 43 ++++++--
.../apache/pinot/query/routing/WorkerManager.java | 16 ++-
.../apache/pinot/query/QueryCompilationTest.java | 18 ++--
.../pinot/query/mailbox/JsonMailboxIdentifier.java | 19 ++--
.../pinot/query/mailbox/MailboxIdentifier.java | 7 +-
.../java/org/apache/pinot/query/mailbox/Utils.java | 5 +-
.../apache/pinot/query/runtime/QueryRunner.java | 18 ++--
.../runtime/operator/MailboxReceiveOperator.java | 26 ++---
.../runtime/operator/MailboxSendOperator.java | 26 ++---
.../query/runtime/plan/DistributedStagePlan.java | 16 +--
.../query/runtime/plan/PhysicalPlanVisitor.java | 8 +-
.../query/runtime/plan/PlanRequestContext.java | 17 ++--
.../runtime/plan/ServerRequestPlanVisitor.java | 5 +-
.../runtime/plan/serde/QueryPlanSerDeUtils.java | 20 ++--
.../plan/server/ServerPlanRequestContext.java | 5 +-
.../pinot/query/service/QueryDispatcher.java | 19 ++--
.../query/mailbox/GrpcMailboxServiceTest.java | 9 +-
.../query/mailbox/InMemoryMailboxServiceTest.java | 5 +-
.../mailbox/MultiplexingMailboxServiceTest.java | 5 +-
.../pinot/query/runtime/QueryRunnerTest.java | 10 +-
.../pinot/query/runtime/QueryRunnerTestBase.java | 8 +-
.../runtime/executor/RoundRobinSchedulerTest.java | 4 +-
.../operator/MailboxReceiveOperatorTest.java | 113 ++++++++++++---------
.../runtime/operator/MailboxSendOperatorTest.java | 14 +--
.../operator/exchange/BlockExchangeTest.java | 5 +-
.../operator/exchange/BroadcastExchangeTest.java | 4 +-
.../operator/exchange/HashExchangeTest.java | 5 +-
.../operator/exchange/RandomExchangeTest.java | 4 +-
.../operator/exchange/SingletonExchangeTest.java | 2 +-
.../pinot/query/service/QueryServerTest.java | 3 +-
34 files changed, 374 insertions(+), 217 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
index e0345590dc..ccc71858cd 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
@@ -34,6 +34,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.StageNodeVisitor;
import org.apache.pinot.query.planner.stage.TableScanNode;
import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.routing.VirtualServer;
/**
@@ -59,7 +60,7 @@ public class ExplainPlanStageVisitor implements
StageNodeVisitor<StringBuilder,
}
// the root of a query plan always only has a single node
- ServerInstance rootServer =
queryPlan.getStageMetadataMap().get(0).getServerInstances().get(0);
+ VirtualServer rootServer =
queryPlan.getStageMetadataMap().get(0).getServerInstances().get(0);
return explainFrom(queryPlan, queryPlan.getQueryStageMap().get(0),
rootServer);
}
@@ -75,7 +76,7 @@ public class ExplainPlanStageVisitor implements
StageNodeVisitor<StringBuilder,
*
* @return a query plan associated with
*/
- public static String explainFrom(QueryPlan queryPlan, StageNode node,
ServerInstance rootServer) {
+ public static String explainFrom(QueryPlan queryPlan, StageNode node,
VirtualServer rootServer) {
final ExplainPlanStageVisitor visitor = new
ExplainPlanStageVisitor(queryPlan);
return node
.visit(visitor, new Context(rootServer, "", "", new StringBuilder()))
@@ -133,9 +134,9 @@ public class ExplainPlanStageVisitor implements
StageNodeVisitor<StringBuilder,
StageMetadata metadata =
_queryPlan.getStageMetadataMap().get(senderStageId);
Map<ServerInstance, Map<String, List<String>>> segments =
metadata.getServerInstanceToSegmentsMap();
- Iterator<ServerInstance> iterator =
metadata.getServerInstances().iterator();
+ Iterator<VirtualServer> iterator =
metadata.getServerInstances().iterator();
while (iterator.hasNext()) {
- ServerInstance serverInstance = iterator.next();
+ VirtualServer serverInstance = iterator.next();
if (segments.containsKey(serverInstance)) {
// always print out leaf stages
sender.visit(this, context.next(iterator.hasNext(), serverInstance));
@@ -164,10 +165,10 @@ public class ExplainPlanStageVisitor implements
StageNodeVisitor<StringBuilder,
appendInfo(node, context);
int receiverStageId = node.getReceiverStageId();
- List<ServerInstance> servers =
_queryPlan.getStageMetadataMap().get(receiverStageId).getServerInstances();
+ List<VirtualServer> servers =
_queryPlan.getStageMetadataMap().get(receiverStageId).getServerInstances();
context._builder.append("->");
String receivers = servers.stream()
- .map(s -> s.getHostname() + ':' + s.getPort())
+ .map(VirtualServer::toString)
.map(s -> "[" + receiverStageId + "]@" + s)
.collect(Collectors.joining(",", "{", "}"));
return context._builder.append(receivers);
@@ -190,7 +191,7 @@ public class ExplainPlanStageVisitor implements
StageNodeVisitor<StringBuilder,
.append(_queryPlan.getStageMetadataMap()
.get(node.getStageId())
.getServerInstanceToSegmentsMap()
- .get(context._host))
+ .get(context._host.getServer()))
.append('\n');
}
@@ -200,19 +201,19 @@ public class ExplainPlanStageVisitor implements
StageNodeVisitor<StringBuilder,
}
static class Context {
- final ServerInstance _host;
+ final VirtualServer _host;
final String _prefix;
final String _childPrefix;
final StringBuilder _builder;
- Context(ServerInstance host, String prefix, String childPrefix,
StringBuilder builder) {
+ Context(VirtualServer host, String prefix, String childPrefix,
StringBuilder builder) {
_host = host;
_prefix = prefix;
_childPrefix = childPrefix;
_builder = builder;
}
- Context next(boolean hasMoreChildren, ServerInstance host) {
+ Context next(boolean hasMoreChildren, VirtualServer host) {
return new Context(
host,
hasMoreChildren ? _childPrefix + "├── " : _childPrefix + "└── ",
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
index 2f21a64c27..225599e098 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.routing.VirtualServer;
/**
@@ -43,9 +44,11 @@ public class StageMetadata implements Serializable {
private List<String> _scannedTables;
// used for assigning server/worker nodes.
- private List<ServerInstance> _serverInstances;
+ private List<VirtualServer> _serverInstances;
- // used for table scan stage.
+ // used for table scan stage - we use ServerInstance instead of VirtualServer
+ // here because all virtual servers that share a server instance will have
the
+ // same segments on them
private Map<ServerInstance, Map<String, List<String>>>
_serverInstanceToSegmentsMap;
// time boundary info
@@ -82,11 +85,11 @@ public class StageMetadata implements Serializable {
_serverInstanceToSegmentsMap = serverInstanceToSegmentsMap;
}
- public List<ServerInstance> getServerInstances() {
+ public List<VirtualServer> getServerInstances() {
return _serverInstances;
}
- public void setServerInstances(List<ServerInstance> serverInstances) {
+ public void setServerInstances(List<VirtualServer> serverInstances) {
_serverInstances = serverInstances;
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
index 95eff8ac91..b37e250f14 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
@@ -28,7 +28,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.config.provider.TableCache;
-import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.logical.RexExpression;
@@ -45,6 +44,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.StageNodeVisitor;
import org.apache.pinot.query.planner.stage.TableScanNode;
import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.routing.VirtualServer;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -298,9 +298,9 @@ public class GreedyShuffleRewriteVisitor
* 2. Servers assigned to the join-stage are a superset of S.
*/
private boolean canServerAssignmentAllowShuffleSkip(int currentStageId, int
leftStageId, int rightStageId) {
- Set<ServerInstance> leftServerInstances = new
HashSet<>(_stageMetadataMap.get(leftStageId).getServerInstances());
- List<ServerInstance> rightServerInstances =
_stageMetadataMap.get(rightStageId).getServerInstances();
- List<ServerInstance> currentServerInstances =
_stageMetadataMap.get(currentStageId).getServerInstances();
+ Set<VirtualServer> leftServerInstances = new
HashSet<>(_stageMetadataMap.get(leftStageId).getServerInstances());
+ List<VirtualServer> rightServerInstances =
_stageMetadataMap.get(rightStageId).getServerInstances();
+ List<VirtualServer> currentServerInstances =
_stageMetadataMap.get(currentStageId).getServerInstances();
return leftServerInstances.containsAll(rightServerInstances)
&& leftServerInstances.size() == rightServerInstances.size()
&& currentServerInstances.containsAll(leftServerInstances);
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/VirtualServer.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/VirtualServer.java
new file mode 100644
index 0000000000..c50aee41e7
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/VirtualServer.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.query.routing;
+
+import java.util.Objects;
+import org.apache.pinot.core.transport.ServerInstance;
+
+
+/**
+ * {@code VirtualServer} is a {@link ServerInstance} associated with a
+ * unique virtualization identifier which allows the multistage query
+ * engine to collocate multiple virtual servers on a single physical
+ * instance, enabling higher levels of parallelism and partitioning
+ * the query input.
+ */
+public class VirtualServer {
+
+ private final ServerInstance _server;
+ private final int _virtualId;
+
+ public VirtualServer(ServerInstance server, int virtualId) {
+ _server = server;
+ _virtualId = virtualId;
+ }
+
+ public ServerInstance getServer() {
+ return _server;
+ }
+
+ public int getVirtualId() {
+ return _virtualId;
+ }
+
+ public String getHostname() {
+ return _server.getHostname();
+ }
+
+ public int getPort() {
+ return _server.getPort();
+ }
+
+ public int getQueryMailboxPort() {
+ return _server.getQueryMailboxPort();
+ }
+
+ public int getQueryServicePort() {
+ return _server.getQueryServicePort();
+ }
+
+ public int getGrpcPort() {
+ return _server.getGrpcPort();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ VirtualServer that = (VirtualServer) o;
+ return _virtualId == that._virtualId && Objects.equals(_server,
that._server);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(_server, _virtualId);
+ }
+
+ @Override
+ public String toString() {
+ return _virtualId + "@" + _server.getInstanceId();
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ServerAddress.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/VirtualServerAddress.java
similarity index 53%
rename from
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ServerAddress.java
rename to
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/VirtualServerAddress.java
index cda2b332fe..99074f7191 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ServerAddress.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/VirtualServerAddress.java
@@ -17,19 +17,35 @@
* under the License.
*/
-package org.apache.pinot.query.mailbox;
+package org.apache.pinot.query.routing;
import java.util.Objects;
-public class ServerAddress {
+/**
+ * Represents the address of a {@link VirtualServer} containing
+ * both the ID of the specific virtualized server and the physical
+ * internet address in id@hostname:port format.
+ *
+ * <p>This is needed in addition to {@code VirtualServer} because there
+ * are some parts of the code that don't have enough information to
+ * construct the full {@code VirtualServer} and only require the
+ * hostname, port and virtualId.</p>
+ */
+public class VirtualServerAddress {
private final String _hostname;
private final int _port;
+ private final int _virtualId;
- public ServerAddress(String hostname, int port) {
+ public VirtualServerAddress(String hostname, int port, int virtualId) {
_hostname = hostname;
_port = port;
+ _virtualId = virtualId;
+ }
+
+ public VirtualServerAddress(VirtualServer server) {
+ this(server.getHostname(), server.getQueryMailboxPort(),
server.getVirtualId());
}
/**
@@ -39,9 +55,10 @@ public class ServerAddress {
* @param address the serialized string
* @return the deserialized form
*/
- public static ServerAddress parse(String address) {
- String[] split = address.split(":");
- return new ServerAddress(split[0], Integer.parseInt(split[1]));
+ public static VirtualServerAddress parse(String address) {
+ String[] split = address.split("@");
+ String[] hostSplit = split[1].split(":");
+ return new VirtualServerAddress(hostSplit[0],
Integer.parseInt(hostSplit[1]), Integer.parseInt(split[0]));
}
/**
@@ -58,6 +75,10 @@ public class ServerAddress {
return _port;
}
+ public int virtualId() {
+ return _virtualId;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -66,17 +87,19 @@ public class ServerAddress {
if (o == null || getClass() != o.getClass()) {
return false;
}
- ServerAddress that = (ServerAddress) o;
- return _port == that._port && Objects.equals(_hostname, that._hostname);
+ VirtualServerAddress that = (VirtualServerAddress) o;
+ return _port == that._port
+ && _virtualId == that._virtualId
+ && Objects.equals(_hostname, that._hostname);
}
@Override
public int hashCode() {
- return Objects.hash(_hostname, _port);
+ return Objects.hash(_hostname, _port, _virtualId);
}
@Override
public String toString() {
- return _hostname + ":" + _port;
+ return _virtualId + "@" + _hostname + ":" + _port;
}
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 36c0f2bd81..0e3750dc5d 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
@@ -93,26 +94,31 @@ public class WorkerManager {
"Entry for server {} and table type: {} already exist!",
serverEntry.getKey(), tableType);
}
}
- stageMetadata.setServerInstances(new
ArrayList<>(serverInstanceToSegmentsMap.keySet()));
+ stageMetadata.setServerInstances(new ArrayList<>(
+ serverInstanceToSegmentsMap.keySet()
+ .stream()
+ .map(server -> new VirtualServer(server, 0)) // for now, only
use single virtual server
+ .collect(Collectors.toList())));
stageMetadata.setServerInstanceToSegmentsMap(serverInstanceToSegmentsMap);
} else if (PlannerUtils.isRootStage(stageId)) {
// ROOT stage doesn't have a QueryServer as it is strictly only reducing
results.
// here we simply assign the worker instance with identical
server/mailbox port number.
- stageMetadata.setServerInstances(Lists.newArrayList(new
WorkerInstance(_hostName, _port, _port, _port, _port)));
+ stageMetadata.setServerInstances(Lists.newArrayList(
+ new VirtualServer(new WorkerInstance(_hostName, _port, _port, _port,
_port), 0)));
} else {
stageMetadata.setServerInstances(filterServers(_routingManager.getEnabledServerInstanceMap().values()));
}
}
- private static List<ServerInstance> filterServers(Collection<ServerInstance>
servers) {
- List<ServerInstance> serverInstances = new ArrayList<>();
+ private static List<VirtualServer> filterServers(Collection<ServerInstance>
servers) {
+ List<VirtualServer> serverInstances = new ArrayList<>();
for (ServerInstance server : servers) {
String hostname = server.getHostname();
if (server.getQueryServicePort() > 0 && server.getQueryMailboxPort() > 0
&&
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)
&&
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE)
&&
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE)) {
- serverInstances.add(server);
+ serverInstances.add(new VirtualServer(server, 0));
}
}
return serverInstances;
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 845533485a..fed57b3406 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.planner.PlannerUtils;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
@@ -39,6 +38,7 @@ import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
import org.apache.pinot.query.planner.stage.ProjectNode;
import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.routing.VirtualServer;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -119,19 +119,19 @@ public class QueryCompilationTest extends
QueryEnvironmentTestBase {
if (tables.size() == 1) {
// table scan stages; for tableA it should have 2 hosts, for tableB it
should have only 1
Assert.assertEquals(
-
e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
- tables.get(0).equals("a") ? ImmutableList.of("Server_localhost_2",
"Server_localhost_1")
- : ImmutableList.of("Server_localhost_1"));
+
e.getValue().getServerInstances().stream().map(VirtualServer::toString).collect(Collectors.toList()),
+ tables.get(0).equals("a") ?
ImmutableList.of("0@Server_localhost_2", "0@Server_localhost_1")
+ : ImmutableList.of("0@Server_localhost_1"));
} else if (!PlannerUtils.isRootStage(e.getKey())) {
// join stage should have both servers used.
Assert.assertEquals(
-
e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toSet()),
- ImmutableSet.of("Server_localhost_1", "Server_localhost_2"));
+
e.getValue().getServerInstances().stream().map(VirtualServer::toString).collect(Collectors.toSet()),
+ ImmutableSet.of("0@Server_localhost_1", "0@Server_localhost_2"));
} else {
// reduce stage should have the reducer instance.
Assert.assertEquals(
-
e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toSet()),
- ImmutableSet.of("Server_localhost_3"));
+
e.getValue().getServerInstances().stream().map(VirtualServer::toString).collect(Collectors.toSet()),
+ ImmutableSet.of("0@Server_localhost_3"));
}
}
}
@@ -165,7 +165,7 @@ public class QueryCompilationTest extends
QueryEnvironmentTestBase {
.filter(stageMetadata -> stageMetadata.getScannedTables().size() !=
0).collect(Collectors.toList());
Assert.assertEquals(tableScanMetadataList.size(), 1);
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().size(),
1);
-
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().get(0).toString(),
"Server_localhost_2");
+
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().get(0).toString(),
"0@Server_localhost_2");
query = "SELECT * FROM d";
queryPlan = _queryEnvironment.planQuery(query);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/JsonMailboxIdentifier.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/JsonMailboxIdentifier.java
index 7068b06ab6..4f0b6284ad 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/JsonMailboxIdentifier.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/JsonMailboxIdentifier.java
@@ -20,12 +20,15 @@ package org.apache.pinot.query.mailbox;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Objects;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+@JsonIgnoreProperties(ignoreUnknown = true)
public class JsonMailboxIdentifier implements MailboxIdentifier {
private static final ObjectMapper MAPPER = new ObjectMapper();
@@ -34,8 +37,8 @@ public class JsonMailboxIdentifier implements
MailboxIdentifier {
private final String _from;
private final String _to;
- private final ServerAddress _fromAddress;
- private final ServerAddress _toAddress;
+ private final VirtualServerAddress _fromAddress;
+ private final VirtualServerAddress _toAddress;
@JsonCreator
public JsonMailboxIdentifier(
@@ -46,14 +49,14 @@ public class JsonMailboxIdentifier implements
MailboxIdentifier {
_jobId = jobId;
_from = from;
_to = to;
- _fromAddress = ServerAddress.parse(_from);
- _toAddress = ServerAddress.parse(_to);
+ _fromAddress = VirtualServerAddress.parse(_from);
+ _toAddress = VirtualServerAddress.parse(_to);
}
public JsonMailboxIdentifier(
String jobId,
- ServerAddress from,
- ServerAddress to
+ VirtualServerAddress from,
+ VirtualServerAddress to
) {
_jobId = jobId;
_from = from.toString();
@@ -83,7 +86,7 @@ public class JsonMailboxIdentifier implements
MailboxIdentifier {
@JsonIgnore
@Override
- public ServerAddress getFromHost() {
+ public VirtualServerAddress getFromHost() {
return _fromAddress;
}
@@ -93,7 +96,7 @@ public class JsonMailboxIdentifier implements
MailboxIdentifier {
@JsonIgnore
@Override
- public ServerAddress getToHost() {
+ public VirtualServerAddress getToHost() {
return _toAddress;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
index 77647d3db2..d1d1b44f35 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.query.mailbox;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+
+
/**
* {@link MailboxIdentifier} uniquely identify the mailbox that pairs a sender
and a receiver.
*
@@ -34,12 +37,12 @@ public interface MailboxIdentifier {
/**
* @return the sender address
*/
- ServerAddress getFromHost();
+ VirtualServerAddress getFromHost();
/**
* @return the destination address
*/
- ServerAddress getToHost();
+ VirtualServerAddress getToHost();
/**
* Checks whether sender and receiver are in the same JVM.
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
index 60ae614a2f..8aedbb1a1f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.query.mailbox;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+
+
public final class Utils {
private Utils() {
@@ -26,7 +29,7 @@ public final class Utils {
public static String constructChannelId(String mailboxId) {
MailboxIdentifier mailboxIdentifier = toMailboxIdentifier(mailboxId);
- ServerAddress dest = mailboxIdentifier.getToHost();
+ VirtualServerAddress dest = mailboxIdentifier.getToHost();
return dest.toString();
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index ad0d138bb4..c2826e5851 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -38,12 +38,13 @@ import
org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
-import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.MultiplexingMailboxService;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
@@ -82,6 +83,7 @@ public class QueryRunner {
private MailboxService<TransferableBlock> _mailboxService;
private String _hostname;
private int _port;
+ private VirtualServerAddress _rootServer;
private OpChainSchedulerService _scheduler;
/**
@@ -94,6 +96,8 @@ public class QueryRunner {
_hostname =
instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ?
instanceName.substring(
CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
_port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT,
QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
+ // always use 0 for root server ID as all data is processed by one node at
the global root
+ _rootServer = new VirtualServerAddress(_hostname, _port, 0);
_helixManager = helixManager;
try {
long releaseMs =
config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS,
@@ -150,7 +154,7 @@ public class QueryRunner {
MailboxSendOperator mailboxSendOperator = new
MailboxSendOperator(_mailboxService,
new LeafStageTransferableBlockOperator(serverQueryResults,
sendNode.getDataSchema(), requestId,
sendNode.getStageId()),
receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(),
- sendNode.getPartitionKeySelector(), _hostname, _port,
serverQueryRequests.get(0).getRequestId(),
+ sendNode.getPartitionKeySelector(), _rootServer,
serverQueryRequests.get(0).getRequestId(),
sendNode.getStageId());
int blockCounter = 0;
while
(!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
@@ -161,8 +165,8 @@ public class QueryRunner {
long timeoutMs =
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
StageNode stageRoot = distributedStagePlan.getStageRoot();
OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
- new PlanRequestContext(_mailboxService, requestId,
stageRoot.getStageId(), timeoutMs, _hostname, _port,
- distributedStagePlan.getMetadataMap()));
+ new PlanRequestContext(_mailboxService, requestId,
stageRoot.getStageId(), timeoutMs,
+ new VirtualServerAddress(distributedStagePlan.getServer()),
distributedStagePlan.getMetadataMap()));
_scheduler.register(rootOperator);
}
}
@@ -175,7 +179,7 @@ public class QueryRunner {
"Server request for V2 engine should only have 1 scan table per
request.");
String rawTableName = stageMetadata.getScannedTables().get(0);
Map<String, List<String>> tableToSegmentListMap =
-
stageMetadata.getServerInstanceToSegmentsMap().get(distributedStagePlan.getServerInstance());
+
stageMetadata.getServerInstanceToSegmentsMap().get(distributedStagePlan.getServer().getServer());
List<ServerPlanRequestContext> requests = new ArrayList<>();
for (Map.Entry<String, List<String>> tableEntry :
tableToSegmentListMap.entrySet()) {
String tableType = tableEntry.getKey();
@@ -219,9 +223,9 @@ public class QueryRunner {
private boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
int stageId = distributedStagePlan.getStageId();
- ServerInstance serverInstance = distributedStagePlan.getServerInstance();
+ VirtualServer serverInstance = distributedStagePlan.getServer();
StageMetadata stageMetadata =
distributedStagePlan.getMetadataMap().get(stageId);
- Map<String, List<String>> segments =
stageMetadata.getServerInstanceToSegmentsMap().get(serverInstance);
+ Map<String, List<String>> segments =
stageMetadata.getServerInstanceToSegmentsMap().get(serverInstance.getServer());
return segments != null && segments.size() > 0;
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 8fb1d145c1..ad0dcdbde5 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -29,12 +29,12 @@ import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.operator.BaseOperator;
-import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
-import org.apache.pinot.query.mailbox.ServerAddress;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.service.QueryConfig;
@@ -68,18 +68,18 @@ public class MailboxReceiveOperator extends
MultiStageOperator {
private TransferableBlock _upstreamErrorBlock;
private OperatorStats _operatorStats;
- private static MailboxIdentifier toMailboxId(ServerInstance fromInstance,
long jobId, long stageId,
- String receiveHostName, int receivePort) {
+ private static MailboxIdentifier toMailboxId(VirtualServer sender, long
jobId, long stageId,
+ VirtualServerAddress receiver) {
return new JsonMailboxIdentifier(
String.format("%s_%s", jobId, stageId),
- new ServerAddress(fromInstance.getHostname(),
fromInstance.getQueryMailboxPort()),
- new ServerAddress(receiveHostName, receivePort));
+ new VirtualServerAddress(sender),
+ receiver);
}
// TODO: Move deadlineInNanoSeconds to OperatorContext.
public MailboxReceiveOperator(MailboxService<TransferableBlock>
mailboxService,
- List<ServerInstance> sendingStageInstances, RelDistribution.Type
exchangeType, String receiveHostName,
- int receivePort, long jobId, int stageId, Long timeoutMs) {
+ List<VirtualServer> sendingStageInstances, RelDistribution.Type
exchangeType, VirtualServerAddress receiver,
+ long jobId, int stageId, Long timeoutMs) {
_mailboxService = mailboxService;
Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
"Exchange/Distribution type: " + exchangeType + " is not supported!");
@@ -88,8 +88,8 @@ public class MailboxReceiveOperator extends
MultiStageOperator {
_exchangeType = exchangeType;
if (_exchangeType == RelDistribution.Type.SINGLETON) {
- ServerInstance singletonInstance = null;
- for (ServerInstance serverInstance : sendingStageInstances) {
+ VirtualServer singletonInstance = null;
+ for (VirtualServer serverInstance : sendingStageInstances) {
if (serverInstance.getHostname().equals(_mailboxService.getHostname())
&& serverInstance.getQueryMailboxPort() ==
_mailboxService.getMailboxPort()) {
Preconditions.checkState(singletonInstance == null, "multiple
instance found for singleton exchange type!");
@@ -103,12 +103,12 @@ public class MailboxReceiveOperator extends
MultiStageOperator {
_sendingMailbox = Collections.emptyList();
} else {
_sendingMailbox =
- Collections.singletonList(toMailboxId(singletonInstance, jobId,
stageId, receiveHostName, receivePort));
+ Collections.singletonList(toMailboxId(singletonInstance, jobId,
stageId, receiver));
}
} else {
_sendingMailbox = new ArrayList<>(sendingStageInstances.size());
- for (ServerInstance instance : sendingStageInstances) {
- _sendingMailbox.add(toMailboxId(instance, jobId, stageId,
receiveHostName, receivePort));
+ for (VirtualServer instance : sendingStageInstances) {
+ _sendingMailbox.add(toMailboxId(instance, jobId, stageId, receiver));
}
}
_upstreamErrorBlock = null;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 047f5fd5b2..32ac1247c5 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -28,12 +28,12 @@ import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.ServerAddress;
import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -65,20 +65,20 @@ public class MailboxSendOperator extends MultiStageOperator
{
@VisibleForTesting
interface MailboxIdGenerator {
- MailboxIdentifier generate(ServerInstance server);
+ MailboxIdentifier generate(VirtualServer server);
}
public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
- MultiStageOperator dataTableBlockBaseOperator, List<ServerInstance>
receivingStageInstances,
- RelDistribution.Type exchangeType, KeySelector<Object[], Object[]>
keySelector, String hostName, int port,
- long jobId, int stageId) {
+ MultiStageOperator dataTableBlockBaseOperator, List<VirtualServer>
receivingStageInstances,
+ RelDistribution.Type exchangeType, KeySelector<Object[], Object[]>
keySelector,
+ VirtualServerAddress sendingServer, long jobId, int stageId) {
this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances,
exchangeType, keySelector,
- server -> toMailboxId(server, jobId, stageId, hostName, port),
BlockExchange::getExchange, jobId, stageId);
+ server -> toMailboxId(server, jobId, stageId, sendingServer),
BlockExchange::getExchange, jobId, stageId);
}
@VisibleForTesting
MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
- MultiStageOperator dataTableBlockBaseOperator, List<ServerInstance>
receivingStageInstances,
+ MultiStageOperator dataTableBlockBaseOperator, List<VirtualServer>
receivingStageInstances,
RelDistribution.Type exchangeType, KeySelector<Object[], Object[]>
keySelector,
MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory
blockExchangeFactory, long jobId, int stageId) {
_dataTableBlockBaseOperator = dataTableBlockBaseOperator;
@@ -86,8 +86,8 @@ public class MailboxSendOperator extends MultiStageOperator {
List<MailboxIdentifier> receivingMailboxes;
if (exchangeType == RelDistribution.Type.SINGLETON) {
// TODO: this logic should be moved into SingletonExchange
- ServerInstance singletonInstance = null;
- for (ServerInstance serverInstance : receivingStageInstances) {
+ VirtualServer singletonInstance = null;
+ for (VirtualServer serverInstance : receivingStageInstances) {
if (serverInstance.getHostname().equals(mailboxService.getHostname())
&& serverInstance.getQueryMailboxPort() ==
mailboxService.getMailboxPort()) {
Preconditions.checkState(singletonInstance == null, "multiple
instance found for singleton exchange type!");
@@ -161,10 +161,10 @@ public class MailboxSendOperator extends
MultiStageOperator {
}
private static JsonMailboxIdentifier toMailboxId(
- ServerInstance destination, long jobId, int stageId, String sender, int
senderPort) {
+ VirtualServer destination, long jobId, int stageId, VirtualServerAddress
sender) {
return new JsonMailboxIdentifier(
String.format("%s_%s", jobId, stageId),
- new ServerAddress(sender, senderPort),
- new ServerAddress(destination.getHostname(),
destination.getQueryMailboxPort()));
+ sender,
+ new VirtualServerAddress(destination));
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
index 828a969dd0..c508d746ca 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
@@ -20,9 +20,9 @@ package org.apache.pinot.query.runtime.plan;
import java.util.HashMap;
import java.util.Map;
-import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.routing.VirtualServer;
/**
@@ -33,7 +33,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
*/
public class DistributedStagePlan {
private int _stageId;
- private ServerInstance _serverInstance;
+ private VirtualServer _server;
private StageNode _stageRoot;
private Map<Integer, StageMetadata> _metadataMap;
@@ -42,10 +42,10 @@ public class DistributedStagePlan {
_metadataMap = new HashMap<>();
}
- public DistributedStagePlan(int stageId, ServerInstance serverInstance,
StageNode stageRoot,
+ public DistributedStagePlan(int stageId, VirtualServer server, StageNode
stageRoot,
Map<Integer, StageMetadata> metadataMap) {
_stageId = stageId;
- _serverInstance = serverInstance;
+ _server = server;
_stageRoot = stageRoot;
_metadataMap = metadataMap;
}
@@ -54,8 +54,8 @@ public class DistributedStagePlan {
return _stageId;
}
- public ServerInstance getServerInstance() {
- return _serverInstance;
+ public VirtualServer getServer() {
+ return _server;
}
public StageNode getStageRoot() {
@@ -66,8 +66,8 @@ public class DistributedStagePlan {
return _metadataMap;
}
- public void setServerInstance(ServerInstance serverInstance) {
- _serverInstance = serverInstance;
+ public void setServer(VirtualServer serverInstance) {
+ _server = serverInstance;
}
public void setStageRoot(StageNode stageRoot) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index d675efea62..e4d54a57a8 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -19,7 +19,6 @@
package org.apache.pinot.query.runtime.plan;
import java.util.List;
-import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.AggregateNode;
import org.apache.pinot.query.planner.stage.FilterNode;
@@ -32,6 +31,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.StageNodeVisitor;
import org.apache.pinot.query.planner.stage.TableScanNode;
import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.routing.VirtualServer;
import org.apache.pinot.query.runtime.operator.AggregateOperator;
import org.apache.pinot.query.runtime.operator.FilterOperator;
import org.apache.pinot.query.runtime.operator.HashJoinOperator;
@@ -62,10 +62,10 @@ public class PhysicalPlanVisitor implements
StageNodeVisitor<MultiStageOperator,
@Override
public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node,
PlanRequestContext context) {
- List<ServerInstance> sendingInstances =
context.getMetadataMap().get(node.getSenderStageId()).getServerInstances();
+ List<VirtualServer> sendingInstances =
context.getMetadataMap().get(node.getSenderStageId()).getServerInstances();
MailboxReceiveOperator mailboxReceiveOperator =
new MailboxReceiveOperator(context.getMailboxService(),
sendingInstances,
- node.getExchangeType(), context.getHostName(), context.getPort(),
+ node.getExchangeType(), context.getServer(),
context.getRequestId(), node.getSenderStageId(),
context.getTimeoutMs());
context.addReceivingMailboxes(mailboxReceiveOperator.getSendingMailbox());
return mailboxReceiveOperator;
@@ -77,7 +77,7 @@ public class PhysicalPlanVisitor implements
StageNodeVisitor<MultiStageOperator,
StageMetadata receivingStageMetadata =
context.getMetadataMap().get(node.getReceiverStageId());
return new MailboxSendOperator(context.getMailboxService(), nextOperator,
receivingStageMetadata.getServerInstances(), node.getExchangeType(),
node.getPartitionKeySelector(),
- context.getHostName(), context.getPort(), context.getRequestId(),
node.getStageId());
+ context.getServer(), context.getRequestId(), node.getStageId());
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
index a52e0fa0be..db5b0e028a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -33,20 +34,18 @@ public class PlanRequestContext {
protected final long _requestId;
protected final int _stageId;
private final long _timeoutMs;
- protected final String _hostName;
- protected final int _port;
+ protected final VirtualServerAddress _server;
protected final Map<Integer, StageMetadata> _metadataMap;
protected final List<MailboxIdentifier> _receivingMailboxes = new
ArrayList<>();
public PlanRequestContext(MailboxService<TransferableBlock> mailboxService,
long requestId, int stageId,
- long timeoutMs, String hostName, int port, Map<Integer, StageMetadata>
metadataMap) {
+ long timeoutMs, VirtualServerAddress server, Map<Integer, StageMetadata>
metadataMap) {
_mailboxService = mailboxService;
_requestId = requestId;
_stageId = stageId;
_timeoutMs = timeoutMs;
- _hostName = hostName;
- _port = port;
+ _server = server;
_metadataMap = metadataMap;
}
@@ -62,12 +61,8 @@ public class PlanRequestContext {
return _timeoutMs;
}
- public String getHostName() {
- return _hostName;
- }
-
- public int getPort() {
- return _port;
+ public VirtualServerAddress getServer() {
+ return _server;
}
public Map<Integer, StageMetadata> getMetadataMap() {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index 071a9325bd..85440e1aed 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -48,6 +48,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.StageNodeVisitor;
import org.apache.pinot.query.planner.stage.TableScanNode;
import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
import org.apache.pinot.query.service.QueryConfig;
@@ -104,8 +105,8 @@ public class ServerRequestPlanVisitor implements
StageNodeVisitor<Void, ServerPl
pinotQuery.setExplain(false);
ServerPlanRequestContext context =
new ServerPlanRequestContext(mailboxService, requestId,
stagePlan.getStageId(), timeoutMs,
- stagePlan.getServerInstance().getHostname(),
stagePlan.getServerInstance().getPort(),
- stagePlan.getMetadataMap(), pinotQuery, tableType,
timeBoundaryInfo);
+ new VirtualServerAddress(stagePlan.getServer()),
stagePlan.getMetadataMap(), pinotQuery, tableType,
+ timeBoundaryInfo);
// visit the plan and create query physical plan.
ServerRequestPlanVisitor.walkStageNode(stagePlan.getStageRoot(), context);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index 776e9f4e22..918d459312 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -28,6 +28,7 @@ import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.AbstractStageNode;
import org.apache.pinot.query.planner.stage.StageNodeSerDeUtils;
+import org.apache.pinot.query.routing.VirtualServer;
import org.apache.pinot.query.routing.WorkerInstance;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -43,7 +44,7 @@ public class QueryPlanSerDeUtils {
public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) {
DistributedStagePlan distributedStagePlan = new
DistributedStagePlan(stagePlan.getStageId());
-
distributedStagePlan.setServerInstance(stringToInstance(stagePlan.getInstanceId()));
+
distributedStagePlan.setServer(stringToInstance(stagePlan.getInstanceId()));
distributedStagePlan.setStageRoot(StageNodeSerDeUtils.deserializeStageNode(stagePlan.getStageRoot()));
Map<Integer, Worker.StageMetadata> metadataMap =
stagePlan.getStageMetadataMap();
distributedStagePlan.getMetadataMap().putAll(protoMapToStageMetadataMap(metadataMap));
@@ -53,19 +54,19 @@ public class QueryPlanSerDeUtils {
public static Worker.StagePlan serialize(DistributedStagePlan
distributedStagePlan) {
return Worker.StagePlan.newBuilder()
.setStageId(distributedStagePlan.getStageId())
-
.setInstanceId(instanceToString(distributedStagePlan.getServerInstance()))
+ .setInstanceId(instanceToString(distributedStagePlan.getServer()))
.setStageRoot(StageNodeSerDeUtils.serializeStageNode((AbstractStageNode)
distributedStagePlan.getStageRoot()))
.putAllStageMetadata(stageMetadataMapToProtoMap(distributedStagePlan.getMetadataMap())).build();
}
- public static ServerInstance stringToInstance(String serverInstanceString) {
+ public static VirtualServer stringToInstance(String serverInstanceString) {
String[] s = StringUtils.split(serverInstanceString, '_');
// Skipped netty and grpc port as they are not used in worker instance.
- return new WorkerInstance(s[0], Integer.parseInt(s[1]),
Integer.parseInt(s[2]), Integer.parseInt(s[3]),
- Integer.parseInt(s[4]));
+ return new VirtualServer(new WorkerInstance(s[0], Integer.parseInt(s[1]),
Integer.parseInt(s[2]),
+ Integer.parseInt(s[3]), Integer.parseInt(s[4])), 0);
}
- public static String instanceToString(ServerInstance serverInstance) {
+ public static String instanceToString(VirtualServer serverInstance) {
return StringUtils.join(serverInstance.getHostname(), '_',
serverInstance.getPort(), '_',
serverInstance.getGrpcPort(), '_',
serverInstance.getQueryServicePort(), '_',
serverInstance.getQueryMailboxPort());
@@ -94,7 +95,8 @@ public class QueryPlanSerDeUtils {
:
instanceEntry.getValue().getTableTypeToSegmentListMap().entrySet()) {
tableToSegmentMap.put(tableEntry.getKey(),
tableEntry.getValue().getSegmentsList());
}
-
stageMetadata.getServerInstanceToSegmentsMap().put(stringToInstance(instanceEntry.getKey()),
tableToSegmentMap);
+ stageMetadata.getServerInstanceToSegmentsMap()
+ .put(stringToInstance(instanceEntry.getKey()).getServer(),
tableToSegmentMap);
}
// time boundary info
if (!workerStageMetadata.getTimeColumn().isEmpty()) {
@@ -117,7 +119,7 @@ public class QueryPlanSerDeUtils {
// scanned table
builder.addAllDataSources(stageMetadata.getScannedTables());
// server instance to table-segments mapping
- for (ServerInstance serverInstance : stageMetadata.getServerInstances()) {
+ for (VirtualServer serverInstance : stageMetadata.getServerInstances()) {
builder.addInstances(instanceToString(serverInstance));
}
for (Map.Entry<ServerInstance, Map<String, List<String>>> instanceEntry
@@ -127,7 +129,7 @@ public class QueryPlanSerDeUtils {
tableToSegmentMap.put(tableEntry.getKey(),
Worker.SegmentList.newBuilder().addAllSegments(tableEntry.getValue()).build());
}
- builder.putInstanceToSegmentMap(instanceToString(instanceEntry.getKey()),
+ builder.putInstanceToSegmentMap(instanceToString(new
VirtualServer(instanceEntry.getKey(), 0)),
Worker.SegmentMap.newBuilder().putAllTableTypeToSegmentList(tableToSegmentMap).build());
}
// time boundary info
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 3ce5bd3551..35142fe0cf 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -24,6 +24,7 @@ import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.plan.PlanRequestContext;
import org.apache.pinot.spi.config.table.TableType;
@@ -41,9 +42,9 @@ public class ServerPlanRequestContext extends
PlanRequestContext {
protected InstanceRequest _instanceRequest;
public ServerPlanRequestContext(MailboxService<TransferableBlock>
mailboxService, long requestId, int stageId,
- long timeoutMs, String hostName, int port, Map<Integer, StageMetadata>
metadataMap, PinotQuery pinotQuery,
+ long timeoutMs, VirtualServerAddress server, Map<Integer, StageMetadata>
metadataMap, PinotQuery pinotQuery,
TableType tableType, TimeBoundaryInfo timeBoundaryInfo) {
- super(mailboxService, requestId, stageId, timeoutMs, hostName, port,
metadataMap);
+ super(mailboxService, requestId, stageId, timeoutMs, server, metadataMap);
_pinotQuery = pinotQuery;
_tableType = tableType;
_timeBoundaryInfo = timeBoundaryInfo;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index f0eb564327..1000f42869 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -35,11 +35,12 @@ import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
@@ -70,8 +71,8 @@ public class QueryDispatcher {
MailboxReceiveNode reduceNode = (MailboxReceiveNode)
queryPlan.getQueryStageMap().get(reduceStageId);
MailboxReceiveOperator mailboxReceiveOperator =
createReduceStageOperator(mailboxService,
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
requestId,
- reduceNode.getSenderStageId(), reduceNode.getDataSchema(),
mailboxService.getHostname(),
- mailboxService.getMailboxPort(), timeoutMs);
+ reduceNode.getSenderStageId(), reduceNode.getDataSchema(),
+ new VirtualServerAddress(mailboxService.getHostname(),
mailboxService.getMailboxPort(), 0), timeoutMs);
List<DataBlock> resultDataBlocks =
reduceMailboxReceive(mailboxReceiveOperator, timeoutMs);
mailboxReceiveOperator.toExplainString();
long toResultTableStartTime = System.currentTimeMillis();
@@ -92,8 +93,8 @@ public class QueryDispatcher {
if (queryPlan.getQueryStageMap().get(stageId) instanceof
MailboxReceiveNode) {
reduceStageId = stageId;
} else {
- List<ServerInstance> serverInstances =
stage.getValue().getServerInstances();
- for (ServerInstance serverInstance : serverInstances) {
+ List<VirtualServer> serverInstances =
stage.getValue().getServerInstances();
+ for (VirtualServer serverInstance : serverInstances) {
String host = serverInstance.getHostname();
int servicePort = serverInstance.getQueryServicePort();
DispatchClient client = getOrCreateDispatchClient(host, servicePort);
@@ -120,7 +121,7 @@ public class QueryDispatcher {
}
public static DistributedStagePlan constructDistributedStagePlan(QueryPlan
queryPlan, int stageId,
- ServerInstance serverInstance) {
+ VirtualServer serverInstance) {
return new DistributedStagePlan(stageId, serverInstance,
queryPlan.getQueryStageMap().get(stageId),
queryPlan.getStageMetadataMap());
}
@@ -195,12 +196,12 @@ public class QueryDispatcher {
@VisibleForTesting
public static MailboxReceiveOperator
createReduceStageOperator(MailboxService<TransferableBlock> mailboxService,
- List<ServerInstance> sendingInstances, long jobId, int stageId,
DataSchema dataSchema, String hostname,
- int port, long timeoutMs) {
+ List<VirtualServer> sendingInstances, long jobId, int stageId,
DataSchema dataSchema, VirtualServerAddress server,
+ long timeoutMs) {
// timeout is set for reduce stage
MailboxReceiveOperator mailboxReceiveOperator =
new MailboxReceiveOperator(mailboxService, sendingInstances,
- RelDistribution.Type.RANDOM_DISTRIBUTED, hostname, port, jobId,
stageId, timeoutMs);
+ RelDistribution.Type.RANDOM_DISTRIBUTED, server, jobId, stageId,
timeoutMs);
return mailboxReceiveOperator;
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index 70b7763ca9..4e04d5c899 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -27,6 +27,7 @@ import java.util.function.Consumer;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.query.testutils.QueryTestUtils;
@@ -76,8 +77,8 @@ public class GrpcMailboxServiceTest {
// Given:
JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
"happypath",
- new ServerAddress("localhost", _mailboxService1.getMailboxPort()),
- new ServerAddress("localhost", _mailboxService2.getMailboxPort()));
+ new VirtualServerAddress("localhost",
_mailboxService1.getMailboxPort(), 0),
+ new VirtualServerAddress("localhost",
_mailboxService2.getMailboxPort(), 0));
SendingMailbox<TransferableBlock> sendingMailbox =
_mailboxService1.getSendingMailbox(mailboxId);
ReceivingMailbox<TransferableBlock> receivingMailbox =
_mailboxService2.getReceivingMailbox(mailboxId);
CountDownLatch gotData = new CountDownLatch(1);
@@ -108,8 +109,8 @@ public class GrpcMailboxServiceTest {
// Given:
JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
"exception",
- new ServerAddress("localhost", _mailboxService1.getMailboxPort()),
- new ServerAddress("localhost", _mailboxService2.getMailboxPort()));
+ new VirtualServerAddress("localhost",
_mailboxService1.getMailboxPort(), 0),
+ new VirtualServerAddress("localhost",
_mailboxService2.getMailboxPort(), 0));
SendingMailbox<TransferableBlock> sendingMailbox =
_mailboxService1.getSendingMailbox(mailboxId);
ReceivingMailbox<TransferableBlock> receivingMailbox =
_mailboxService2.getReceivingMailbox(mailboxId);
CountDownLatch gotData = new CountDownLatch(1);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
index a1eae8e1ff..0b7438b330 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -39,7 +40,7 @@ public class InMemoryMailboxServiceTest {
throws Exception {
InMemoryMailboxService mailboxService = new
InMemoryMailboxService("localhost", 0, ignored -> { });
final JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
- "happyPathJob", new ServerAddress("localhost", 0), new
ServerAddress("localhost", 0));
+ "happyPathJob", new VirtualServerAddress("localhost", 0, 0), new
VirtualServerAddress("localhost", 0, 0));
InMemoryReceivingMailbox receivingMailbox = (InMemoryReceivingMailbox)
mailboxService.getReceivingMailbox(
mailboxId);
InMemorySendingMailbox sendingMailbox = (InMemorySendingMailbox)
mailboxService.getSendingMailbox(mailboxId);
@@ -75,7 +76,7 @@ public class InMemoryMailboxServiceTest {
public void testNonLocalMailboxId() {
InMemoryMailboxService mailboxService = new
InMemoryMailboxService("localhost", 0, ignored -> { });
final JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
- "happyPathJob", new ServerAddress("localhost", 0), new
ServerAddress("localhost", 1));
+ "happyPathJob", new VirtualServerAddress("localhost", 0, 0), new
VirtualServerAddress("localhost", 1, 0));
// Test getReceivingMailbox
try {
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
index 110aeb9d74..66ed5be7d9 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.mailbox;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -25,9 +26,9 @@ import org.testng.annotations.Test;
public class MultiplexingMailboxServiceTest {
private static final JsonMailboxIdentifier LOCAL_MAILBOX_ID = new
JsonMailboxIdentifier(
- "localJobId", new ServerAddress("localhost", 0), new
ServerAddress("localhost", 0));
+ "localJobId", new VirtualServerAddress("localhost", 0, 0), new
VirtualServerAddress("localhost", 0, 0));
private static final JsonMailboxIdentifier NON_LOCAL_MAILBOX_ID = new
JsonMailboxIdentifier(
- "localJobId", new ServerAddress("localhost", 0), new
ServerAddress("localhost", 1));
+ "localJobId", new VirtualServerAddress("localhost", 0, 0), new
VirtualServerAddress("localhost", 1, 0));
@Test
public void testHappyPath() {
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index abb7d65e4d..093fffa381 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -28,12 +28,13 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
-import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryServerEnclosure;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerInstance;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -189,13 +190,14 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
mailboxReceiveOperator =
QueryDispatcher.createReduceStageOperator(_mailboxService,
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)),
- reduceNode.getSenderStageId(), reduceNode.getDataSchema(),
"localhost", _reducerGrpcPort,
+ reduceNode.getSenderStageId(), reduceNode.getDataSchema(),
+ new VirtualServerAddress("localhost", _reducerGrpcPort, 0),
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)));
} else {
- for (ServerInstance serverInstance :
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
+ for (VirtualServer serverInstance :
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
DistributedStagePlan distributedStagePlan =
QueryDispatcher.constructDistributedStagePlan(queryPlan,
stageId, serverInstance);
- _servers.get(serverInstance).processQuery(distributedStagePlan,
requestMetadataMap);
+
_servers.get(serverInstance.getServer()).processQuery(distributedStagePlan,
requestMetadataMap);
}
}
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 090f4ded6f..b921c8fdd8 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -47,6 +47,8 @@ import org.apache.pinot.query.QueryTestSet;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.service.QueryConfig;
@@ -92,13 +94,13 @@ public abstract class QueryRunnerTestBase extends
QueryTestSet {
mailboxReceiveOperator =
QueryDispatcher.createReduceStageOperator(_mailboxService,
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)),
reduceNode.getSenderStageId(),
- reduceNode.getDataSchema(), "localhost", _reducerGrpcPort,
+ reduceNode.getDataSchema(), new VirtualServerAddress("localhost",
_reducerGrpcPort, 0),
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)));
} else {
- for (ServerInstance serverInstance :
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
+ for (VirtualServer serverInstance :
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
DistributedStagePlan distributedStagePlan =
QueryDispatcher.constructDistributedStagePlan(queryPlan,
stageId, serverInstance);
- _servers.get(serverInstance).processQuery(distributedStagePlan,
requestMetadataMap);
+
_servers.get(serverInstance.getServer()).processQuery(distributedStagePlan,
requestMetadataMap);
}
}
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 0435b50daa..36bee05aae 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -34,8 +34,8 @@ import org.testng.annotations.Test;
public class RoundRobinSchedulerTest {
- private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1_1", "foo:2", "bar:3");
- private static final MailboxIdentifier MAILBOX_2 = new
JsonMailboxIdentifier("1_2", "foo:2", "bar:3");
+ private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1_1", "0@foo:2", "0@bar:3");
+ private static final MailboxIdentifier MAILBOX_2 = new
JsonMailboxIdentifier("1_2", "0@foo:2", "0@bar:3");
@Mock
private MultiStageOperator _operator;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index ce6777df14..b1c6b5cc79 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -25,11 +25,11 @@ import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
-import org.apache.pinot.query.mailbox.ServerAddress;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.mockito.Mock;
@@ -56,9 +56,11 @@ public class MailboxReceiveOperatorTest {
@Mock
private MailboxService<TransferableBlock> _mailboxService;
@Mock
- private ServerInstance _server1;
+ private VirtualServer _server1;
@Mock
- private ServerInstance _server2;
+ private VirtualServer _server2;
+
+ private final VirtualServerAddress _testAddr = new
VirtualServerAddress("test", 123, 0);
@BeforeMethod
public void setUp() {
@@ -76,7 +78,7 @@ public class MailboxReceiveOperatorTest {
throws InterruptedException {
// shorter timeoutMs should result in error.
MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(_mailboxService, new ArrayList<>(),
RelDistribution.Type.SINGLETON, "test", 123, 456,
+ new MailboxReceiveOperator(_mailboxService, new ArrayList<>(),
RelDistribution.Type.SINGLETON, _testAddr, 456,
789, 10L);
Thread.sleep(200L);
TransferableBlock mailbox = receiveOp.nextBlock();
@@ -86,13 +88,13 @@ public class MailboxReceiveOperatorTest {
// longer timeout or default timeout (10s) doesn't result in error.
receiveOp =
- new MailboxReceiveOperator(_mailboxService, new ArrayList<>(),
RelDistribution.Type.SINGLETON, "test", 123, 456,
+ new MailboxReceiveOperator(_mailboxService, new ArrayList<>(),
RelDistribution.Type.SINGLETON, _testAddr, 456,
789, 2000L);
Thread.sleep(200L);
mailbox = receiveOp.nextBlock();
Assert.assertFalse(mailbox.isErrorBlock());
receiveOp =
- new MailboxReceiveOperator(_mailboxService, new ArrayList<>(),
RelDistribution.Type.SINGLETON, "test", 123, 456,
+ new MailboxReceiveOperator(_mailboxService, new ArrayList<>(),
RelDistribution.Type.SINGLETON, _testAddr, 456,
789, null);
Thread.sleep(200L);
mailbox = receiveOp.nextBlock();
@@ -113,7 +115,7 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
MailboxReceiveOperator receiveOp = new
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.SINGLETON, "test", 123, 456, 789, null);
+ RelDistribution.Type.SINGLETON, _testAddr, 456, 789, null);
}
@Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
@@ -128,7 +130,7 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
MailboxReceiveOperator receiveOp = new
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.RANGE_DISTRIBUTED, "test", 123, 456, 789, null);
+ RelDistribution.Type.RANGE_DISTRIBUTED, _testAddr, 456, 789, null);
}
@Test
@@ -150,9 +152,10 @@ public class MailboxReceiveOperatorTest {
int stageId = 0;
int toPort = 8888;
String toHost = "toHost";
+ VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort,
0);
MailboxReceiveOperator receiveOp = new
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+ RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, null);
// Receive end of stream block directly when there is no match.
Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
@@ -177,15 +180,16 @@ public class MailboxReceiveOperatorTest {
int stageId = 0;
int toPort = 8888;
String toHost = "toHost";
+ VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort,
0);
JsonMailboxIdentifier expectedMailboxId =
new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new ServerAddress(serverHost, server2port),
- new ServerAddress(toHost, toPort));
+ new VirtualServerAddress(serverHost, server2port, 0),
+ toAddress);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(true);
MailboxReceiveOperator receiveOp = new
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+ RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, null);
// Receive end of stream block directly when mailbox is close.
Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
}
@@ -210,17 +214,18 @@ public class MailboxReceiveOperatorTest {
int stageId = 0;
int toPort = 8888;
String toHost = "toHost";
+ VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort,
0);
JsonMailboxIdentifier expectedMailboxId =
new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new ServerAddress(serverHost, server2port),
- new ServerAddress(toHost, toPort));
+ new VirtualServerAddress(serverHost, server2port, 0),
+ toAddress);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
// Receive null mailbox during timeout.
Mockito.when(_mailbox.receive()).thenReturn(null);
MailboxReceiveOperator receiveOp = new
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+ RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, null);
// Receive NoOpBlock.
Assert.assertTrue(receiveOp.nextBlock().isNoOpBlock());
}
@@ -245,16 +250,17 @@ public class MailboxReceiveOperatorTest {
int stageId = 0;
int toPort = 8888;
String toHost = "toHost";
+ VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort,
0);
JsonMailboxIdentifier expectedMailboxId =
new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new ServerAddress(serverHost, server2port),
- new ServerAddress(toHost, toPort));
+ new VirtualServerAddress(serverHost, server2port, 0),
+ toAddress);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
MailboxReceiveOperator receiveOp = new
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+ RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, null);
// Receive EosBloc.
Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
}
@@ -279,18 +285,19 @@ public class MailboxReceiveOperatorTest {
int stageId = 0;
int toPort = 8888;
String toHost = "toHost";
+ VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort,
0);
JsonMailboxIdentifier expectedMailboxId =
new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new ServerAddress(serverHost, server2port),
- new ServerAddress(toHost, toPort));
+ new VirtualServerAddress(serverHost, server2port, 0),
+ toAddress);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Object[] expRow = new Object[]{1, 1};
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new
DataSchema.ColumnDataType[]{INT, INT});
Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(inSchema,
expRow));
MailboxReceiveOperator receiveOp = new
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+ RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
List<Object[]> resultRows = receivedBlock.getContainer();
Assert.assertEquals(resultRows.size(), 1);
@@ -317,17 +324,18 @@ public class MailboxReceiveOperatorTest {
int stageId = 0;
int toPort = 8888;
String toHost = "toHost";
+ VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort,
0);
JsonMailboxIdentifier expectedMailboxId =
new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new ServerAddress(serverHost, server2port),
- new ServerAddress(toHost, toPort));
+ new VirtualServerAddress(serverHost, server2port, 0),
+ toAddress);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Exception e = new Exception("errorBlock");
Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(e));
MailboxReceiveOperator receiveOp = new
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+ RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
Assert.assertTrue(receivedBlock.isErrorBlock());
MetadataBlock error = (MetadataBlock) receivedBlock.getDataBlock();
@@ -351,25 +359,26 @@ public class MailboxReceiveOperatorTest {
int stageId = 0;
int toPort = 8888;
String toHost = "toHost";
+ VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort,
0);
JsonMailboxIdentifier expectedMailboxId1 =
new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new ServerAddress(server1Host, server1Port),
- new ServerAddress(toHost, toPort));
+ new VirtualServerAddress(server1Host, server1Port, 0),
+ toAddress);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(true);
JsonMailboxIdentifier expectedMailboxId2 =
new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new ServerAddress(server2Host, server2Port),
- new ServerAddress(toHost, toPort));
+ new VirtualServerAddress(server2Host, server2Port, 0),
+ toAddress);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Object[] expRow = new Object[]{1, 1};
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new
DataSchema.ColumnDataType[]{INT, INT});
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema,
expRow));
MailboxReceiveOperator receiveOp = new
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId,
null);
+ RelDistribution.Type.HASH_DISTRIBUTED, toAddress, jobId, stageId,
null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
List<Object[]> resultRows = receivedBlock.getContainer();
Assert.assertEquals(resultRows.size(), 1);
@@ -393,26 +402,27 @@ public class MailboxReceiveOperatorTest {
int stageId = 0;
int toPort = 8888;
String toHost = "toHost";
+ VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort,
0);
JsonMailboxIdentifier expectedMailboxId1 =
new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new ServerAddress(server1Host, server1Port),
- new ServerAddress(toHost, toPort));
+ new VirtualServerAddress(server1Host, server1Port, 0),
+ toAddress);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Mockito.when(_mailbox.receive()).thenReturn(null);
JsonMailboxIdentifier expectedMailboxId2 =
new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new ServerAddress(server2Host, server2Port),
- new ServerAddress(toHost, toPort));
+ new VirtualServerAddress(server2Host, server2Port, 0),
+ toAddress);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Object[] expRow = new Object[]{1, 1};
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new
DataSchema.ColumnDataType[]{INT, INT});
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema,
expRow));
MailboxReceiveOperator receiveOp = new
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId,
null);
+ RelDistribution.Type.HASH_DISTRIBUTED, toAddress, jobId, stageId,
null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
List<Object[]> resultRows = receivedBlock.getContainer();
Assert.assertEquals(resultRows.size(), 1);
@@ -436,12 +446,13 @@ public class MailboxReceiveOperatorTest {
int stageId = 0;
int toPort = 8888;
String toHost = "toHost";
+ VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort,
0);
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new
DataSchema.ColumnDataType[]{INT, INT});
JsonMailboxIdentifier expectedMailboxId1 =
new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new ServerAddress(server1Host, server1Port),
- new ServerAddress(toHost, toPort));
+ new VirtualServerAddress(server1Host, server1Port, 0),
+ toAddress);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Object[] expRow1 = new Object[]{1, 1};
@@ -453,13 +464,13 @@ public class MailboxReceiveOperatorTest {
Object[] expRow3 = new Object[]{3, 3};
JsonMailboxIdentifier expectedMailboxId2 =
new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new ServerAddress(server2Host, server2Port),
- new ServerAddress(toHost, toPort));
+ new VirtualServerAddress(server2Host, server2Port, 0),
+ toAddress);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema,
expRow3));
MailboxReceiveOperator receiveOp = new
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId,
null);
+ RelDistribution.Type.HASH_DISTRIBUTED, toAddress, jobId, stageId,
null);
// Receive first block from first server.
TransferableBlock receivedBlock = receiveOp.nextBlock();
List<Object[]> resultRows = receivedBlock.getContainer();
@@ -495,12 +506,13 @@ public class MailboxReceiveOperatorTest {
int stageId = 0;
int toPort = 8888;
String toHost = "toHost";
+ VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort,
0);
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new
DataSchema.ColumnDataType[]{INT, INT});
JsonMailboxIdentifier expectedMailboxId1 =
new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new ServerAddress(server1Host, server1Port),
- new ServerAddress(toHost, toPort));
+ new VirtualServerAddress(server1Host, server1Port, 0),
+ toAddress);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Mockito.when(_mailbox.receive())
@@ -509,13 +521,13 @@ public class MailboxReceiveOperatorTest {
Object[] expRow3 = new Object[]{3, 3};
JsonMailboxIdentifier expectedMailboxId2 =
new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new ServerAddress(server2Host, server2Port),
- new ServerAddress(toHost, toPort));
+ new VirtualServerAddress(server2Host, server2Port, 0),
+ toAddress);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema,
expRow3));
MailboxReceiveOperator receiveOp = new
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId,
null);
+ RelDistribution.Type.HASH_DISTRIBUTED, toAddress, jobId, stageId,
null);
// Receive error block from first server.
TransferableBlock receivedBlock = receiveOp.nextBlock();
Assert.assertTrue(receivedBlock.isErrorBlock());
@@ -540,12 +552,13 @@ public class MailboxReceiveOperatorTest {
int stageId = 0;
int toPort = 8888;
String toHost = "toHost";
+ VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort,
0);
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new
DataSchema.ColumnDataType[]{INT, INT});
JsonMailboxIdentifier expectedMailboxId1 =
new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new ServerAddress(server1Host, server1Port),
- new ServerAddress(toHost, toPort));
+ new VirtualServerAddress(server1Host, server1Port, 0),
+ toAddress);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Mockito.when(_mailbox.receive()).thenThrow(new Exception("mailboxError"));
@@ -553,13 +566,13 @@ public class MailboxReceiveOperatorTest {
Object[] expRow3 = new Object[]{3, 3};
JsonMailboxIdentifier expectedMailboxId2 =
new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new ServerAddress(server2Host, server2Port),
- new ServerAddress(toHost, toPort));
+ new VirtualServerAddress(server2Host, server2Port, 0),
+ toAddress);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema,
expRow3));
MailboxReceiveOperator receiveOp = new
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId,
null);
+ RelDistribution.Type.HASH_DISTRIBUTED, toAddress, jobId, stageId,
null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
Assert.assertTrue(receivedBlock.isErrorBlock(), "server-1 should have
returned an error-block");
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index b616efcf91..4bc112d933 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -23,10 +23,10 @@ import java.util.Arrays;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.routing.VirtualServer;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
@@ -48,7 +48,7 @@ public class MailboxSendOperatorTest {
@Mock
private MailboxService<TransferableBlock> _mailboxService;
@Mock
- private ServerInstance _server;
+ private VirtualServer _server;
@Mock
private KeySelector<Object[], Object[]> _selector;
@Mock
@@ -74,7 +74,7 @@ public class MailboxSendOperatorTest {
// Given:
MailboxSendOperator operator = new MailboxSendOperator(
_mailboxService, _input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
- server -> new JsonMailboxIdentifier("123", "from:1", "to:2"),
_exchangeFactory, 1, 2);
+ server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2"),
_exchangeFactory, 1, 2);
Mockito.when(_input.nextBlock())
.thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
@@ -91,7 +91,7 @@ public class MailboxSendOperatorTest {
// Given:
MailboxSendOperator operator = new MailboxSendOperator(
_mailboxService, _input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
- server -> new JsonMailboxIdentifier("123", "from:1", "to:2"),
_exchangeFactory, 1, 2);
+ server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2"),
_exchangeFactory, 1, 2);
TransferableBlock errorBlock =
TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"));
Mockito.when(_input.nextBlock())
.thenReturn(errorBlock);
@@ -109,7 +109,7 @@ public class MailboxSendOperatorTest {
// Given:
MailboxSendOperator operator = new MailboxSendOperator(
_mailboxService, _input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
- server -> new JsonMailboxIdentifier("123", "from:1", "to:2"),
_exchangeFactory, 1, 2);
+ server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2"),
_exchangeFactory, 1, 2);
Mockito.when(_input.nextBlock())
.thenThrow(new RuntimeException("foo!"));
ArgumentCaptor<TransferableBlock> captor =
ArgumentCaptor.forClass(TransferableBlock.class);
@@ -128,7 +128,7 @@ public class MailboxSendOperatorTest {
// Given:
MailboxSendOperator operator = new MailboxSendOperator(
_mailboxService, _input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
- server -> new JsonMailboxIdentifier("123", "from:1", "to:2"),
_exchangeFactory, 1, 2);
+ server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2"),
_exchangeFactory, 1, 2);
TransferableBlock eosBlock =
TransferableBlockUtils.getEndOfStreamTransferableBlock();
Mockito.when(_input.nextBlock())
.thenReturn(eosBlock);
@@ -146,7 +146,7 @@ public class MailboxSendOperatorTest {
// Given:
MailboxSendOperator operator = new MailboxSendOperator(
_mailboxService, _input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
- server -> new JsonMailboxIdentifier("123", "from:1", "to:2"),
_exchangeFactory, 1, 2);
+ server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2"),
_exchangeFactory, 1, 2);
TransferableBlock dataBlock = block(new DataSchema(new String[]{}, new
DataSchema.ColumnDataType[]{}));
Mockito.when(_input.nextBlock())
.thenReturn(dataBlock)
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
index d2cb71a04c..43663c5e0a 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
@@ -44,9 +44,8 @@ import org.testng.annotations.Test;
public class BlockExchangeTest {
-
- private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1", "host:1", "host:1");
- private static final MailboxIdentifier MAILBOX_2 = new
JsonMailboxIdentifier("1", "host:1", "host:2");
+ private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
+ private static final MailboxIdentifier MAILBOX_2 = new
JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
private AutoCloseable _mocks;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
index c99ea643c6..eccd31fe6a 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
@@ -34,8 +34,8 @@ import org.testng.annotations.Test;
public class BroadcastExchangeTest {
- private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1", "host:1", "host:1");
- private static final MailboxIdentifier MAILBOX_2 = new
JsonMailboxIdentifier("1", "host:1", "host:2");
+ private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
+ private static final MailboxIdentifier MAILBOX_2 = new
JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
private AutoCloseable _mocks;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
index 4cd1d71075..31ff4a70f3 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
@@ -37,9 +37,8 @@ import org.testng.annotations.Test;
public class HashExchangeTest {
-
- private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1", "host:1", "host:1");
- private static final MailboxIdentifier MAILBOX_2 = new
JsonMailboxIdentifier("1", "host:1", "host:2");
+ private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
+ private static final MailboxIdentifier MAILBOX_2 = new
JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
private AutoCloseable _mocks;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
index 19e3db1711..c93080824c 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
@@ -33,8 +33,8 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class RandomExchangeTest {
- private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1", "host:1", "host:1");
- private static final MailboxIdentifier MAILBOX_2 = new
JsonMailboxIdentifier("1", "host:1", "host:2");
+ private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
+ private static final MailboxIdentifier MAILBOX_2 = new
JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
private AutoCloseable _mocks;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
index e38e533a9a..6b9dcd53f1 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
@@ -34,7 +34,7 @@ import org.testng.annotations.Test;
public class SingletonExchangeTest {
- private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1", "host:1", "host:1");
+ private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
private AutoCloseable _mocks;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index f7c7f06f0f..5473b4703d 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.query.QueryTestSet;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.routing.VirtualServer;
import org.apache.pinot.query.routing.WorkerInstance;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
@@ -166,7 +167,7 @@ public class QueryServerTest extends QueryTestSet {
}
private Worker.QueryRequest getQueryRequest(QueryPlan queryPlan, int
stageId) {
- ServerInstance serverInstance =
queryPlan.getStageMetadataMap().get(stageId).getServerInstances().get(0);
+ VirtualServer serverInstance =
queryPlan.getStageMetadataMap().get(stageId).getServerInstances().get(0);
return
Worker.QueryRequest.newBuilder().setStagePlan(QueryPlanSerDeUtils.serialize(
QueryDispatcher.constructDistributedStagePlan(queryPlan, stageId,
serverInstance)))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]