This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a8a3db5ef1 [multistage] make mailboxID JSON serialized (#10154)
a8a3db5ef1 is described below
commit a8a3db5ef1ce8388349a07ff589ea4e8c093f7c8
Author: Almog Gavra <[email protected]>
AuthorDate: Fri Jan 20 09:35:36 2023 -0800
[multistage] make mailboxID JSON serialized (#10154)
---
.../query/mailbox/InMemorySendingMailbox.java | 2 +-
.../pinot/query/mailbox/JsonMailboxIdentifier.java | 132 +++++++++++++++++++++
.../pinot/query/mailbox/MailboxIdentifier.java | 22 +---
.../apache/pinot/query/mailbox/ServerAddress.java | 82 +++++++++++++
.../query/mailbox/StringMailboxIdentifier.java | 105 ----------------
.../java/org/apache/pinot/query/mailbox/Utils.java | 9 +-
.../channel/MailboxContentStreamObserver.java | 6 +-
.../runtime/operator/MailboxReceiveOperator.java | 9 +-
.../runtime/operator/MailboxSendOperator.java | 13 +-
.../query/mailbox/GrpcMailboxServiceTest.java | 12 +-
.../query/mailbox/InMemoryMailboxServiceTest.java | 8 +-
.../mailbox/MultiplexingMailboxServiceTest.java | 8 +-
.../runtime/executor/RoundRobinSchedulerTest.java | 6 +-
.../operator/MailboxReceiveOperatorTest.java | 93 ++++++++++-----
.../runtime/operator/MailboxSendOperatorTest.java | 12 +-
.../operator/exchange/BlockExchangeTest.java | 6 +-
.../operator/exchange/BroadcastExchangeTest.java | 6 +-
.../operator/exchange/HashExchangeTest.java | 6 +-
.../operator/exchange/RandomExchangeTest.java | 6 +-
.../operator/exchange/SingletonExchangeTest.java | 4 +-
20 files changed, 340 insertions(+), 207 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index 9c2a713c36..ebde5b9c41 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -49,7 +49,7 @@ public class InMemorySendingMailbox implements
SendingMailbox<TransferableBlock>
throw new IllegalStateException("Failed to insert into in-memory mailbox
"
+ _mailboxId);
}
- _gotMailCallback.accept(new StringMailboxIdentifier(_mailboxId));
+ _gotMailCallback.accept(JsonMailboxIdentifier.parse(_mailboxId));
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/JsonMailboxIdentifier.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/JsonMailboxIdentifier.java
new file mode 100644
index 0000000000..7068b06ab6
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/JsonMailboxIdentifier.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Objects;
+
+
+public class JsonMailboxIdentifier implements MailboxIdentifier {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private final String _jobId;
+ private final String _from;
+ private final String _to;
+
+ private final ServerAddress _fromAddress;
+ private final ServerAddress _toAddress;
+
+ @JsonCreator
+ public JsonMailboxIdentifier(
+ @JsonProperty(value = "jobId") String jobId,
+ @JsonProperty(value = "from") String from,
+ @JsonProperty(value = "to") String to
+ ) {
+ _jobId = jobId;
+ _from = from;
+ _to = to;
+ _fromAddress = ServerAddress.parse(_from);
+ _toAddress = ServerAddress.parse(_to);
+ }
+
+ public JsonMailboxIdentifier(
+ String jobId,
+ ServerAddress from,
+ ServerAddress to
+ ) {
+ _jobId = jobId;
+ _from = from.toString();
+ _to = to.toString();
+ _fromAddress = from;
+ _toAddress = to;
+ }
+
+ public static JsonMailboxIdentifier parse(final String mailboxId) {
+ try {
+ return MAPPER.readValue(mailboxId, JsonMailboxIdentifier.class);
+ } catch (JsonProcessingException e) {
+ throw new IllegalArgumentException("Invalid mailboxID: '" + mailboxId +
"'. If you see this exception it may "
+ + "be because you are doing a rolling upgrade from an old version of
Pinot that is not backwards "
+ + "compatible with the current V2 engine.", e);
+ }
+ }
+
+ @Override
+ public String getJobId() {
+ return _jobId;
+ }
+
+ public String getFrom() {
+ return _from;
+ }
+
+ @JsonIgnore
+ @Override
+ public ServerAddress getFromHost() {
+ return _fromAddress;
+ }
+
+ public String getTo() {
+ return _to;
+ }
+
+ @JsonIgnore
+ @Override
+ public ServerAddress getToHost() {
+ return _toAddress;
+ }
+
+ @JsonIgnore
+ @Override
+ public boolean isLocal() {
+ return _fromAddress.equals(_toAddress);
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JsonMailboxIdentifier that = (JsonMailboxIdentifier) o;
+ return Objects.equals(_jobId, that._jobId) && Objects.equals(_from,
that._from) && Objects.equals(_to, that._to)
+ && Objects.equals(_fromAddress, that._fromAddress) &&
Objects.equals(_toAddress, that._toAddress);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(_jobId, _from, _to, _fromAddress, _toAddress);
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
index 8646f508ae..77647d3db2 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
@@ -32,28 +32,14 @@ public interface MailboxIdentifier {
String getJobId();
/**
- * get the sender host.
- * @return sender host
+ * @return the sender address
*/
- String getFromHost();
+ ServerAddress getFromHost();
/**
- * get the sender port.
- * @return sender port
+ * @return the destination address
*/
- int getFromPort();
-
- /**
- * get the receiver host.
- * @return receiver host
- */
- String getToHost();
-
- /**
- * get the receiver port.
- * @return receiver port
- */
- int getToPort();
+ ServerAddress getToHost();
/**
* Checks whether sender and receiver are in the same JVM.
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ServerAddress.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ServerAddress.java
new file mode 100644
index 0000000000..cda2b332fe
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ServerAddress.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.query.mailbox;
+
+import java.util.Objects;
+
+
+public class ServerAddress {
+
+ private final String _hostname;
+ private final int _port;
+
+ public ServerAddress(String hostname, int port) {
+ _hostname = hostname;
+ _port = port;
+ }
+
+ /**
+ * Parses the standard hostname:port pattern into
+ * a {@code ServerAddress}
+ *
+ * @param address the serialized string
+ * @return the deserialized form
+ */
+ public static ServerAddress parse(String address) {
+ String[] split = address.split(":");
+ return new ServerAddress(split[0], Integer.parseInt(split[1]));
+ }
+
+ /**
+ * @return the server's hostname
+ */
+ public String hostname() {
+ return _hostname;
+ }
+
+ /**
+ * @return the server's port
+ */
+ public int port() {
+ return _port;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ServerAddress that = (ServerAddress) o;
+ return _port == that._port && Objects.equals(_hostname, that._hostname);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(_hostname, _port);
+ }
+
+ @Override
+ public String toString() {
+ return _hostname + ":" + _port;
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/StringMailboxIdentifier.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/StringMailboxIdentifier.java
deleted file mode 100644
index c3d3078780..0000000000
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/StringMailboxIdentifier.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.mailbox;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-
-
-public class StringMailboxIdentifier implements MailboxIdentifier {
- private static final Joiner JOINER = Joiner.on(':');
-
- private final String _mailboxIdString;
- private final String _jobId;
- private final String _fromHost;
- private final int _fromPort;
- private final String _toHost;
- private final int _toPort;
-
- public StringMailboxIdentifier(String jobId, String fromHost, int fromPort,
String toHost,
- int toPort) {
- _jobId = jobId;
- _fromHost = fromHost;
- _fromPort = fromPort;
- _toHost = toHost;
- _toPort = toPort;
- _mailboxIdString = JOINER.join(_jobId, _fromHost, _fromPort, _toHost,
_toPort);
- }
-
- public StringMailboxIdentifier(String mailboxId) {
- _mailboxIdString = mailboxId;
- String[] splits = mailboxId.split(":");
- Preconditions.checkState(splits.length == 5);
- _jobId = splits[0];
- _fromHost = splits[1];
- _fromPort = Integer.parseInt(splits[2]);
- _toHost = splits[3];
- _toPort = Integer.parseInt(splits[4]);
-
- // assert that resulting string are identical.
- Preconditions.checkState(
- JOINER.join(_jobId, _fromHost, _fromPort, _toHost,
_toPort).equals(_mailboxIdString));
- }
-
- @Override
- public String getJobId() {
- return _jobId;
- }
-
- @Override
- public String getFromHost() {
- return _fromHost;
- }
-
- @Override
- public int getFromPort() {
- return _fromPort;
- }
-
- @Override
- public String getToHost() {
- return _toHost;
- }
-
- @Override
- public int getToPort() {
- return _toPort;
- }
-
- @Override
- public boolean isLocal() {
- return _fromHost.equals(_toHost) && _fromPort == _toPort;
- }
-
- @Override
- public String toString() {
- return _mailboxIdString;
- }
-
- @Override
- public int hashCode() {
- return _mailboxIdString.hashCode();
- }
-
- @Override
- public boolean equals(Object that) {
- return (that instanceof StringMailboxIdentifier) &&
_mailboxIdString.equals(
- ((StringMailboxIdentifier) that)._mailboxIdString);
- }
-}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
index 90a4070544..60ae614a2f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/Utils.java
@@ -18,11 +18,7 @@
*/
package org.apache.pinot.query.mailbox;
-import com.google.common.base.Joiner;
-
-
public final class Utils {
- private static final Joiner JOINER = Joiner.on(':');
private Utils() {
// do not instantiate.
@@ -30,11 +26,12 @@ public final class Utils {
public static String constructChannelId(String mailboxId) {
MailboxIdentifier mailboxIdentifier = toMailboxIdentifier(mailboxId);
- return JOINER.join(mailboxIdentifier.getToHost(),
mailboxIdentifier.getToPort());
+ ServerAddress dest = mailboxIdentifier.getToHost();
+ return dest.toString();
}
public static MailboxIdentifier toMailboxIdentifier(String mailboxId) {
- return new StringMailboxIdentifier(mailboxId);
+ return JsonMailboxIdentifier.parse(mailboxId);
}
public static String fromMailboxIdentifier(MailboxIdentifier mailboxId) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 9fd3a4b2ba..4a35c8d660 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -31,8 +31,8 @@ import javax.annotation.concurrent.GuardedBy;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.mailbox.GrpcReceivingMailbox;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,7 +71,7 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
private ReadWriteLock _errorLock = new ReentrantReadWriteLock();
@GuardedBy("_errorLock")
private Mailbox.MailboxContent _errorContent = null;
- private StringMailboxIdentifier _mailboxId;
+ private JsonMailboxIdentifier _mailboxId;
private Consumer<MailboxIdentifier> _gotMailCallback;
private void updateMaxBufferSize() {
@@ -132,7 +132,7 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
@Override
public void onNext(Mailbox.MailboxContent mailboxContent) {
- _mailboxId = new StringMailboxIdentifier(mailboxContent.getMailboxId());
+ _mailboxId = JsonMailboxIdentifier.parse(mailboxContent.getMailboxId());
GrpcReceivingMailbox receivingMailbox = (GrpcReceivingMailbox)
_mailboxService.getReceivingMailbox(_mailboxId);
_gotMailCallback = receivingMailbox.init(this);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index aa569ba56f..8fb1d145c1 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -30,10 +30,11 @@ import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.mailbox.ServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.service.QueryConfig;
@@ -69,8 +70,10 @@ public class MailboxReceiveOperator extends
MultiStageOperator {
private static MailboxIdentifier toMailboxId(ServerInstance fromInstance,
long jobId, long stageId,
String receiveHostName, int receivePort) {
- return new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
fromInstance.getHostname(),
- fromInstance.getQueryMailboxPort(), receiveHostName, receivePort);
+ return new JsonMailboxIdentifier(
+ String.format("%s_%s", jobId, stageId),
+ new ServerAddress(fromInstance.getHostname(),
fromInstance.getQueryMailboxPort()),
+ new ServerAddress(receiveHostName, receivePort));
}
// TODO: Move deadlineInNanoSeconds to OperatorContext.
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 79d64bfefe..047f5fd5b2 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -29,9 +29,10 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.mailbox.ServerAddress;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -159,9 +160,11 @@ public class MailboxSendOperator extends
MultiStageOperator {
return transferableBlock;
}
- private static StringMailboxIdentifier toMailboxId(
- ServerInstance serverInstance, long jobId, int stageId, String
serverHostName, int serverPort) {
- return new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
serverHostName, serverPort,
- serverInstance.getHostname(), serverInstance.getQueryMailboxPort());
+ private static JsonMailboxIdentifier toMailboxId(
+ ServerInstance destination, long jobId, int stageId, String sender, int
senderPort) {
+ return new JsonMailboxIdentifier(
+ String.format("%s_%s", jobId, stageId),
+ new ServerAddress(sender, senderPort),
+ new ServerAddress(destination.getHostname(),
destination.getQueryMailboxPort()));
}
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index 7585c0ca87..70b7763ca9 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -74,8 +74,10 @@ public class GrpcMailboxServiceTest {
public void testHappyPath()
throws Exception {
// Given:
- StringMailboxIdentifier mailboxId = new StringMailboxIdentifier(
- "happypath", "localhost", _mailboxService1.getMailboxPort(),
"localhost", _mailboxService2.getMailboxPort());
+ JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
+ "happypath",
+ new ServerAddress("localhost", _mailboxService1.getMailboxPort()),
+ new ServerAddress("localhost", _mailboxService2.getMailboxPort()));
SendingMailbox<TransferableBlock> sendingMailbox =
_mailboxService1.getSendingMailbox(mailboxId);
ReceivingMailbox<TransferableBlock> receivingMailbox =
_mailboxService2.getReceivingMailbox(mailboxId);
CountDownLatch gotData = new CountDownLatch(1);
@@ -104,8 +106,10 @@ public class GrpcMailboxServiceTest {
public void testGrpcException()
throws Exception {
// Given:
- StringMailboxIdentifier mailboxId = new StringMailboxIdentifier(
- "exception", "localhost", _mailboxService1.getMailboxPort(),
"localhost", _mailboxService2.getMailboxPort());
+ JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
+ "exception",
+ new ServerAddress("localhost", _mailboxService1.getMailboxPort()),
+ new ServerAddress("localhost", _mailboxService2.getMailboxPort()));
SendingMailbox<TransferableBlock> sendingMailbox =
_mailboxService1.getSendingMailbox(mailboxId);
ReceivingMailbox<TransferableBlock> receivingMailbox =
_mailboxService2.getReceivingMailbox(mailboxId);
CountDownLatch gotData = new CountDownLatch(1);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
index 7ffb51238f..a1eae8e1ff 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
@@ -38,8 +38,8 @@ public class InMemoryMailboxServiceTest {
public void testHappyPath()
throws Exception {
InMemoryMailboxService mailboxService = new
InMemoryMailboxService("localhost", 0, ignored -> { });
- final StringMailboxIdentifier mailboxId = new StringMailboxIdentifier(
- "happyPathJob", "localhost", 0, "localhost", 0);
+ final JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
+ "happyPathJob", new ServerAddress("localhost", 0), new
ServerAddress("localhost", 0));
InMemoryReceivingMailbox receivingMailbox = (InMemoryReceivingMailbox)
mailboxService.getReceivingMailbox(
mailboxId);
InMemorySendingMailbox sendingMailbox = (InMemorySendingMailbox)
mailboxService.getSendingMailbox(mailboxId);
@@ -74,8 +74,8 @@ public class InMemoryMailboxServiceTest {
@Test
public void testNonLocalMailboxId() {
InMemoryMailboxService mailboxService = new
InMemoryMailboxService("localhost", 0, ignored -> { });
- final StringMailboxIdentifier mailboxId = new StringMailboxIdentifier(
- "happyPathJob", "localhost", 0, "localhost", 1);
+ final JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
+ "happyPathJob", new ServerAddress("localhost", 0), new
ServerAddress("localhost", 1));
// Test getReceivingMailbox
try {
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
index 347fcd0f59..110aeb9d74 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
@@ -24,10 +24,10 @@ import org.testng.annotations.Test;
public class MultiplexingMailboxServiceTest {
- private static final StringMailboxIdentifier LOCAL_MAILBOX_ID = new
StringMailboxIdentifier(
- "localJobId", "localhost", 0, "localhost", 0);
- private static final StringMailboxIdentifier NON_LOCAL_MAILBOX_ID = new
StringMailboxIdentifier(
- "localJobId", "localhost", 0, "localhost", 1);
+ private static final JsonMailboxIdentifier LOCAL_MAILBOX_ID = new
JsonMailboxIdentifier(
+ "localJobId", new ServerAddress("localhost", 0), new
ServerAddress("localhost", 0));
+ private static final JsonMailboxIdentifier NON_LOCAL_MAILBOX_ID = new
JsonMailboxIdentifier(
+ "localJobId", new ServerAddress("localhost", 0), new
ServerAddress("localhost", 1));
@Test
public void testHappyPath() {
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index bb3faf63ec..0435b50daa 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -20,8 +20,8 @@ package org.apache.pinot.query.runtime.executor;
import com.google.common.collect.ImmutableList;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.mockito.Mock;
@@ -34,8 +34,8 @@ import org.testng.annotations.Test;
public class RoundRobinSchedulerTest {
- private static final MailboxIdentifier MAILBOX_1 = new
StringMailboxIdentifier("1_1:foo:2:bar:3");
- private static final MailboxIdentifier MAILBOX_2 = new
StringMailboxIdentifier("1_2:foo:2:bar:3");
+ private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1_1", "foo:2", "bar:3");
+ private static final MailboxIdentifier MAILBOX_2 = new
JsonMailboxIdentifier("1_2", "foo:2", "bar:3");
@Mock
private MultiStageOperator _operator;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 5c3008ae42..ce6777df14 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -26,9 +26,10 @@ import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.mailbox.ServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.mockito.Mock;
@@ -177,8 +178,10 @@ public class MailboxReceiveOperatorTest {
int toPort = 8888;
String toHost = "toHost";
- StringMailboxIdentifier expectedMailboxId =
- new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
serverHost, server2port, toHost, toPort);
+ JsonMailboxIdentifier expectedMailboxId =
+ new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new ServerAddress(serverHost, server2port),
+ new ServerAddress(toHost, toPort));
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(true);
MailboxReceiveOperator receiveOp = new
MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
@@ -208,8 +211,10 @@ public class MailboxReceiveOperatorTest {
int toPort = 8888;
String toHost = "toHost";
- StringMailboxIdentifier expectedMailboxId =
- new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
serverHost, server2port, toHost, toPort);
+ JsonMailboxIdentifier expectedMailboxId =
+ new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new ServerAddress(serverHost, server2port),
+ new ServerAddress(toHost, toPort));
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
// Receive null mailbox during timeout.
@@ -241,8 +246,10 @@ public class MailboxReceiveOperatorTest {
int toPort = 8888;
String toHost = "toHost";
- StringMailboxIdentifier expectedMailboxId =
- new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
serverHost, server2port, toHost, toPort);
+ JsonMailboxIdentifier expectedMailboxId =
+ new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new ServerAddress(serverHost, server2port),
+ new ServerAddress(toHost, toPort));
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -273,8 +280,10 @@ public class MailboxReceiveOperatorTest {
int toPort = 8888;
String toHost = "toHost";
- StringMailboxIdentifier expectedMailboxId =
- new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
serverHost, server2port, toHost, toPort);
+ JsonMailboxIdentifier expectedMailboxId =
+ new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new ServerAddress(serverHost, server2port),
+ new ServerAddress(toHost, toPort));
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Object[] expRow = new Object[]{1, 1};
@@ -309,8 +318,10 @@ public class MailboxReceiveOperatorTest {
int toPort = 8888;
String toHost = "toHost";
- StringMailboxIdentifier expectedMailboxId =
- new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
serverHost, server2port, toHost, toPort);
+ JsonMailboxIdentifier expectedMailboxId =
+ new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new ServerAddress(serverHost, server2port),
+ new ServerAddress(toHost, toPort));
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Exception e = new Exception("errorBlock");
@@ -341,13 +352,17 @@ public class MailboxReceiveOperatorTest {
int toPort = 8888;
String toHost = "toHost";
- StringMailboxIdentifier expectedMailboxId1 =
- new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
server1Host, server1Port, toHost, toPort);
+ JsonMailboxIdentifier expectedMailboxId1 =
+ new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new ServerAddress(server1Host, server1Port),
+ new ServerAddress(toHost, toPort));
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(true);
- StringMailboxIdentifier expectedMailboxId2 =
- new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
server2Host, server2Port, toHost, toPort);
+ JsonMailboxIdentifier expectedMailboxId2 =
+ new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new ServerAddress(server2Host, server2Port),
+ new ServerAddress(toHost, toPort));
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Object[] expRow = new Object[]{1, 1};
@@ -379,14 +394,18 @@ public class MailboxReceiveOperatorTest {
int toPort = 8888;
String toHost = "toHost";
- StringMailboxIdentifier expectedMailboxId1 =
- new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
server1Host, server1Port, toHost, toPort);
+ JsonMailboxIdentifier expectedMailboxId1 =
+ new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new ServerAddress(server1Host, server1Port),
+ new ServerAddress(toHost, toPort));
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Mockito.when(_mailbox.receive()).thenReturn(null);
- StringMailboxIdentifier expectedMailboxId2 =
- new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
server2Host, server2Port, toHost, toPort);
+ JsonMailboxIdentifier expectedMailboxId2 =
+ new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new ServerAddress(server2Host, server2Port),
+ new ServerAddress(toHost, toPort));
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Object[] expRow = new Object[]{1, 1};
@@ -419,8 +438,10 @@ public class MailboxReceiveOperatorTest {
String toHost = "toHost";
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new
DataSchema.ColumnDataType[]{INT, INT});
- StringMailboxIdentifier expectedMailboxId1 =
- new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
server1Host, server1Port, toHost, toPort);
+ JsonMailboxIdentifier expectedMailboxId1 =
+ new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new ServerAddress(server1Host, server1Port),
+ new ServerAddress(toHost, toPort));
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Object[] expRow1 = new Object[]{1, 1};
@@ -430,8 +451,10 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
Object[] expRow3 = new Object[]{3, 3};
- StringMailboxIdentifier expectedMailboxId2 =
- new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
server2Host, server2Port, toHost, toPort);
+ JsonMailboxIdentifier expectedMailboxId2 =
+ new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new ServerAddress(server2Host, server2Port),
+ new ServerAddress(toHost, toPort));
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema,
expRow3));
@@ -474,16 +497,20 @@ public class MailboxReceiveOperatorTest {
String toHost = "toHost";
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new
DataSchema.ColumnDataType[]{INT, INT});
- StringMailboxIdentifier expectedMailboxId1 =
- new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
server1Host, server1Port, toHost, toPort);
+ JsonMailboxIdentifier expectedMailboxId1 =
+ new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new ServerAddress(server1Host, server1Port),
+ new ServerAddress(toHost, toPort));
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Mockito.when(_mailbox.receive())
.thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new
Exception("mailboxError")));
Object[] expRow3 = new Object[]{3, 3};
- StringMailboxIdentifier expectedMailboxId2 =
- new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
server2Host, server2Port, toHost, toPort);
+ JsonMailboxIdentifier expectedMailboxId2 =
+ new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new ServerAddress(server2Host, server2Port),
+ new ServerAddress(toHost, toPort));
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema,
expRow3));
@@ -515,15 +542,19 @@ public class MailboxReceiveOperatorTest {
String toHost = "toHost";
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new
DataSchema.ColumnDataType[]{INT, INT});
- StringMailboxIdentifier expectedMailboxId1 =
- new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
server1Host, server1Port, toHost, toPort);
+ JsonMailboxIdentifier expectedMailboxId1 =
+ new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new ServerAddress(server1Host, server1Port),
+ new ServerAddress(toHost, toPort));
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Mockito.when(_mailbox.receive()).thenThrow(new Exception("mailboxError"));
Object[] expRow3 = new Object[]{3, 3};
- StringMailboxIdentifier expectedMailboxId2 =
- new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId),
server2Host, server2Port, toHost, toPort);
+ JsonMailboxIdentifier expectedMailboxId2 =
+ new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new ServerAddress(server2Host, server2Port),
+ new ServerAddress(toHost, toPort));
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema,
expRow3));
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 9f6577dc66..b616efcf91 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -24,8 +24,8 @@ import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -74,7 +74,7 @@ public class MailboxSendOperatorTest {
// Given:
MailboxSendOperator operator = new MailboxSendOperator(
_mailboxService, _input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
- server -> new StringMailboxIdentifier("123:from:1:to:2"),
_exchangeFactory, 1, 2);
+ server -> new JsonMailboxIdentifier("123", "from:1", "to:2"),
_exchangeFactory, 1, 2);
Mockito.when(_input.nextBlock())
.thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
@@ -91,7 +91,7 @@ public class MailboxSendOperatorTest {
// Given:
MailboxSendOperator operator = new MailboxSendOperator(
_mailboxService, _input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
- server -> new StringMailboxIdentifier("123:from:1:to:2"),
_exchangeFactory, 1, 2);
+ server -> new JsonMailboxIdentifier("123", "from:1", "to:2"),
_exchangeFactory, 1, 2);
TransferableBlock errorBlock =
TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"));
Mockito.when(_input.nextBlock())
.thenReturn(errorBlock);
@@ -109,7 +109,7 @@ public class MailboxSendOperatorTest {
// Given:
MailboxSendOperator operator = new MailboxSendOperator(
_mailboxService, _input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
- server -> new StringMailboxIdentifier("123:from:1:to:2"),
_exchangeFactory, 1, 2);
+ server -> new JsonMailboxIdentifier("123", "from:1", "to:2"),
_exchangeFactory, 1, 2);
Mockito.when(_input.nextBlock())
.thenThrow(new RuntimeException("foo!"));
ArgumentCaptor<TransferableBlock> captor =
ArgumentCaptor.forClass(TransferableBlock.class);
@@ -128,7 +128,7 @@ public class MailboxSendOperatorTest {
// Given:
MailboxSendOperator operator = new MailboxSendOperator(
_mailboxService, _input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
- server -> new StringMailboxIdentifier("123:from:1:to:2"),
_exchangeFactory, 1, 2);
+ server -> new JsonMailboxIdentifier("123", "from:1", "to:2"),
_exchangeFactory, 1, 2);
TransferableBlock eosBlock =
TransferableBlockUtils.getEndOfStreamTransferableBlock();
Mockito.when(_input.nextBlock())
.thenReturn(eosBlock);
@@ -146,7 +146,7 @@ public class MailboxSendOperatorTest {
// Given:
MailboxSendOperator operator = new MailboxSendOperator(
_mailboxService, _input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
- server -> new StringMailboxIdentifier("123:from:1:to:2"),
_exchangeFactory, 1, 2);
+ server -> new JsonMailboxIdentifier("123", "from:1", "to:2"),
_exchangeFactory, 1, 2);
TransferableBlock dataBlock = block(new DataSchema(new String[]{}, new
DataSchema.ColumnDataType[]{}));
Mockito.when(_input.nextBlock())
.thenReturn(dataBlock)
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
index 617718a566..d2cb71a04c 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
@@ -26,10 +26,10 @@ import java.util.function.BiFunction;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.SendingMailbox;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -45,8 +45,8 @@ import org.testng.annotations.Test;
public class BlockExchangeTest {
- private static final MailboxIdentifier MAILBOX_1 = new
StringMailboxIdentifier("1:host:1:host:1");
- private static final MailboxIdentifier MAILBOX_2 = new
StringMailboxIdentifier("1:host:1:host:2");
+ private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1", "host:1", "host:1");
+ private static final MailboxIdentifier MAILBOX_2 = new
JsonMailboxIdentifier("1", "host:1", "host:2");
private AutoCloseable _mocks;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
index 14b673dcd8..c99ea643c6 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
@@ -20,9 +20,9 @@ package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.collect.ImmutableList;
import java.util.Iterator;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.mockito.Mock;
@@ -34,8 +34,8 @@ import org.testng.annotations.Test;
public class BroadcastExchangeTest {
- private static final MailboxIdentifier MAILBOX_1 = new
StringMailboxIdentifier("1:host:1:host:1");
- private static final MailboxIdentifier MAILBOX_2 = new
StringMailboxIdentifier("1:host:1:host:2");
+ private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1", "host:1", "host:1");
+ private static final MailboxIdentifier MAILBOX_2 = new
JsonMailboxIdentifier("1", "host:1", "host:2");
private AutoCloseable _mocks;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
index cf5420a92f..4cd1d71075 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
@@ -21,9 +21,9 @@ package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import java.util.Iterator;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -38,8 +38,8 @@ import org.testng.annotations.Test;
public class HashExchangeTest {
- private static final MailboxIdentifier MAILBOX_1 = new
StringMailboxIdentifier("1:host:1:host:1");
- private static final MailboxIdentifier MAILBOX_2 = new
StringMailboxIdentifier("1:host:1:host:2");
+ private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1", "host:1", "host:1");
+ private static final MailboxIdentifier MAILBOX_2 = new
JsonMailboxIdentifier("1", "host:1", "host:2");
private AutoCloseable _mocks;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
index cf6033cbd6..19e3db1711 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
@@ -20,9 +20,9 @@ package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.collect.ImmutableList;
import java.util.Iterator;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.mockito.Mock;
@@ -33,8 +33,8 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class RandomExchangeTest {
- private static final MailboxIdentifier MAILBOX_1 = new
StringMailboxIdentifier("1:host:1:host:1");
- private static final MailboxIdentifier MAILBOX_2 = new
StringMailboxIdentifier("1:host:1:host:2");
+ private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1", "host:1", "host:1");
+ private static final MailboxIdentifier MAILBOX_2 = new
JsonMailboxIdentifier("1", "host:1", "host:2");
private AutoCloseable _mocks;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
index 203cf7d907..e38e533a9a 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
@@ -20,9 +20,9 @@ package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.collect.ImmutableList;
import java.util.Iterator;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.mockito.Mock;
@@ -34,7 +34,7 @@ import org.testng.annotations.Test;
public class SingletonExchangeTest {
- private static final MailboxIdentifier MAILBOX_1 = new
StringMailboxIdentifier("1:host:1:host:1");
+ private static final MailboxIdentifier MAILBOX_1 = new
JsonMailboxIdentifier("1", "host:1", "host:1");
private AutoCloseable _mocks;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]