This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 876e58dcc5 [multistage] framework to back-propagate metadata across
opChains (#11746)
876e58dcc5 is described below
commit 876e58dcc5e4a4c8639bdeff22fda55d9f12f3ef
Author: Rong Rong <[email protected]>
AuthorDate: Wed Oct 11 22:10:18 2023 -0700
[multistage] framework to back-propagate metadata across opChains (#11746)
* initial commit to alter early termination and signaling backwards
behavior
* add more tests
---------
Co-authored-by: Rong Rong <[email protected]>
---
.../pinot/query/mailbox/GrpcSendingMailbox.java | 8 +++-
.../query/mailbox/InMemorySendingMailbox.java | 14 +++++-
.../pinot/query/mailbox/ReceivingMailbox.java | 17 +++++--
.../apache/pinot/query/mailbox/SendingMailbox.java | 6 +++
.../pinot/query/mailbox/channel/ChannelUtils.java | 2 +-
.../mailbox/channel/MailboxContentObserver.java | 7 ++-
.../mailbox/channel/MailboxStatusObserver.java | 16 ++++++-
.../query/runtime/operator/AggregateOperator.java | 1 -
.../operator/BaseMailboxReceiveOperator.java | 11 +++++
.../query/runtime/operator/HashJoinOperator.java | 5 +-
.../LeafStageTransferableBlockOperator.java | 18 ++++---
.../runtime/operator/LiteralValueOperator.java | 2 +-
.../runtime/operator/MailboxReceiveOperator.java | 10 +++-
.../runtime/operator/MailboxSendOperator.java | 17 ++++---
.../query/runtime/operator/MultiStageOperator.java | 9 ++++
.../pinot/query/runtime/operator/SortOperator.java | 13 ++---
.../runtime/operator/WindowAggregateOperator.java | 1 -
.../runtime/operator/exchange/BlockExchange.java | 32 ++++++++-----
.../query/runtime/operator/utils/AsyncStream.java | 5 ++
.../utils/BlockingMultiStreamConsumer.java | 6 +++
.../pinot/query/mailbox/MailboxServiceTest.java | 55 ++++++++++++++++++++++
.../runtime/operator/HashJoinOperatorTest.java | 1 +
.../operator/MailboxReceiveOperatorTest.java | 29 ++++++++++++
.../runtime/operator/MailboxSendOperatorTest.java | 21 +++++++--
.../pinot/query/runtime/operator/OpChainTest.java | 5 ++
.../query/runtime/operator/SortOperatorTest.java | 27 +++++++++++
.../operator/exchange/BlockExchangeTest.java | 33 +++++++++++++
27 files changed, 315 insertions(+), 56 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index ccb9afc270..87a8bda603 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -60,7 +60,7 @@ public class GrpcSendingMailbox implements SendingMailbox {
@Override
public void send(TransferableBlock block)
throws IOException {
- if (isTerminated()) {
+ if (isTerminated() || (isEarlyTerminated() &&
!block.isEndOfStreamBlock())) {
return;
}
if (LOGGER.isDebugEnabled()) {
@@ -101,9 +101,13 @@ public class GrpcSendingMailbox implements SendingMailbox {
}
}
+ @Override
+ public boolean isEarlyTerminated() {
+ return _statusObserver.isEarlyTerminated();
+ }
+
@Override
public boolean isTerminated() {
- // TODO: We cannot differentiate early termination vs stream error
return _statusObserver.isFinished();
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index 444e7e1dde..306cd6992a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.mailbox;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.spi.exception.QueryCancelledException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +35,7 @@ public class InMemorySendingMailbox implements SendingMailbox
{
private ReceivingMailbox _receivingMailbox;
private volatile boolean _isTerminated;
+ private volatile boolean _isEarlyTerminated;
public InMemorySendingMailbox(String id, MailboxService mailboxService, long
deadlineMs) {
_id = id;
@@ -44,7 +46,7 @@ public class InMemorySendingMailbox implements SendingMailbox
{
@Override
public void send(TransferableBlock block)
throws TimeoutException {
- if (_isTerminated) {
+ if (isTerminated() || (isEarlyTerminated() &&
!block.isEndOfStreamBlock())) {
return;
}
if (_receivingMailbox == null) {
@@ -55,13 +57,15 @@ public class InMemorySendingMailbox implements
SendingMailbox {
switch (status) {
case SUCCESS:
break;
+ case CANCELLED:
+ throw new QueryCancelledException(String.format("Mailbox: %s already
cancelled from upstream", _id));
case ERROR:
throw new RuntimeException(String.format("Mailbox: %s already errored
out (received error block before)", _id));
case TIMEOUT:
throw new TimeoutException(
String.format("Timed out adding block into mailbox: %s with
timeout: %dms", _id, timeoutMs));
case EARLY_TERMINATED:
- _isTerminated = true;
+ _isEarlyTerminated = true;
break;
default:
throw new IllegalStateException("Unsupported mailbox status: " +
status);
@@ -70,6 +74,7 @@ public class InMemorySendingMailbox implements SendingMailbox
{
@Override
public void complete() {
+ _isTerminated = true;
}
@Override
@@ -85,6 +90,11 @@ public class InMemorySendingMailbox implements
SendingMailbox {
new RuntimeException("Cancelled by sender with exception: " +
t.getMessage(), t)));
}
+ @Override
+ public boolean isEarlyTerminated() {
+ return _isEarlyTerminated;
+ }
+
@Override
public boolean isTerminated() {
return _isTerminated;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index 8bbffee360..97c8731f95 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -49,6 +49,8 @@ public class ReceivingMailbox {
// TODO: Revisit if this is the correct way to apply back pressure
private final BlockingQueue<TransferableBlock> _blocks = new
ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
private final AtomicReference<TransferableBlock> _errorBlock = new
AtomicReference<>();
+ private volatile boolean _isEarlyTerminated = false;
+
@Nullable
private volatile Reader _reader;
@@ -78,7 +80,7 @@ public class ReceivingMailbox {
TransferableBlock errorBlock = _errorBlock.get();
if (errorBlock != null) {
LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring
the late block", _id);
- return errorBlock == CANCELLED_ERROR_BLOCK ?
ReceivingMailboxStatus.EARLY_TERMINATED
+ return errorBlock == CANCELLED_ERROR_BLOCK ?
ReceivingMailboxStatus.CANCELLED
: ReceivingMailboxStatus.ERROR;
}
if (timeoutMs <= 0) {
@@ -95,11 +97,11 @@ public class ReceivingMailbox {
LOGGER.debug("==[MAILBOX]== Block " + block + " ready to read from
mailbox: " + _id);
}
notifyReader();
- return ReceivingMailboxStatus.SUCCESS;
+ return _isEarlyTerminated ? ReceivingMailboxStatus.EARLY_TERMINATED
: ReceivingMailboxStatus.SUCCESS;
} else {
LOGGER.debug("Mailbox: {} is already cancelled or errored out,
ignoring the late block", _id);
_blocks.clear();
- return errorBlock == CANCELLED_ERROR_BLOCK ?
ReceivingMailboxStatus.EARLY_TERMINATED
+ return errorBlock == CANCELLED_ERROR_BLOCK ?
ReceivingMailboxStatus.CANCELLED
: ReceivingMailboxStatus.ERROR;
}
} else {
@@ -136,6 +138,13 @@ public class ReceivingMailbox {
return errorBlock != null ? errorBlock : _blocks.poll();
}
+ /**
+ * Early terminates the mailbox; called when upstream doesn't expect any more
data blocks.
+ */
+ public void earlyTerminate() {
+ _isEarlyTerminated = true;
+ }
+
/**
* Cancels the mailbox. No more blocks are accepted after calling this
method. Should only be called by the receive
* operator to clean up the remaining blocks.
@@ -166,6 +175,6 @@ public class ReceivingMailbox {
}
public enum ReceivingMailboxStatus {
- SUCCESS, ERROR, TIMEOUT, EARLY_TERMINATED
+ SUCCESS, ERROR, TIMEOUT, CANCELLED, EARLY_TERMINATED
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index 7cdfc8c97d..576a4703da 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -62,4 +62,10 @@ public interface SendingMailbox {
* mailbox is terminated.
*/
boolean isTerminated();
+
+ /**
+ * Returns whether the {@link ReceivingMailbox} considers itself
finished and expects an EOS block with
+ * statistics to be sent next.
+ */
+ boolean isEarlyTerminated();
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
index ef43fe79a3..e297ac5f9c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
@@ -23,5 +23,5 @@ public class ChannelUtils {
}
public static final String MAILBOX_METADATA_BUFFER_SIZE_KEY = "buffer.size";
- public static final String MAILBOX_METADATA_BEGIN_OF_STREAM_KEY =
"begin.of.stream";
+ public static final String MAILBOX_METADATA_REQUEST_EARLY_TERMINATE =
"request.early.terminate";
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
index 9074d81151..ce91701c3a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
@@ -84,6 +84,10 @@ public class MailboxContentObserver implements
StreamObserver<MailboxContent> {
.putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY,
Integer.toString(_mailbox.getNumPendingBlocks())).build());
break;
+ case CANCELLED:
+ LOGGER.warn("Mailbox: {} already cancelled from upstream",
mailboxId);
+ cancelStream();
+ break;
case ERROR:
LOGGER.warn("Mailbox: {} already errored out (received error block
before)", mailboxId);
cancelStream();
@@ -94,7 +98,8 @@ public class MailboxContentObserver implements
StreamObserver<MailboxContent> {
break;
case EARLY_TERMINATED:
LOGGER.debug("Mailbox: {} has been early terminated", mailboxId);
- onCompleted();
+
_responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId)
+
.putMetadata(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE,
"true").build());
break;
default:
throw new IllegalStateException("Unsupported mailbox status: " +
status);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
index 9f1c488d18..4288f4a087 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
@@ -35,11 +35,19 @@ public class MailboxStatusObserver implements
StreamObserver<MailboxStatus> {
private final AtomicInteger _bufferSize = new
AtomicInteger(DEFAULT_MAILBOX_QUEUE_CAPACITY);
private final AtomicBoolean _finished = new AtomicBoolean();
+ private volatile boolean _isEarlyTerminated;
@Override
public void onNext(MailboxStatus mailboxStatus) {
- // when received a mailbox status from the receiving end, sending end
update the known buffer size available
- // so we can make better throughput send judgement. here is a simple
example.
+ // when the receiving mailbox receives a data block, it returns updated
info on the receiving end's status, including:
+ // 1. the buffer size available, for back-pressure handling
+ // 2. whether there's no need to send any additional data blocks
b/c it considers itself finished.
+ // -- handle early-terminate EOS request.
+ if (Boolean.parseBoolean(
+
mailboxStatus.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE)))
{
+ _isEarlyTerminated = true;
+ }
+ // -- handling buffer size back-pressure
// TODO: this feedback info is not used to throttle the send speed. it is
currently being discarded.
if
(mailboxStatus.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY))
{
_bufferSize.set(
@@ -49,6 +57,10 @@ public class MailboxStatusObserver implements
StreamObserver<MailboxStatus> {
}
}
+ public boolean isEarlyTerminated() {
+ return _isEarlyTerminated;
+ }
+
public int getBufferSize() {
return _bufferSize.get();
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index e3fc9cc315..27b6cbdf08 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -142,7 +142,6 @@ public class AggregateOperator extends MultiStageOperator {
if (!_hasReturnedAggregateBlock) {
return produceAggregatedBlock();
} else {
- // TODO: Move to close call.
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
} catch (Exception e) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index c633783d48..438cec8494 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -81,6 +81,12 @@ public abstract class BaseMailboxReceiveOperator extends
MultiStageOperator {
return _mailboxIds;
}
+ @Override
+ protected void earlyTerminate() {
+ _isEarlyTerminated = true;
+ _multiConsumer.earlyTerminate();
+ }
+
@Override
public List<MultiStageOperator> getChildOperators() {
return Collections.emptyList();
@@ -128,6 +134,11 @@ public abstract class BaseMailboxReceiveOperator extends
MultiStageOperator {
_mailbox.registeredReader(onNewData::newDataAvailable);
}
+ @Override
+ public void earlyTerminate() {
+ _mailbox.earlyTerminate();
+ }
+
@Override
public void cancel() {
_mailbox.cancel();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 32ca46290a..295e0c49af 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -239,9 +239,8 @@ public class HashJoinOperator extends MultiStageOperator {
}
_currentRowsInHashTable += container.size();
if (_currentRowsInHashTable == _maxRowsInHashTable) {
- // Early terminate right table operator.
- _rightTableOperator.close();
- break;
+ // set only the rightTableOperator to be early terminated, then
await its EOS block next.
+ _rightTableOperator.earlyTerminate();
}
rightBlock = _rightTableOperator.nextBlock();
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 4e690e4041..35361e48e9 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -126,21 +126,25 @@ public class LeafStageTransferableBlockOperator extends
MultiStageOperator {
if (exceptions != null) {
return TransferableBlockUtils.getErrorTransferableBlock(exceptions);
}
- if (resultsBlock != LAST_RESULTS_BLOCK) {
+ if (_isEarlyTerminated || resultsBlock == LAST_RESULTS_BLOCK) {
+ return constructMetadataBlock();
+ } else {
// Regular data block
return composeTransferableBlock(resultsBlock, _dataSchema);
- } else {
- // All data blocks have been returned. Record the stats and return EOS.
- Map<String, String> executionStats = _executionStats;
- OperatorStats operatorStats = _opChainStats.getOperatorStats(_context,
getOperatorId());
- operatorStats.recordExecutionStats(executionStats);
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
} catch (Exception e) {
return TransferableBlockUtils.getErrorTransferableBlock(e);
}
}
+ private TransferableBlock constructMetadataBlock() {
+ // All data blocks have been returned. Record the stats and return EOS.
+ Map<String, String> executionStats = _executionStats;
+ OperatorStats operatorStats = _opChainStats.getOperatorStats(_context,
getOperatorId());
+ operatorStats.recordExecutionStats(executionStats);
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ }
+
private Future<Void> startExecution() {
ResultsBlockConsumer resultsBlockConsumer = new ResultsBlockConsumer();
return _executorService.submit(() -> {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index 029e58e9a2..59aeec3e75 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -61,7 +61,7 @@ public class LiteralValueOperator extends MultiStageOperator {
@Override
protected TransferableBlock getNextBlock() {
- if (!_isLiteralBlockReturned) {
+ if (!_isLiteralBlockReturned && !_isEarlyTerminated) {
_isLiteralBlockReturned = true;
return _rexLiteralBlock;
} else {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 60c751405c..ad7913cdc1 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -42,6 +42,14 @@ public class MailboxReceiveOperator extends
BaseMailboxReceiveOperator {
@Override
protected TransferableBlock getNextBlock() {
- return getMultiConsumer().readBlockBlocking();
+ TransferableBlock block = getMultiConsumer().readBlockBlocking();
+ // When the early termination flag is set, the caller expects an EOS block to
be returned. However, since the 2 stages
+ // on either side of the sending/receiving mailbox set the early termination flag
asynchronously, there is a chance that the
+ // next block pulled out of the ReceivingMailbox is an already buffered
normal data block. This requires the
+ // MailboxReceiveOperator to continue pulling and dropping data blocks
until an EOS block is observed.
+ while (_isEarlyTerminated && !block.isEndOfStreamBlock()) {
+ block = getMultiConsumer().readBlockBlocking();
+ }
+ return block;
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 74fe297a8f..f6f25510df 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -38,7 +38,7 @@ import
org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.exception.QueryCancelledException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,14 +123,16 @@ public class MailboxSendOperator extends
MultiStageOperator {
// and the receiving opChain will not be able to access the stats from
the previous opChain
TransferableBlock eosBlockWithStats =
TransferableBlockUtils.getEndOfStreamTransferableBlock(
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
+ // no need to check early terminate signal b/c the current block is
already EOS
sendTransferableBlock(eosBlockWithStats);
} else {
- sendTransferableBlock(block);
+ if (sendTransferableBlock(block)) {
+ earlyTerminate();
+ }
}
return block;
- } catch (EarlyTerminationException e) {
- // TODO: Query stats are not sent when opChain is early terminated
- LOGGER.debug("Early terminating opChain: {}", _context.getId());
+ } catch (QueryCancelledException e) {
+ LOGGER.debug("Query was cancelled! for opChain: {}", _context.getId());
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
} catch (TimeoutException e) {
LOGGER.warn("Timed out transferring data on opChain: {}",
_context.getId(), e);
@@ -147,12 +149,13 @@ public class MailboxSendOperator extends
MultiStageOperator {
}
}
- private void sendTransferableBlock(TransferableBlock block)
+ private boolean sendTransferableBlock(TransferableBlock block)
throws Exception {
- _exchange.send(block);
+ boolean isEarlyTerminated = _exchange.send(block);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("==[SEND]== Block " + block + " sent from: " +
_context.getId());
}
+ return isEarlyTerminated;
}
/**
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 84228071c9..ade326ea20 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -36,11 +36,13 @@ public abstract class MultiStageOperator implements
Operator<TransferableBlock>,
protected final OpChainExecutionContext _context;
protected final String _operatorId;
protected final OpChainStats _opChainStats;
+ protected boolean _isEarlyTerminated;
public MultiStageOperator(OpChainExecutionContext context) {
_context = context;
_operatorId = Joiner.on("_").join(getClass().getSimpleName(),
_context.getStageId(), _context.getServer());
_opChainStats = _context.getStats();
+ _isEarlyTerminated = false;
}
@Override
@@ -70,6 +72,13 @@ public abstract class MultiStageOperator implements
Operator<TransferableBlock>,
// Make it protected because we should always call nextBlock()
protected abstract TransferableBlock getNextBlock();
+ protected void earlyTerminate() {
+ _isEarlyTerminated = true;
+ for (MultiStageOperator child : getChildOperators()) {
+ child.earlyTerminate();
+ }
+ }
+
protected boolean shouldCollectStats() {
return _context.isTraceEnabled();
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index d803848a7f..64f0926a63 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -51,7 +51,7 @@ public class SortOperator extends MultiStageOperator {
private final ArrayList<Object[]> _rows;
private final int _numRowsToKeep;
- private boolean _isSortedBlockConstructed;
+ private boolean _hasReturnedSortedBlock;
private TransferableBlock _upstreamErrorBlock;
public SortOperator(OpChainExecutionContext context, MultiStageOperator
upstreamOperator,
@@ -74,7 +74,7 @@ public class SortOperator extends MultiStageOperator {
_offset = Math.max(offset, 0);
_dataSchema = dataSchema;
_upstreamErrorBlock = null;
- _isSortedBlockConstructed = false;
+ _hasReturnedSortedBlock = false;
// Setting numRowsToKeep as default maximum on Broker if limit not set.
// TODO: make this default behavior configurable.
_numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultResponseLimit;
@@ -123,8 +123,8 @@ public class SortOperator extends MultiStageOperator {
return _upstreamErrorBlock;
}
- if (!_isSortedBlockConstructed) {
- _isSortedBlockConstructed = true;
+ if (!_hasReturnedSortedBlock) {
+ _hasReturnedSortedBlock = true;
if (_priorityQueue == null) {
if (_rows.size() > _offset) {
List<Object[]> row = _rows.subList(_offset, _rows.size());
@@ -150,7 +150,7 @@ public class SortOperator extends MultiStageOperator {
}
private void consumeInputBlocks() {
- if (!_isSortedBlockConstructed) {
+ if (!_hasReturnedSortedBlock) {
TransferableBlock block = _upstreamOperator.nextBlock();
while (block.isDataBlock()) {
List<Object[]> container = block.getContainer();
@@ -164,7 +164,8 @@ public class SortOperator extends MultiStageOperator {
_rows.addAll(container.subList(0, _numRowsToKeep - numRows));
LOGGER.debug("Early terminate at SortOperator - operatorId={},
opChainId={}", _operatorId,
_context.getId());
- break;
+ // set the operator to be early terminated, then await the EOS block
next.
+ earlyTerminate();
}
}
} else {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index a4dfde6263..ea9e065416 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -172,7 +172,6 @@ public class WindowAggregateOperator extends
MultiStageOperator {
if (!_hasReturnedWindowAggregateBlock) {
return produceWindowAggregatedBlock();
} else {
- // TODO: Move to close call.
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
} catch (Exception e) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index ae8d0c75d4..453288ecb3 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -28,7 +28,6 @@ import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
/**
@@ -68,24 +67,31 @@ public abstract class BlockExchange {
_splitter = splitter;
}
- public void send(TransferableBlock block)
+ /**
+ * API to send a block to the destination mailboxes.
+ * @param block the block to be transferred
+ * @return true if all the mailboxes have been early terminated.
+ * @throws Exception when the sending stream is unexpectedly closed.
+ */
+ public boolean send(TransferableBlock block)
throws Exception {
- boolean isEarlyTerminated = true;
- for (SendingMailbox sendingMailbox : _sendingMailboxes) {
- if (!sendingMailbox.isTerminated()) {
- isEarlyTerminated = false;
- break;
- }
- }
- if (isEarlyTerminated) {
- throw new EarlyTerminationException();
- }
if (block.isEndOfStreamBlock()) {
for (SendingMailbox sendingMailbox : _sendingMailboxes) {
sendBlock(sendingMailbox, block);
}
+ return false;
} else {
- route(_sendingMailboxes, block);
+ boolean isEarlyTerminated = true;
+ for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+ if (!sendingMailbox.isEarlyTerminated()) {
+ isEarlyTerminated = false;
+ break;
+ }
+ }
+ if (!isEarlyTerminated) {
+ route(_sendingMailboxes, block);
+ }
+ return isEarlyTerminated;
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AsyncStream.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AsyncStream.java
index 7df33ebefc..287ae7359e 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AsyncStream.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AsyncStream.java
@@ -67,6 +67,11 @@ public interface AsyncStream<E> {
*/
void cancel();
+ /**
+ * Set this stream to early terminate state, asking for metadata block.
+ */
+ void earlyTerminate();
+
interface OnNewData {
void newDataAvailable();
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
index 4605dc8d4d..387a44e754 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
@@ -71,6 +71,12 @@ public abstract class BlockingMultiStreamConsumer<E>
implements AutoCloseable {
cancelRemainingMailboxes();
}
+ public void earlyTerminate() {
+ for (AsyncStream<E> mailbox : _mailboxes) {
+ mailbox.earlyTerminate();
+ }
+ }
+
protected void cancelRemainingMailboxes() {
for (AsyncStream<E> mailbox : _mailboxes) {
mailbox.cancel();
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
index 11616a68f0..8e6b563ac4 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
@@ -292,6 +292,31 @@ public class MailboxServiceTest {
assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
}
+ @Test
+ public void testLocalEarlyTerminated()
+ throws Exception {
+ String mailboxId = MailboxIdUtils.toMailboxId(_requestId++,
SENDER_STAGE_ID, 0, RECEIVER_STAGE_ID, 0);
+ SendingMailbox sendingMailbox =
+ _mailboxService1.getSendingMailbox("localhost",
_mailboxService1.getPort(), mailboxId, Long.MAX_VALUE);
+ ReceivingMailbox receivingMailbox =
_mailboxService1.getReceivingMailbox(mailboxId);
+ receivingMailbox.registeredReader(() -> { });
+
+ // send a block
+ sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0}));
+ // receiving-side early terminates after pulling the first block
+ TransferableBlock block = receivingMailbox.poll();
+ receivingMailbox.earlyTerminate();
+ assertNotNull(block);
+ assertEquals(block.getNumRows(), 1);
+ // send another block b/c it doesn't guarantee the next block must be EOS
+ sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0}));
+ // send a metadata block
+
sendingMailbox.send(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ // sending side should early terminate
+ assertTrue(sendingMailbox.isEarlyTerminated());
+ }
+
@Test
public void testRemoteHappyPathSendFirst()
throws Exception {
@@ -556,4 +581,34 @@ public class MailboxServiceTest {
assertEquals(numCallbacks.get(),
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS + 1);
assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
}
+
+ @Test
+ public void testRemoteEarlyTerminated()
+ throws Exception {
+ String mailboxId = MailboxIdUtils.toMailboxId(_requestId++,
SENDER_STAGE_ID, 0, RECEIVER_STAGE_ID, 0);
+
+ // Sends are non-blocking as long as channel capacity is not breached
+ SendingMailbox sendingMailbox =
+ _mailboxService2.getSendingMailbox("localhost",
_mailboxService1.getPort(), mailboxId, Long.MAX_VALUE);
+ ReceivingMailbox receivingMailbox =
_mailboxService1.getReceivingMailbox(mailboxId);
+ receivingMailbox.registeredReader(() -> { });
+
+ // send a block
+ sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0}));
+ // receiving-side early terminates after pulling the first block
+ TestUtils.waitForCondition(aVoid -> {
+ TransferableBlock block = receivingMailbox.poll();
+ return block != null && block.getNumRows() == 1;
+ }, 1000L, "Failed to deliver mails");
+ receivingMailbox.earlyTerminate();
+
+ // send another block b/c the next block is not guaranteed to be EOS
+ sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0}));
+ // send a metadata block
+
sendingMailbox.send(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ sendingMailbox.complete();
+
+ // sending side should early terminate
+ TestUtils.waitForCondition(aVoid -> sendingMailbox.isEarlyTerminated(),
1000L, "Failed to early-terminate sender");
+ }
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index 9abd2e78b5..88d9cf4fe0 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -646,6 +646,7 @@ public class HashJoinOperatorTest {
new HashJoinOperator(OperatorTestUtil.getDefaultContext(),
_leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = join.nextBlock();
+ Mockito.verify(_rightOperator).earlyTerminate();
Assert.assertFalse(result.isErrorBlock());
Assert.assertEquals(result.getNumRows(), 1);
Assert.assertTrue(result.getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 6e3fa310a9..9f5b8dfffe 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -44,6 +44,7 @@ import org.testng.annotations.Test;
import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -236,4 +237,32 @@ public class MailboxReceiveOperatorTest {
assertTrue(block.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains(errorMessage));
}
}
+
+ @Test
+ public void shouldEarlyTerminateMailboxesWhenIndicated() {
+ Object[] row1 = new Object[]{1, 1};
+ Object[] row2 = new Object[]{2, 2};
+ Object[] row3 = new Object[]{3, 3};
+
when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
+ when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA,
row1),
+ OperatorTestUtil.block(DATA_SCHEMA, row3),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_2))).thenReturn(_mailbox2);
+ when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA,
row2),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ OpChainExecutionContext context =
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS,
Long.MAX_VALUE, _stageMetadataBoth);
+ try (MailboxReceiveOperator receiveOp = new
MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
+ 1)) {
+ // Receive first block from server1
+ assertEquals(receiveOp.nextBlock().getContainer().get(0), row1);
+ // at this point operator received a signal to early terminate
+ receiveOp.earlyTerminate();
+ // The next block received should be EOS even if upstream keeps sending normal
blocks.
+ assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
+ // Ensure that the early-terminate signal is propagated to each mailbox
+ verify(_mailbox1).earlyTerminate();
+ verify(_mailbox2).earlyTerminate();
+ }
+ }
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index c01447d7cd..7a49dcf16a 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -39,10 +39,7 @@ import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.openMocks;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;
@@ -185,6 +182,22 @@ public class MailboxSendOperatorTest {
assertTrue(resultMetadata.containsKey(mailboxSendOperator.getOperatorId()));
}
+ @Test
+ public void shouldEarlyTerminateWhenUpstreamWhenIndicated()
+ throws Exception {
+ // Given:
+ TransferableBlock dataBlock =
+ OperatorTestUtil.block(new DataSchema(new String[]{}, new
DataSchema.ColumnDataType[]{}));
+ when(_sourceOperator.nextBlock()).thenReturn(dataBlock);
+ doReturn(true).when(_exchange).send(any());
+
+ // When:
+ TransferableBlock block = getMailboxSendOperator().nextBlock();
+
+ // Then:
+ verify(_sourceOperator).earlyTerminate();
+ }
+
private MailboxSendOperator getMailboxSendOperator() {
StageMetadata stageMetadata = new
StageMetadata.Builder().setWorkerMetadataList(
Collections.singletonList(new
WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build();
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index db6d18ee3c..fb937e6238 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -375,6 +375,11 @@ public class OpChainTest {
_sleepTimeInMillis = sleepTimeInMillis;
}
+ @Override
+ public List<MultiStageOperator> getChildOperators() {
+ return Collections.singletonList(_upstream);
+ }
+
@Override
protected TransferableBlock getNextBlock() {
try {
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
index c58b394d19..03fc1755ba 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator;
import com.google.common.collect.ImmutableList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelFieldCollation.Direction;
@@ -618,6 +619,32 @@ public class SortOperatorTest {
Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to
propagate");
}
+ @Test
+ public void shouldEarlyTerminateCorrectlyWithSignalingPropagateUpstream() {
+ // Given:
+ List<RexExpression> collation = Collections.emptyList();
+ List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
+ DataSchema schema = new DataSchema(new String[]{"sort"}, new
DataSchema.ColumnDataType[]{INT});
+ SortOperator op =
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input,
collation, directions, nullDirections, 10, 0,
+ schema, false);
+
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1},
new Object[]{2}, new Object[]{3},
+ new Object[]{4}, new Object[]{5}, new Object[]{6}, new Object[]{7},
new Object[]{8}, new Object[]{9},
+ new Object[]{10}, new Object[]{11}, new Object[]{12}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ // When:
+ TransferableBlock block = op.nextBlock(); // construct
+ TransferableBlock block2 = op.nextBlock(); // eos
+
+ // Then:
+ Mockito.verify(_input).earlyTerminate();
+ Assert.assertEquals(block.getNumRows(), 10);
+ Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to
propagate");
+ }
+
private static List<RexExpression> collation(int... indexes) {
return
Arrays.stream(indexes).mapToObj(RexExpression.InputRef::new).collect(Collectors.toList());
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
index db7e8cfc55..752d8ea4b3 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
@@ -37,6 +37,8 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.mockito.Mockito.when;
+
public class BlockExchangeTest {
private AutoCloseable _mocks;
@@ -99,6 +101,37 @@ public class BlockExchangeTest {
Mockito.verify(_mailbox2, Mockito.never()).send(Mockito.any());
}
+ @Test
+ public void shouldSignalEarlyTerminationProperly()
+ throws Exception {
+ // Given:
+ List<SendingMailbox> destinations = ImmutableList.of(_mailbox1, _mailbox2);
+ BlockExchange exchange = new TestBlockExchange(destinations);
+ TransferableBlock block = new TransferableBlock(ImmutableList.of(new
Object[]{"val"}),
+ new DataSchema(new String[]{"foo"}, new
ColumnDataType[]{ColumnDataType.STRING}), DataBlock.Type.ROW);
+
+ // When sending a normal block and only one mailbox has early terminated
+ when(_mailbox1.isEarlyTerminated()).thenReturn(true);
+ boolean isEarlyTerminated = exchange.send(block);
+
+ // Then:
+ Assert.assertFalse(isEarlyTerminated);
+
+ // When sending a normal block and the second mailbox has terminated (not early terminated)
+ when(_mailbox2.isTerminated()).thenReturn(true);
+ isEarlyTerminated = exchange.send(block);
+
+ // Then:
+ Assert.assertFalse(isEarlyTerminated);
+
+ // When sending a normal block and both mailboxes have early terminated
+ when(_mailbox2.isEarlyTerminated()).thenReturn(true);
+ isEarlyTerminated = exchange.send(block);
+
+ // Then:
+ Assert.assertTrue(isEarlyTerminated);
+ }
+
@Test
public void shouldSplitBlocks()
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]