This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 876e58dcc5 [multistage] framework to back-propagate metadata across opChains (#11746)
876e58dcc5 is described below

commit 876e58dcc5e4a4c8639bdeff22fda55d9f12f3ef
Author: Rong Rong <[email protected]>
AuthorDate: Wed Oct 11 22:10:18 2023 -0700

    [multistage] framework to back-propagate metadata across opChains (#11746)
    
    * initial commit to alter early termination and signaling backwards
    behavior
    * add more tests
    
    
    ---------
    
    Co-authored-by: Rong Rong <[email protected]>
---
 .../pinot/query/mailbox/GrpcSendingMailbox.java    |  8 +++-
 .../query/mailbox/InMemorySendingMailbox.java      | 14 +++++-
 .../pinot/query/mailbox/ReceivingMailbox.java      | 17 +++++--
 .../apache/pinot/query/mailbox/SendingMailbox.java |  6 +++
 .../pinot/query/mailbox/channel/ChannelUtils.java  |  2 +-
 .../mailbox/channel/MailboxContentObserver.java    |  7 ++-
 .../mailbox/channel/MailboxStatusObserver.java     | 16 ++++++-
 .../query/runtime/operator/AggregateOperator.java  |  1 -
 .../operator/BaseMailboxReceiveOperator.java       | 11 +++++
 .../query/runtime/operator/HashJoinOperator.java   |  5 +-
 .../LeafStageTransferableBlockOperator.java        | 18 ++++---
 .../runtime/operator/LiteralValueOperator.java     |  2 +-
 .../runtime/operator/MailboxReceiveOperator.java   | 10 +++-
 .../runtime/operator/MailboxSendOperator.java      | 17 ++++---
 .../query/runtime/operator/MultiStageOperator.java |  9 ++++
 .../pinot/query/runtime/operator/SortOperator.java | 13 ++---
 .../runtime/operator/WindowAggregateOperator.java  |  1 -
 .../runtime/operator/exchange/BlockExchange.java   | 32 ++++++++-----
 .../query/runtime/operator/utils/AsyncStream.java  |  5 ++
 .../utils/BlockingMultiStreamConsumer.java         |  6 +++
 .../pinot/query/mailbox/MailboxServiceTest.java    | 55 ++++++++++++++++++++++
 .../runtime/operator/HashJoinOperatorTest.java     |  1 +
 .../operator/MailboxReceiveOperatorTest.java       | 29 ++++++++++++
 .../runtime/operator/MailboxSendOperatorTest.java  | 21 +++++++--
 .../pinot/query/runtime/operator/OpChainTest.java  |  5 ++
 .../query/runtime/operator/SortOperatorTest.java   | 27 +++++++++++
 .../operator/exchange/BlockExchangeTest.java       | 33 +++++++++++++
 27 files changed, 315 insertions(+), 56 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index ccb9afc270..87a8bda603 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -60,7 +60,7 @@ public class GrpcSendingMailbox implements SendingMailbox {
   @Override
   public void send(TransferableBlock block)
       throws IOException {
-    if (isTerminated()) {
+    if (isTerminated() || (isEarlyTerminated() && 
!block.isEndOfStreamBlock())) {
       return;
     }
     if (LOGGER.isDebugEnabled()) {
@@ -101,9 +101,13 @@ public class GrpcSendingMailbox implements SendingMailbox {
     }
   }
 
+  @Override
+  public boolean isEarlyTerminated() {
+    return _statusObserver.isEarlyTerminated();
+  }
+
   @Override
   public boolean isTerminated() {
-    // TODO: We cannot differentiate early termination vs stream error
     return _statusObserver.isFinished();
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index 444e7e1dde..306cd6992a 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.mailbox;
 import java.util.concurrent.TimeoutException;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.spi.exception.QueryCancelledException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +35,7 @@ public class InMemorySendingMailbox implements SendingMailbox 
{
 
   private ReceivingMailbox _receivingMailbox;
   private volatile boolean _isTerminated;
+  private volatile boolean _isEarlyTerminated;
 
   public InMemorySendingMailbox(String id, MailboxService mailboxService, long 
deadlineMs) {
     _id = id;
@@ -44,7 +46,7 @@ public class InMemorySendingMailbox implements SendingMailbox 
{
   @Override
   public void send(TransferableBlock block)
       throws TimeoutException {
-    if (_isTerminated) {
+    if (isTerminated() || (isEarlyTerminated() && 
!block.isEndOfStreamBlock())) {
       return;
     }
     if (_receivingMailbox == null) {
@@ -55,13 +57,15 @@ public class InMemorySendingMailbox implements 
SendingMailbox {
     switch (status) {
       case SUCCESS:
         break;
+      case CANCELLED:
+        throw new QueryCancelledException(String.format("Mailbox: %s already 
cancelled from upstream", _id));
       case ERROR:
         throw new RuntimeException(String.format("Mailbox: %s already errored 
out (received error block before)", _id));
       case TIMEOUT:
         throw new TimeoutException(
             String.format("Timed out adding block into mailbox: %s with 
timeout: %dms", _id, timeoutMs));
       case EARLY_TERMINATED:
-        _isTerminated = true;
+        _isEarlyTerminated = true;
         break;
       default:
         throw new IllegalStateException("Unsupported mailbox status: " + 
status);
@@ -70,6 +74,7 @@ public class InMemorySendingMailbox implements SendingMailbox 
{
 
   @Override
   public void complete() {
+    _isTerminated = true;
   }
 
   @Override
@@ -85,6 +90,11 @@ public class InMemorySendingMailbox implements 
SendingMailbox {
         new RuntimeException("Cancelled by sender with exception: " + 
t.getMessage(), t)));
   }
 
+  @Override
+  public boolean isEarlyTerminated() {
+    return _isEarlyTerminated;
+  }
+
   @Override
   public boolean isTerminated() {
     return _isTerminated;
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index 8bbffee360..97c8731f95 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -49,6 +49,8 @@ public class ReceivingMailbox {
   // TODO: Revisit if this is the correct way to apply back pressure
   private final BlockingQueue<TransferableBlock> _blocks = new 
ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
   private final AtomicReference<TransferableBlock> _errorBlock = new 
AtomicReference<>();
+  private volatile boolean _isEarlyTerminated = false;
+
   @Nullable
   private volatile Reader _reader;
 
@@ -78,7 +80,7 @@ public class ReceivingMailbox {
     TransferableBlock errorBlock = _errorBlock.get();
     if (errorBlock != null) {
       LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring 
the late block", _id);
-      return errorBlock == CANCELLED_ERROR_BLOCK ? 
ReceivingMailboxStatus.EARLY_TERMINATED
+      return errorBlock == CANCELLED_ERROR_BLOCK ? 
ReceivingMailboxStatus.CANCELLED
           : ReceivingMailboxStatus.ERROR;
     }
     if (timeoutMs <= 0) {
@@ -95,11 +97,11 @@ public class ReceivingMailbox {
             LOGGER.debug("==[MAILBOX]== Block " + block + " ready to read from 
mailbox: " + _id);
           }
           notifyReader();
-          return ReceivingMailboxStatus.SUCCESS;
+          return _isEarlyTerminated ? ReceivingMailboxStatus.EARLY_TERMINATED 
: ReceivingMailboxStatus.SUCCESS;
         } else {
           LOGGER.debug("Mailbox: {} is already cancelled or errored out, 
ignoring the late block", _id);
           _blocks.clear();
-          return errorBlock == CANCELLED_ERROR_BLOCK ? 
ReceivingMailboxStatus.EARLY_TERMINATED
+          return errorBlock == CANCELLED_ERROR_BLOCK ? 
ReceivingMailboxStatus.CANCELLED
               : ReceivingMailboxStatus.ERROR;
         }
       } else {
@@ -136,6 +138,13 @@ public class ReceivingMailbox {
     return errorBlock != null ? errorBlock : _blocks.poll();
   }
 
+  /**
+   * Early terminates the mailbox; called when upstream doesn't expect any more data blocks.
+   */
+  public void earlyTerminate() {
+    _isEarlyTerminated = true;
+  }
+
   /**
    * Cancels the mailbox. No more blocks are accepted after calling this 
method. Should only be called by the receive
    * operator to clean up the remaining blocks.
@@ -166,6 +175,6 @@ public class ReceivingMailbox {
   }
 
   public enum ReceivingMailboxStatus {
-    SUCCESS, ERROR, TIMEOUT, EARLY_TERMINATED
+    SUCCESS, ERROR, TIMEOUT, CANCELLED, EARLY_TERMINATED
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index 7cdfc8c97d..576a4703da 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -62,4 +62,10 @@ public interface SendingMailbox {
    * mailbox is terminated.
    */
   boolean isTerminated();
+
+  /**
+   * Returns whether the {@link ReceivingMailbox} considers itself finished and expects an EOS block with
+   * statistics to be sent next.
+   */
+  boolean isEarlyTerminated();
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
index ef43fe79a3..e297ac5f9c 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java
@@ -23,5 +23,5 @@ public class ChannelUtils {
   }
 
   public static final String MAILBOX_METADATA_BUFFER_SIZE_KEY = "buffer.size";
-  public static final String MAILBOX_METADATA_BEGIN_OF_STREAM_KEY = 
"begin.of.stream";
+  public static final String MAILBOX_METADATA_REQUEST_EARLY_TERMINATE = 
"request.early.terminate";
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
index 9074d81151..ce91701c3a 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
@@ -84,6 +84,10 @@ public class MailboxContentObserver implements 
StreamObserver<MailboxContent> {
               .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY,
                   Integer.toString(_mailbox.getNumPendingBlocks())).build());
           break;
+        case CANCELLED:
+          LOGGER.warn("Mailbox: {} already cancelled from upstream", 
mailboxId);
+          cancelStream();
+          break;
         case ERROR:
           LOGGER.warn("Mailbox: {} already errored out (received error block 
before)", mailboxId);
           cancelStream();
@@ -94,7 +98,8 @@ public class MailboxContentObserver implements 
StreamObserver<MailboxContent> {
           break;
         case EARLY_TERMINATED:
           LOGGER.debug("Mailbox: {} has been early terminated", mailboxId);
-          onCompleted();
+          
_responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId)
+              
.putMetadata(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE, 
"true").build());
           break;
         default:
           throw new IllegalStateException("Unsupported mailbox status: " + 
status);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
index 9f1c488d18..4288f4a087 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
@@ -35,11 +35,19 @@ public class MailboxStatusObserver implements 
StreamObserver<MailboxStatus> {
 
   private final AtomicInteger _bufferSize = new 
AtomicInteger(DEFAULT_MAILBOX_QUEUE_CAPACITY);
   private final AtomicBoolean _finished = new AtomicBoolean();
+  private volatile boolean _isEarlyTerminated;
 
   @Override
   public void onNext(MailboxStatus mailboxStatus) {
-    // when received a mailbox status from the receiving end, sending end 
update the known buffer size available
-    // so we can make better throughput send judgement. here is a simple 
example.
+    // When the receiving mailbox receives a data block, it returns updated receiving-end status, including:
+    //   1. the available buffer size, for back-pressure handling;
+    //   2. whether there is no need to send any more data blocks because it considers itself finished.
+    // -- handle early-terminate EOS request.
+    if (Boolean.parseBoolean(
+        
mailboxStatus.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE)))
 {
+      _isEarlyTerminated = true;
+    }
+    // -- handling buffer size back-pressure
     // TODO: this feedback info is not used to throttle the send speed. it is 
currently being discarded.
     if 
(mailboxStatus.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY))
 {
       _bufferSize.set(
@@ -49,6 +57,10 @@ public class MailboxStatusObserver implements 
StreamObserver<MailboxStatus> {
     }
   }
 
+  public boolean isEarlyTerminated() {
+    return _isEarlyTerminated;
+  }
+
   public int getBufferSize() {
     return _bufferSize.get();
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index e3fc9cc315..27b6cbdf08 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -142,7 +142,6 @@ public class AggregateOperator extends MultiStageOperator {
       if (!_hasReturnedAggregateBlock) {
         return produceAggregatedBlock();
       } else {
-        // TODO: Move to close call.
         return TransferableBlockUtils.getEndOfStreamTransferableBlock();
       }
     } catch (Exception e) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index c633783d48..438cec8494 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -81,6 +81,12 @@ public abstract class BaseMailboxReceiveOperator extends 
MultiStageOperator {
     return _mailboxIds;
   }
 
+  @Override
+  protected void earlyTerminate() {
+    _isEarlyTerminated = true;
+    _multiConsumer.earlyTerminate();
+  }
+
   @Override
   public List<MultiStageOperator> getChildOperators() {
     return Collections.emptyList();
@@ -128,6 +134,11 @@ public abstract class BaseMailboxReceiveOperator extends 
MultiStageOperator {
       _mailbox.registeredReader(onNewData::newDataAvailable);
     }
 
+    @Override
+    public void earlyTerminate() {
+      _mailbox.earlyTerminate();
+    }
+
     @Override
     public void cancel() {
       _mailbox.cancel();
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 32ca46290a..295e0c49af 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -239,9 +239,8 @@ public class HashJoinOperator extends MultiStageOperator {
       }
       _currentRowsInHashTable += container.size();
       if (_currentRowsInHashTable == _maxRowsInHashTable) {
-        // Early terminate right table operator.
-        _rightTableOperator.close();
-        break;
+        // Mark only the right table operator as early terminated; it is expected to return the EOS block next.
+        _rightTableOperator.earlyTerminate();
       }
       rightBlock = _rightTableOperator.nextBlock();
     }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 4e690e4041..35361e48e9 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -126,21 +126,25 @@ public class LeafStageTransferableBlockOperator extends 
MultiStageOperator {
       if (exceptions != null) {
         return TransferableBlockUtils.getErrorTransferableBlock(exceptions);
       }
-      if (resultsBlock != LAST_RESULTS_BLOCK) {
+      if (_isEarlyTerminated || resultsBlock == LAST_RESULTS_BLOCK) {
+        return constructMetadataBlock();
+      } else {
         // Regular data block
         return composeTransferableBlock(resultsBlock, _dataSchema);
-      } else {
-        // All data blocks have been returned. Record the stats and return EOS.
-        Map<String, String> executionStats = _executionStats;
-        OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, 
getOperatorId());
-        operatorStats.recordExecutionStats(executionStats);
-        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
       }
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
     }
   }
 
+  private TransferableBlock constructMetadataBlock() {
+    // All data blocks have been returned. Record the stats and return EOS.
+    Map<String, String> executionStats = _executionStats;
+    OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, 
getOperatorId());
+    operatorStats.recordExecutionStats(executionStats);
+    return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+  }
+
   private Future<Void> startExecution() {
     ResultsBlockConsumer resultsBlockConsumer = new ResultsBlockConsumer();
     return _executorService.submit(() -> {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index 029e58e9a2..59aeec3e75 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -61,7 +61,7 @@ public class LiteralValueOperator extends MultiStageOperator {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    if (!_isLiteralBlockReturned) {
+    if (!_isLiteralBlockReturned && !_isEarlyTerminated) {
       _isLiteralBlockReturned = true;
       return _rexLiteralBlock;
     } else {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 60c751405c..ad7913cdc1 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -42,6 +42,14 @@ public class MailboxReceiveOperator extends 
BaseMailboxReceiveOperator {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    return getMultiConsumer().readBlockBlocking();
+    TransferableBlock block = getMultiConsumer().readBlockBlocking();
+    // When the early termination flag is set, the caller expects an EOS block to be returned. However, since the
+    // sending and receiving stages set the early termination flag asynchronously, the next block pulled out of the
+    // ReceivingMailbox may be an already-buffered regular data block. The MailboxReceiveOperator therefore keeps
+    // pulling and dropping data blocks until an EOS block is observed.
+    while (_isEarlyTerminated && !block.isEndOfStreamBlock()) {
+      block = getMultiConsumer().readBlockBlocking();
+    }
+    return block;
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 74fe297a8f..f6f25510df 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -38,7 +38,7 @@ import 
org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
 import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.exception.QueryCancelledException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,14 +123,16 @@ public class MailboxSendOperator extends 
MultiStageOperator {
         // and the receiving opChain will not be able to access the stats from 
the previous opChain
         TransferableBlock eosBlockWithStats = 
TransferableBlockUtils.getEndOfStreamTransferableBlock(
             
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
+        // No need to check the early-termination signal because the current block is already EOS.
         sendTransferableBlock(eosBlockWithStats);
       } else {
-        sendTransferableBlock(block);
+        if (sendTransferableBlock(block)) {
+          earlyTerminate();
+        }
       }
       return block;
-    } catch (EarlyTerminationException e) {
-      // TODO: Query stats are not sent when opChain is early terminated
-      LOGGER.debug("Early terminating opChain: {}", _context.getId());
+    } catch (QueryCancelledException e) {
+      LOGGER.debug("Query was cancelled! for opChain: {}", _context.getId());
       return TransferableBlockUtils.getEndOfStreamTransferableBlock();
     } catch (TimeoutException e) {
       LOGGER.warn("Timed out transferring data on opChain: {}", 
_context.getId(), e);
@@ -147,12 +149,13 @@ public class MailboxSendOperator extends 
MultiStageOperator {
     }
   }
 
-  private void sendTransferableBlock(TransferableBlock block)
+  private boolean sendTransferableBlock(TransferableBlock block)
       throws Exception {
-    _exchange.send(block);
+    boolean isEarlyTerminated = _exchange.send(block);
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("==[SEND]== Block " + block + " sent from: " + 
_context.getId());
     }
+    return isEarlyTerminated;
   }
 
   /**
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 84228071c9..ade326ea20 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -36,11 +36,13 @@ public abstract class MultiStageOperator implements 
Operator<TransferableBlock>,
   protected final OpChainExecutionContext _context;
   protected final String _operatorId;
   protected final OpChainStats _opChainStats;
+  protected boolean _isEarlyTerminated;
 
   public MultiStageOperator(OpChainExecutionContext context) {
     _context = context;
     _operatorId = Joiner.on("_").join(getClass().getSimpleName(), 
_context.getStageId(), _context.getServer());
     _opChainStats = _context.getStats();
+    _isEarlyTerminated = false;
   }
 
   @Override
@@ -70,6 +72,13 @@ public abstract class MultiStageOperator implements 
Operator<TransferableBlock>,
   // Make it protected because we should always call nextBlock()
   protected abstract TransferableBlock getNextBlock();
 
+  protected void earlyTerminate() {
+    _isEarlyTerminated = true;
+    for (MultiStageOperator child : getChildOperators()) {
+      child.earlyTerminate();
+    }
+  }
+
   protected boolean shouldCollectStats() {
     return _context.isTraceEnabled();
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index d803848a7f..64f0926a63 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -51,7 +51,7 @@ public class SortOperator extends MultiStageOperator {
   private final ArrayList<Object[]> _rows;
   private final int _numRowsToKeep;
 
-  private boolean _isSortedBlockConstructed;
+  private boolean _hasReturnedSortedBlock;
   private TransferableBlock _upstreamErrorBlock;
 
   public SortOperator(OpChainExecutionContext context, MultiStageOperator 
upstreamOperator,
@@ -74,7 +74,7 @@ public class SortOperator extends MultiStageOperator {
     _offset = Math.max(offset, 0);
     _dataSchema = dataSchema;
     _upstreamErrorBlock = null;
-    _isSortedBlockConstructed = false;
+    _hasReturnedSortedBlock = false;
     // Setting numRowsToKeep as default maximum on Broker if limit not set.
     // TODO: make this default behavior configurable.
     _numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultResponseLimit;
@@ -123,8 +123,8 @@ public class SortOperator extends MultiStageOperator {
       return _upstreamErrorBlock;
     }
 
-    if (!_isSortedBlockConstructed) {
-      _isSortedBlockConstructed = true;
+    if (!_hasReturnedSortedBlock) {
+      _hasReturnedSortedBlock = true;
       if (_priorityQueue == null) {
         if (_rows.size() > _offset) {
           List<Object[]> row = _rows.subList(_offset, _rows.size());
@@ -150,7 +150,7 @@ public class SortOperator extends MultiStageOperator {
   }
 
   private void consumeInputBlocks() {
-    if (!_isSortedBlockConstructed) {
+    if (!_hasReturnedSortedBlock) {
       TransferableBlock block = _upstreamOperator.nextBlock();
       while (block.isDataBlock()) {
         List<Object[]> container = block.getContainer();
@@ -164,7 +164,8 @@ public class SortOperator extends MultiStageOperator {
               _rows.addAll(container.subList(0, _numRowsToKeep - numRows));
               LOGGER.debug("Early terminate at SortOperator - operatorId={}, 
opChainId={}", _operatorId,
                   _context.getId());
-              break;
+              // Mark this operator as early terminated; the upstream is expected to return the EOS block next.
+              earlyTerminate();
             }
           }
         } else {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index a4dfde6263..ea9e065416 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -172,7 +172,6 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
       if (!_hasReturnedWindowAggregateBlock) {
         return produceWindowAggregatedBlock();
       } else {
-        // TODO: Move to close call.
         return TransferableBlockUtils.getEndOfStreamTransferableBlock();
       }
     } catch (Exception e) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index ae8d0c75d4..453288ecb3 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -28,7 +28,6 @@ import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
 import org.apache.pinot.query.runtime.blocks.BlockSplitter;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
 
 
 /**
@@ -68,24 +67,31 @@ public abstract class BlockExchange {
     _splitter = splitter;
   }
 
-  public void send(TransferableBlock block)
+  /**
+   * API to send a block to the destination mailboxes.
+   * @param block the block to be transferred
+   * @return true if all the mailboxes have been early terminated.
+   * @throws Exception when sending stream unexpectedly closed.
+   */
+  public boolean send(TransferableBlock block)
       throws Exception {
-    boolean isEarlyTerminated = true;
-    for (SendingMailbox sendingMailbox : _sendingMailboxes) {
-      if (!sendingMailbox.isTerminated()) {
-        isEarlyTerminated = false;
-        break;
-      }
-    }
-    if (isEarlyTerminated) {
-      throw new EarlyTerminationException();
-    }
     if (block.isEndOfStreamBlock()) {
       for (SendingMailbox sendingMailbox : _sendingMailboxes) {
         sendBlock(sendingMailbox, block);
       }
+      return false;
     } else {
-      route(_sendingMailboxes, block);
+      boolean isEarlyTerminated = true;
+      for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+        if (!sendingMailbox.isEarlyTerminated()) {
+          isEarlyTerminated = false;
+          break;
+        }
+      }
+      if (!isEarlyTerminated) {
+        route(_sendingMailboxes, block);
+      }
+      return isEarlyTerminated;
     }
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AsyncStream.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AsyncStream.java
index 7df33ebefc..287ae7359e 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AsyncStream.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AsyncStream.java
@@ -67,6 +67,11 @@ public interface AsyncStream<E> {
    */
   void cancel();
 
+  /**
+   * Set this stream to early terminate state, asking for metadata block.
+   */
+  void earlyTerminate();
+
   interface OnNewData {
     void newDataAvailable();
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
index 4605dc8d4d..387a44e754 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
@@ -71,6 +71,12 @@ public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable {
     cancelRemainingMailboxes();
   }
 
+  public void earlyTerminate() {
+    for (AsyncStream<E> mailbox : _mailboxes) {
+      mailbox.earlyTerminate();
+    }
+  }
+
   protected void cancelRemainingMailboxes() {
     for (AsyncStream<E> mailbox : _mailboxes) {
       mailbox.cancel();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
index 11616a68f0..8e6b563ac4 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
@@ -292,6 +292,31 @@ public class MailboxServiceTest {
     assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
   }
 
+  @Test
+  public void testLocalEarlyTerminated()
+      throws Exception {
+    String mailboxId = MailboxIdUtils.toMailboxId(_requestId++, 
SENDER_STAGE_ID, 0, RECEIVER_STAGE_ID, 0);
+    SendingMailbox sendingMailbox =
+        _mailboxService1.getSendingMailbox("localhost", 
_mailboxService1.getPort(), mailboxId, Long.MAX_VALUE);
+    ReceivingMailbox receivingMailbox = 
_mailboxService1.getReceivingMailbox(mailboxId);
+    receivingMailbox.registeredReader(() -> { });
+
+    // send a block
+    sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0}));
+    // receiving-side early terminates after pulling the first block
+    TransferableBlock block = receivingMailbox.poll();
+    receivingMailbox.earlyTerminate();
+    assertNotNull(block);
+    assertEquals(block.getNumRows(), 1);
+    // send another block b/c it doesn't guarantee the next block must be EOS
+    sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0}));
+    // send a metadata block
+    
sendingMailbox.send(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // sending side should early terminate
+    assertTrue(sendingMailbox.isEarlyTerminated());
+  }
+
   @Test
   public void testRemoteHappyPathSendFirst()
       throws Exception {
@@ -556,4 +581,34 @@ public class MailboxServiceTest {
     assertEquals(numCallbacks.get(), 
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS + 1);
     assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
   }
+
+  @Test
+  public void testRemoteEarlyTerminated()
+      throws Exception {
+    String mailboxId = MailboxIdUtils.toMailboxId(_requestId++, 
SENDER_STAGE_ID, 0, RECEIVER_STAGE_ID, 0);
+
+    // Sends are non-blocking as long as channel capacity is not breached
+    SendingMailbox sendingMailbox =
+        _mailboxService2.getSendingMailbox("localhost", 
_mailboxService1.getPort(), mailboxId, Long.MAX_VALUE);
+    ReceivingMailbox receivingMailbox = 
_mailboxService1.getReceivingMailbox(mailboxId);
+    receivingMailbox.registeredReader(() -> { });
+
+    // send a block
+    sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0}));
+    // receiving-side early terminates after pulling the first block
+    TestUtils.waitForCondition(aVoid -> {
+          TransferableBlock block = receivingMailbox.poll();
+          return block != null && block.getNumRows() == 1;
+        }, 1000L, "Failed to deliver mails");
+    receivingMailbox.earlyTerminate();
+
+    // send another block b/c it doesn't guarantee the next block must be EOS
+    sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0}));
+    // send a metadata block
+    
sendingMailbox.send(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    sendingMailbox.complete();
+
+    // sending side should early terminate
+    TestUtils.waitForCondition(aVoid -> sendingMailbox.isEarlyTerminated(), 
1000L, "Failed to early-terminate sender");
+  }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index 9abd2e78b5..88d9cf4fe0 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -646,6 +646,7 @@ public class HashJoinOperatorTest {
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
 
     TransferableBlock result = join.nextBlock();
+    Mockito.verify(_rightOperator).earlyTerminate();
     Assert.assertFalse(result.isErrorBlock());
     Assert.assertEquals(result.getNumRows(), 1);
     
Assert.assertTrue(result.getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 6e3fa310a9..9f5b8dfffe 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -44,6 +44,7 @@ import org.testng.annotations.Test;
 
 import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
 import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -236,4 +237,32 @@ public class MailboxReceiveOperatorTest {
       
assertTrue(block.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains(errorMessage));
     }
   }
+
+  @Test
+  public void shouldEarlyTerminateMailboxesWhenIndicated() {
+    Object[] row1 = new Object[]{1, 1};
+    Object[] row2 = new Object[]{2, 2};
+    Object[] row3 = new Object[]{3, 3};
+    
when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
+    when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, 
row1),
+        OperatorTestUtil.block(DATA_SCHEMA, row3), 
TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    
when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_2))).thenReturn(_mailbox2);
+    when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, 
row2),
+        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    OpChainExecutionContext context =
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, 
Long.MAX_VALUE, _stageMetadataBoth);
+    try (MailboxReceiveOperator receiveOp = new 
MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
+        1)) {
+      // Receive first block from server1
+      assertEquals(receiveOp.nextBlock().getContainer().get(0), row1);
+      // at this point operator received a signal to early terminate
+      receiveOp.earlyTerminate();
+      // Receive next block should be EOS even if upstream keep sending normal 
block.
+      assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
+      // Assure that early terminate signal goes into each mailbox
+      verify(_mailbox1).earlyTerminate();
+      verify(_mailbox2).earlyTerminate();
+    }
+  }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index c01447d7cd..7a49dcf16a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -39,10 +39,7 @@ import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 import static org.mockito.MockitoAnnotations.openMocks;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertSame;
@@ -185,6 +182,22 @@ public class MailboxSendOperatorTest {
     
assertTrue(resultMetadata.containsKey(mailboxSendOperator.getOperatorId()));
   }
 
+  @Test
+  public void shouldEarlyTerminateWhenUpstreamWhenIndicated()
+      throws Exception {
+    // Given:
+    TransferableBlock dataBlock =
+        OperatorTestUtil.block(new DataSchema(new String[]{}, new 
DataSchema.ColumnDataType[]{}));
+    when(_sourceOperator.nextBlock()).thenReturn(dataBlock);
+    doReturn(true).when(_exchange).send(any());
+
+    // When:
+    TransferableBlock block = getMailboxSendOperator().nextBlock();
+
+    // Then:
+    verify(_sourceOperator).earlyTerminate();
+  }
+
   private MailboxSendOperator getMailboxSendOperator() {
     StageMetadata stageMetadata = new 
StageMetadata.Builder().setWorkerMetadataList(
         Collections.singletonList(new 
WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index db6d18ee3c..fb937e6238 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -375,6 +375,11 @@ public class OpChainTest {
       _sleepTimeInMillis = sleepTimeInMillis;
     }
 
+    @Override
+    public List<MultiStageOperator> getChildOperators() {
+      return Collections.singletonList(_upstream);
+    }
+
     @Override
     protected TransferableBlock getNextBlock() {
       try {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
index c58b394d19..03fc1755ba 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelFieldCollation.Direction;
@@ -618,6 +619,32 @@ public class SortOperatorTest {
     Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to 
propagate");
   }
 
+  @Test
+  public void shouldEarlyTerminateCorrectlyWithSignalingPropagateUpstream() {
+    // Given:
+    List<RexExpression> collation = Collections.emptyList();
+    List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
+    DataSchema schema = new DataSchema(new String[]{"sort"}, new 
DataSchema.ColumnDataType[]{INT});
+    SortOperator op =
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, 
collation, directions, nullDirections, 10, 0,
+            schema, false);
+
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}, 
new Object[]{2}, new Object[]{3},
+        new Object[]{4}, new Object[]{5}, new Object[]{6}, new Object[]{7}, 
new Object[]{8}, new Object[]{9},
+        new Object[]{10}, new Object[]{11}, new Object[]{12}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // When:
+    TransferableBlock block = op.nextBlock(); // construct
+    TransferableBlock block2 = op.nextBlock(); // eos
+
+    // Then:
+    Mockito.verify(_input).earlyTerminate();
+    Assert.assertEquals(block.getNumRows(), 10);
+    Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to 
propagate");
+  }
+
   private static List<RexExpression> collation(int... indexes) {
     return 
Arrays.stream(indexes).mapToObj(RexExpression.InputRef::new).collect(Collectors.toList());
   }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
index db7e8cfc55..752d8ea4b3 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
@@ -37,6 +37,8 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.when;
+
 
 public class BlockExchangeTest {
   private AutoCloseable _mocks;
@@ -99,6 +101,37 @@ public class BlockExchangeTest {
     Mockito.verify(_mailbox2, Mockito.never()).send(Mockito.any());
   }
 
+  @Test
+  public void shouldSignalEarlyTerminationProperly()
+      throws Exception {
+    // Given:
+    List<SendingMailbox> destinations = ImmutableList.of(_mailbox1, _mailbox2);
+    BlockExchange exchange = new TestBlockExchange(destinations);
+    TransferableBlock block = new TransferableBlock(ImmutableList.of(new 
Object[]{"val"}),
+        new DataSchema(new String[]{"foo"}, new 
ColumnDataType[]{ColumnDataType.STRING}), DataBlock.Type.ROW);
+
+    // When send normal block and some mailbox has terminated
+    when(_mailbox1.isEarlyTerminated()).thenReturn(true);
+    boolean isEarlyTerminated = exchange.send(block);
+
+    // Then:
+    Assert.assertFalse(isEarlyTerminated);
+
+    // When send normal block and both terminated
+    when(_mailbox2.isTerminated()).thenReturn(true);
+    isEarlyTerminated = exchange.send(block);
+
+    // Then:
+    Assert.assertFalse(isEarlyTerminated);
+
+    // When send metadata block
+    when(_mailbox2.isEarlyTerminated()).thenReturn(true);
+    isEarlyTerminated = exchange.send(block);
+
+    // Then:
+    Assert.assertTrue(isEarlyTerminated);
+  }
+
   @Test
   public void shouldSplitBlocks()
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to