This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new eb7a3dbf43 Prevent multi-stage operator from returning empty TransferableBlock (#13591)
eb7a3dbf43 is described below

commit eb7a3dbf43c5f17356ac763fa65c02cb549145b9
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Jul 15 14:49:36 2024 -0700

    Prevent multi-stage operator from returning empty TransferableBlock (#13591)
---
 .../query/runtime/blocks/TransferableBlock.java    |   3 +
 .../query/runtime/operator/FilterOperator.java     |  27 +++--
 .../query/runtime/operator/HashJoinOperator.java   | 121 +++++++++++----------
 .../runtime/operator/LiteralValueOperator.java     |  16 +--
 .../runtime/operator/MailboxReceiveOperator.java   |   2 +-
 .../query/runtime/operator/MultiStageOperator.java |   5 +
 .../pinot/query/runtime/operator/SetOperator.java  |  49 +++++----
 .../operator/SortedMailboxReceiveOperator.java     |   2 +-
 .../runtime/operator/WindowAggregateOperator.java  |   4 +-
 .../runtime/operator/AggregateOperatorTest.java    |   8 +-
 .../query/runtime/operator/FilterOperatorTest.java |  24 ++--
 .../runtime/operator/HashJoinOperatorTest.java     |   3 +-
 .../runtime/operator/MailboxSendOperatorTest.java  |  18 +--
 .../operator/WindowAggregateOperatorTest.java      |  16 +--
 14 files changed, 159 insertions(+), 139 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index f42fed4ecc..527510937d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.common.datablock.DataBlockBuilder;
 import org.apache.pinot.core.util.DataBlockExtractUtils;
 import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 
+
 /**
  * A {@code TransferableBlock} is a wrapper around {@link DataBlock} for transferring data using
  * {@link org.apache.pinot.common.proto.Mailbox}.
@@ -61,6 +62,8 @@ public class TransferableBlock implements Block {
         "Container cannot be used to construct block of type: %s", type);
     _type = type;
     _numRows = _container.size();
+    // NOTE: Use assert to avoid breaking production code.
+    assert _numRows > 0 : "Container should not be empty";
     _errCodeToExceptionMap = new HashMap<>();
     _queryStats = null;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index e8a9a7e025..fb764c2201 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -95,21 +95,28 @@ public class FilterOperator extends MultiStageOperator {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    TransferableBlock block = _input.nextBlock();
-    if (block.isEndOfStreamBlock()) {
+    // Keep reading the input blocks until we find a match row or all blocks are processed.
+    // TODO: Consider batching the rows to improve performance.
+    while (true) {
+      TransferableBlock block = _input.nextBlock();
       if (block.isErrorBlock()) {
         return block;
       }
-      return updateEosBlock(block, _statMap);
-    }
-    List<Object[]> resultRows = new ArrayList<>();
-    for (Object[] row : block.getContainer()) {
-      Object filterResult = _filterOperand.apply(row);
-      if (BooleanUtils.isTrueInternalValue(filterResult)) {
-        resultRows.add(row);
+      if (block.isSuccessfulEndOfStreamBlock()) {
+        return updateEosBlock(block, _statMap);
+      }
+      assert block.isDataBlock();
+      List<Object[]> rows = new ArrayList<>();
+      for (Object[] row : block.getContainer()) {
+        Object filterResult = _filterOperand.apply(row);
+        if (BooleanUtils.isTrueInternalValue(filterResult)) {
+          rows.add(row);
+        }
+      }
+      if (!rows.isEmpty()) {
+        return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
       }
     }
-    return new TransferableBlock(resultRows, _dataSchema, DataBlock.Type.ROW);
   }
 
   public enum StatKey implements StatMap.Key {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index f224a69aa5..dd60ff523f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -51,6 +51,7 @@ import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOver
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * This basic {@code BroadcastJoinOperator} implement a basic broadcast join algorithm.
  * This algorithm assumes that the broadcast table has to fit in memory since we are not supporting any spilling.
@@ -85,21 +86,14 @@ public class HashJoinOperator extends MultiStageOperator {
   private final MultiStageOperator _leftInput;
   private final MultiStageOperator _rightInput;
   private final JoinRelType _joinType;
+  private final KeySelector<?> _leftKeySelector;
+  private final KeySelector<?> _rightKeySelector;
   private final DataSchema _resultSchema;
   private final int _leftColumnSize;
   private final int _resultColumnSize;
   private final List<TransformOperand> _nonEquiEvaluators;
-  private boolean _isHashTableBuilt;
   private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
 
-  // Used by non-inner join.
-  // Needed to indicate we have finished processing all results after returning last block.
-  // TODO: Remove this special handling by fixing data block EOS abstraction or operator's invariant.
-  private boolean _isTerminated;
-  private TransferableBlock _upstreamErrorBlock;
-  private final KeySelector<?> _leftKeySelector;
-  private final KeySelector<?> _rightKeySelector;
-
   // Below are specific parameters to protect the hash table from growing too large.
   // Once the hash table reaches the limit, we will throw exception or break the right table build process.
   /**
@@ -113,9 +107,14 @@ public class HashJoinOperator extends MultiStageOperator {
    */
   private final JoinOverFlowMode _joinOverflowMode;
 
+  private boolean _isHashTableBuilt;
   private int _currentRowsInHashTable;
+  private TransferableBlock _upstreamErrorBlock;
   private MultiStageQueryStats _leftSideStats;
   private MultiStageQueryStats _rightSideStats;
+  // Used by non-inner join.
+  // Needed to indicate we have finished processing all results after returning last block.
+  private boolean _isTerminated;
 
   public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator 
leftInput, DataSchema leftSchema,
       MultiStageOperator rightInput, JoinNode node) {
@@ -138,7 +137,6 @@ public class HashJoinOperator extends MultiStageOperator {
     for (RexExpression nonEquiCondition : nonEquiConditions) {
       
_nonEquiEvaluators.add(TransformOperandFactory.getTransformOperand(nonEquiCondition,
 _resultSchema));
     }
-    _isHashTableBuilt = false;
     _broadcastRightTable = new HashMap<>();
     if (needUnmatchedRightRows()) {
       _matchedRightRows = new HashMap<>();
@@ -209,10 +207,6 @@ public class HashJoinOperator extends MultiStageOperator {
   @Override
   protected TransferableBlock getNextBlock()
       throws ProcessingException {
-    if (_isTerminated) {
-      assert _leftSideStats != null;
-      return 
TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats);
-    }
     if (!_isHashTableBuilt) {
       // Build JOIN hash table
       buildBroadcastHashTable();
@@ -220,9 +214,7 @@ public class HashJoinOperator extends MultiStageOperator {
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
     }
-    TransferableBlock leftBlock = _leftInput.nextBlock();
-    // JOIN each left block with the constructed right hash table.
-    return buildJoinedDataBlock(leftBlock);
+    return buildJoinedDataBlock();
   }
 
   private void buildBroadcastHashTable()
@@ -280,58 +272,51 @@ public class HashJoinOperator extends MultiStageOperator {
     _statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS, 
System.currentTimeMillis() - startTime);
   }
 
-  private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock) {
-    if (leftBlock.isErrorBlock()) {
-      _upstreamErrorBlock = leftBlock;
-      return _upstreamErrorBlock;
-    }
-    if (leftBlock.isSuccessfulEndOfStreamBlock()) {
-      assert _rightSideStats != null;
-      _leftSideStats = leftBlock.getQueryStats();
+  private TransferableBlock buildJoinedDataBlock() {
+    if (_isTerminated) {
       assert _leftSideStats != null;
-      _leftSideStats.mergeInOrder(_rightSideStats, getOperatorType(), 
_statMap);
+      return 
TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats);
+    }
 
-      if (!needUnmatchedRightRows()) {
-        return 
TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats);
+    // Keep reading the input blocks until we find a match row or all blocks are processed.
+    // TODO: Consider batching the rows to improve performance.
+    while (true) {
+      TransferableBlock leftBlock = _leftInput.nextBlock();
+      if (leftBlock.isErrorBlock()) {
+        return leftBlock;
       }
-      // TODO: Moved to a different function.
-      // Return remaining non-matched rows for non-inner join.
-      List<Object[]> returnRows = new ArrayList<>();
-      for (Map.Entry<Object, ArrayList<Object[]>> entry : 
_broadcastRightTable.entrySet()) {
-        List<Object[]> rightRows = entry.getValue();
-        BitSet matchedIndices = _matchedRightRows.get(entry.getKey());
-        if (matchedIndices == null) {
-          for (Object[] rightRow : rightRows) {
-            returnRows.add(joinRow(null, rightRow));
-          }
-        } else {
-          int numRightRows = rightRows.size();
-          int unmatchedIndex = 0;
-          while ((unmatchedIndex = 
matchedIndices.nextClearBit(unmatchedIndex)) < numRightRows) {
-            returnRows.add(joinRow(null, rightRows.get(unmatchedIndex++)));
+      if (leftBlock.isSuccessfulEndOfStreamBlock()) {
+        assert _rightSideStats != null;
+        _leftSideStats = leftBlock.getQueryStats();
+        assert _leftSideStats != null;
+        _leftSideStats.mergeInOrder(_rightSideStats, getOperatorType(), 
_statMap);
+        if (needUnmatchedRightRows()) {
+          List<Object[]> rows = buildNonMatchRightRows();
+          if (!rows.isEmpty()) {
+            _isTerminated = true;
+            return new TransferableBlock(rows, _resultSchema, 
DataBlock.Type.ROW);
           }
         }
+        return 
TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats);
+      }
+      assert leftBlock.isDataBlock();
+      List<Object[]> rows = buildJoinedRows(leftBlock);
+      if (!rows.isEmpty()) {
+        return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
       }
-      _isTerminated = true;
-      return new TransferableBlock(returnRows, _resultSchema, 
DataBlock.Type.ROW);
     }
-    List<Object[]> rows;
+  }
+
+  private List<Object[]> buildJoinedRows(TransferableBlock leftBlock) {
     switch (_joinType) {
-      case SEMI: {
-        rows = buildJoinedDataBlockSemi(leftBlock);
-        break;
-      }
-      case ANTI: {
-        rows = buildJoinedDataBlockAnti(leftBlock);
-        break;
-      }
+      case SEMI:
+        return buildJoinedDataBlockSemi(leftBlock);
+      case ANTI:
+        return buildJoinedDataBlockAnti(leftBlock);
       default: { // INNER, LEFT, RIGHT, FULL
-        rows = buildJoinedDataBlockDefault(leftBlock);
-        break;
+        return buildJoinedDataBlockDefault(leftBlock);
       }
     }
-    // TODO: Rows can be empty here. Consider fetching another left block instead of returning empty block.
-    return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
   }
 
   private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) 
{
@@ -401,6 +386,26 @@ public class HashJoinOperator extends MultiStageOperator {
     return rows;
   }
 
+  private List<Object[]> buildNonMatchRightRows() {
+    List<Object[]> rows = new ArrayList<>();
+    for (Map.Entry<Object, ArrayList<Object[]>> entry : 
_broadcastRightTable.entrySet()) {
+      List<Object[]> rightRows = entry.getValue();
+      BitSet matchedIndices = _matchedRightRows.get(entry.getKey());
+      if (matchedIndices == null) {
+        for (Object[] rightRow : rightRows) {
+          rows.add(joinRow(null, rightRow));
+        }
+      } else {
+        int numRightRows = rightRows.size();
+        int unmatchedIndex = 0;
+        while ((unmatchedIndex = matchedIndices.nextClearBit(unmatchedIndex)) 
< numRightRows) {
+          rows.add(joinRow(null, rightRows.get(unmatchedIndex++)));
+        }
+      }
+    }
+    return rows;
+  }
+
   private Object[] joinRow(@Nullable Object[] leftRow, @Nullable Object[] 
rightRow) {
     Object[] resultRow = new Object[_resultColumnSize];
     int idx = 0;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index e8824ca5e3..fc9b5b5169 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -38,15 +38,15 @@ public class LiteralValueOperator extends MultiStageOperator {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(LiteralValueOperator.class);
 
   private final DataSchema _dataSchema;
-  private final TransferableBlock _rexLiteralBlock;
+  private final List<List<RexExpression.Literal>> _literalRows;
   private boolean _isLiteralBlockReturned;
   private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
 
   public LiteralValueOperator(OpChainExecutionContext context, ValueNode node) 
{
     super(context);
     _dataSchema = node.getDataSchema();
-    _rexLiteralBlock = constructBlock(node.getLiteralRows());
-    // only return a single literal block when it is the 1st virtual server. otherwise, result will be duplicated.
+    _literalRows = node.getLiteralRows();
+    // Only return a single literal block when it is the 1st virtual server. Otherwise, result will be duplicated.
     _isLiteralBlockReturned = context.getId().getVirtualServerId() != 0;
   }
 
@@ -73,9 +73,9 @@ public class LiteralValueOperator extends MultiStageOperator {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    if (!_isLiteralBlockReturned && !_isEarlyTerminated) {
+    if (!_isLiteralBlockReturned && !_isEarlyTerminated && 
!_literalRows.isEmpty()) {
       _isLiteralBlockReturned = true;
-      return _rexLiteralBlock;
+      return constructBlock();
     } else {
       return createEosBlock();
     }
@@ -91,9 +91,9 @@ public class LiteralValueOperator extends MultiStageOperator {
     return Type.LITERAL;
   }
 
-  private TransferableBlock constructBlock(List<List<RexExpression.Literal>> 
literalRows) {
-    List<Object[]> blockContent = new ArrayList<>();
-    for (List<RexExpression.Literal> row : literalRows) {
+  private TransferableBlock constructBlock() {
+    List<Object[]> blockContent = new ArrayList<>(_literalRows.size());
+    for (List<RexExpression.Literal> row : _literalRows) {
       Object[] values = new Object[_dataSchema.size()];
       for (int i = 0; i < _dataSchema.size(); i++) {
         values[i] = row.get(i).getValue();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index e804b4b408..c1c7647a1e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * This {@code MailboxReceiveOperator} receives data from a {@link ReceivingMailbox} and serve it out from the
- * {@link MultiStageOperator#getNextBlock()} API.
+ * {@link #nextBlock()} API.
  */
 public class MailboxReceiveOperator extends BaseMailboxReceiveOperator {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MailboxReceiveOperator.class);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 50e68a47a6..0321bedc1b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -66,6 +66,11 @@ public abstract class MultiStageOperator
 
   public abstract void registerExecution(long time, int numRows);
 
+  /**
+   * Returns the next block from the operator. It should return non-empty data blocks followed by an end-of-stream (EOS)
+   * block when all the data is processed, or an error block if an error occurred. After it returns EOS or error block,
+   * no more call should be made.
+   */
   @Override
   public TransferableBlock nextBlock() {
     if (Tracing.ThreadAccountantOps.isInterrupted()) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
index fb8ef505e5..ea5cf046d7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
@@ -108,9 +108,7 @@ public abstract class SetOperator extends MultiStageOperator {
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
     }
-    // UNION each left block with the constructed right block set.
-    TransferableBlock leftBlock = _leftChildOperator.nextBlock();
-    return constructResultBlockSet(leftBlock);
+    return constructResultBlockSet();
   }
 
   protected void constructRightBlockSet() {
@@ -132,28 +130,33 @@ public abstract class SetOperator extends MultiStageOperator {
     }
   }
 
-  protected TransferableBlock constructResultBlockSet(TransferableBlock 
leftBlock) {
-    List<Object[]> rows = new ArrayList<>();
-    // TODO: Other operators keep the first erroneous block, while this keep the last.
-    //  We should decide what is what we want to do and be consistent with that.
-    if (_upstreamErrorBlock != null || leftBlock.isErrorBlock()) {
-      _upstreamErrorBlock = leftBlock;
-      return _upstreamErrorBlock;
-    }
-    if (leftBlock.isSuccessfulEndOfStreamBlock()) {
-      assert _rightQueryStats != null;
-      MultiStageQueryStats leftQueryStats = leftBlock.getQueryStats();
-      assert leftQueryStats != null;
-      _rightQueryStats.mergeInOrder(leftQueryStats, getOperatorType(), 
_statMap);
-      
_rightQueryStats.getCurrentStats().concat(leftQueryStats.getCurrentStats());
-      return 
TransferableBlockUtils.getEndOfStreamTransferableBlock(_rightQueryStats);
-    }
-    for (Object[] row : leftBlock.getContainer()) {
-      if (handleRowMatched(row)) {
-        rows.add(row);
+  protected TransferableBlock constructResultBlockSet() {
+    // Keep reading the input blocks until we find a match row or all blocks are processed.
+    // TODO: Consider batching the rows to improve performance.
+    while (true) {
+      TransferableBlock leftBlock = _leftChildOperator.nextBlock();
+      if (leftBlock.isErrorBlock()) {
+        return leftBlock;
+      }
+      if (leftBlock.isSuccessfulEndOfStreamBlock()) {
+        assert _rightQueryStats != null;
+        MultiStageQueryStats leftQueryStats = leftBlock.getQueryStats();
+        assert leftQueryStats != null;
+        _rightQueryStats.mergeInOrder(leftQueryStats, getOperatorType(), 
_statMap);
+        
_rightQueryStats.getCurrentStats().concat(leftQueryStats.getCurrentStats());
+        return 
TransferableBlockUtils.getEndOfStreamTransferableBlock(_rightQueryStats);
+      }
+      assert leftBlock.isDataBlock();
+      List<Object[]> rows = new ArrayList<>();
+      for (Object[] row : leftBlock.getContainer()) {
+        if (handleRowMatched(row)) {
+          rows.add(row);
+        }
+      }
+      if (!rows.isEmpty()) {
+        return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
       }
     }
-    return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
   }
 
   /**
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
index 95e6cf2d4f..35e0ac8503 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * This {@code SortedMailboxReceiveOperator} receives data from a {@link ReceivingMailbox} and serve it out from the
- * {@link MultiStageOperator#getNextBlock()}()} API in a sorted manner.
+ * {@link #nextBlock()} API in a sorted manner.
  *
  *  TODO: Once sorting on the {@code MailboxSendOperator} is available, modify this to use a k-way merge instead of
  *        resorting via the PriorityQueue.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index 4272873a79..27001778d4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -40,7 +40,6 @@ import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.planner.plannode.WindowNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.utils.AggregationUtils;
 import org.apache.pinot.query.runtime.operator.utils.TypeUtils;
 import org.apache.pinot.query.runtime.operator.window.WindowFunction;
@@ -244,7 +243,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
   private TransferableBlock computeBlocks()
       throws ProcessingException {
     TransferableBlock block = _input.nextBlock();
-    while (!TransferableBlockUtils.isEndOfStream(block)) {
+    while (block.isDataBlock()) {
       List<Object[]> container = block.getContainer();
       int containerSize = container.size();
       if (_numRows + containerSize > _maxRowsInWindowCache) {
@@ -276,6 +275,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
     if (block.isErrorBlock()) {
       return block;
     }
+    assert block.isSuccessfulEndOfStreamBlock();
     _eosBlock = updateEosBlock(block, _statMap);
 
     ColumnDataType[] resultStoredTypes = 
_resultSchema.getStoredColumnDataTypes();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 03c5cc0e95..1c7edcae6c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -118,7 +118,7 @@ public class AggregateOperatorTest {
     AggregateOperator operator = getOperator(resultSchema, aggCalls, 
filterArgs, groupKeys);
 
     // When:
-    List<Object[]> resultRows = operator.getNextBlock().getContainer();
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
     assertEquals(resultRows.size(), 1);
@@ -141,7 +141,7 @@ public class AggregateOperatorTest {
     AggregateOperator operator = getOperator(resultSchema, aggCalls, 
filterArgs, groupKeys);
 
     // When:
-    List<Object[]> resultRows = operator.getNextBlock().getContainer();
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
     assertEquals(resultRows.size(), 1);
@@ -168,7 +168,7 @@ public class AggregateOperatorTest {
     AggregateOperator operator = getOperator(resultSchema, aggCalls, 
filterArgs, groupKeys);
 
     // When:
-    List<Object[]> resultRows = operator.getNextBlock().getContainer();
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
     assertEquals(resultRows.size(), 1);
@@ -188,7 +188,7 @@ public class AggregateOperatorTest {
     DataSchema resultSchema = new DataSchema(new String[]{"group", "sum"}, new 
ColumnDataType[]{STRING, DOUBLE});
     AggregateOperator operator = getOperator(resultSchema, aggCalls, 
filterArgs, groupKeys);
 
-    List<Object[]> resultRows = operator.getNextBlock().getContainer();
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
     assertEquals(resultRows.size(), 2);
     if (resultRows.get(0)[0].equals("Aa")) {
       assertEquals(resultRows.get(0), new Object[]{"Aa", 1.0});
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
index 462cd2ddbf..1fb29c79f0 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
@@ -63,7 +63,7 @@ public class FilterOperatorTest {
         ColumnDataType.BOOLEAN
     });
     FilterOperator operator = getOperator(inputSchema, 
RexExpression.Literal.TRUE);
-    TransferableBlock block = operator.getNextBlock();
+    TransferableBlock block = operator.nextBlock();
     assertTrue(block.isErrorBlock());
     
assertTrue(block.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("filterError"));
   }
@@ -75,7 +75,7 @@ public class FilterOperatorTest {
     });
     
when(_input.nextBlock()).thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
     FilterOperator operator = getOperator(inputSchema, 
RexExpression.Literal.TRUE);
-    TransferableBlock block = operator.getNextBlock();
+    TransferableBlock block = operator.nextBlock();
     assertTrue(block.isEndOfStreamBlock());
   }
 
@@ -87,7 +87,7 @@ public class FilterOperatorTest {
     when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, 
new Object[]{0}, new Object[]{1}))
         
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
     FilterOperator operator = getOperator(inputSchema, 
RexExpression.Literal.TRUE);
-    List<Object[]> resultRows = operator.getNextBlock().getContainer();
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
     assertEquals(resultRows.size(), 2);
     assertEquals(resultRows.get(0), new Object[]{0});
     assertEquals(resultRows.get(1), new Object[]{1});
@@ -98,10 +98,10 @@ public class FilterOperatorTest {
     DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new 
ColumnDataType[]{
         ColumnDataType.INT
     });
-    when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, 
new Object[]{1}, new Object[]{2}));
+    when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, 
new Object[]{1}, new Object[]{2}))
+        
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
     FilterOperator operator = getOperator(inputSchema, 
RexExpression.Literal.FALSE);
-    List<Object[]> resultRows = operator.getNextBlock().getContainer();
-    assertTrue(resultRows.isEmpty());
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock());
   }
 
   @Test(expectedExceptions = IllegalStateException.class, 
expectedExceptionsMessageRegExp = "Filter operand must "
@@ -134,7 +134,7 @@ public class FilterOperatorTest {
     });
     when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, 
new Object[]{1, 1}, new Object[]{2, 0}));
     FilterOperator operator = getOperator(inputSchema, ref1);
-    List<Object[]> resultRows = operator.getNextBlock().getContainer();
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
     assertEquals(resultRows.size(), 1);
     assertEquals(resultRows.get(0), new Object[]{1, 1});
   }
@@ -149,7 +149,7 @@ public class FilterOperatorTest {
     RexExpression.FunctionCall andCall = new 
RexExpression.FunctionCall(ColumnDataType.BOOLEAN, SqlKind.AND.name(),
         List.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
     FilterOperator operator = getOperator(inputSchema, andCall);
-    List<Object[]> resultRows = operator.getNextBlock().getContainer();
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
     assertEquals(resultRows.size(), 1);
     assertEquals(resultRows.get(0), new Object[]{1, 1});
   }
@@ -164,7 +164,7 @@ public class FilterOperatorTest {
     RexExpression.FunctionCall orCall = new 
RexExpression.FunctionCall(ColumnDataType.BOOLEAN, SqlKind.OR.name(),
         List.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
     FilterOperator operator = getOperator(inputSchema, orCall);
-    List<Object[]> resultRows = operator.getNextBlock().getContainer();
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
     assertEquals(resultRows.size(), 2);
     assertEquals(resultRows.get(0), new Object[]{1, 1});
     assertEquals(resultRows.get(1), new Object[]{1, 0});
@@ -180,7 +180,7 @@ public class FilterOperatorTest {
     RexExpression.FunctionCall notCall = new 
RexExpression.FunctionCall(ColumnDataType.BOOLEAN, SqlKind.NOT.name(),
         List.of(new RexExpression.InputRef(0)));
     FilterOperator operator = getOperator(inputSchema, notCall);
-    List<Object[]> resultRows = operator.getNextBlock().getContainer();
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
     assertEquals(resultRows.size(), 1);
     assertEquals(resultRows.get(0)[0], 0);
     assertEquals(resultRows.get(0)[1], 0);
@@ -197,7 +197,7 @@ public class FilterOperatorTest {
         new RexExpression.FunctionCall(ColumnDataType.BOOLEAN, 
SqlKind.GREATER_THAN.name(),
             List.of(new RexExpression.InputRef(0), new 
RexExpression.InputRef(1)));
     FilterOperator operator = getOperator(inputSchema, greaterThan);
-    List<Object[]> resultRows = operator.getNextBlock().getContainer();
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
     assertEquals(resultRows.size(), 1);
     assertEquals(resultRows.get(0), new Object[]{3, 2});
   }
@@ -213,7 +213,7 @@ public class FilterOperatorTest {
         new RexExpression.FunctionCall(ColumnDataType.BOOLEAN, 
SqlKind.STARTS_WITH.name(),
             List.of(new RexExpression.InputRef(0), new 
RexExpression.Literal(ColumnDataType.STRING, "star")));
     FilterOperator operator = getOperator(inputSchema, startsWith);
-    List<Object[]> resultRows = operator.getNextBlock().getContainer();
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
     assertEquals(resultRows.size(), 1);
     assertEquals(resultRows.get(0), new Object[]{"starTree"});
   }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index e03a2f86f8..f204c04c98 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -242,8 +242,7 @@ public class HashJoinOperatorTest {
         });
     HashJoinOperator operator =
         getOperator(leftSchema, resultSchema, JoinRelType.INNER, List.of(0), 
List.of(0), List.of());
-    List<Object[]> resultRows = operator.nextBlock().getContainer();
-    assertTrue(resultRows.isEmpty());
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock());
   }
 
   @Test
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 56108f8af4..cc92873c94 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.WorkerMetadata;
@@ -104,8 +105,7 @@ public class MailboxSendOperatorTest {
   public void shouldNotSendErrorBlockWhenTimedOut()
       throws Exception {
     // Given:
-    TransferableBlock dataBlock =
-        OperatorTestUtil.block(new DataSchema(new String[]{}, new 
DataSchema.ColumnDataType[]{}));
+    TransferableBlock dataBlock = getDummyDataBlock();
     when(_input.nextBlock()).thenReturn(dataBlock);
     doThrow(new TimeoutException()).when(_exchange).send(any());
 
@@ -141,10 +141,8 @@ public class MailboxSendOperatorTest {
   public void shouldSendDataBlock()
       throws Exception {
     // Given:
-    TransferableBlock dataBlock1 =
-        OperatorTestUtil.block(new DataSchema(new String[]{}, new 
DataSchema.ColumnDataType[]{}));
-    TransferableBlock dataBlock2 =
-        OperatorTestUtil.block(new DataSchema(new String[]{}, new 
DataSchema.ColumnDataType[]{}));
+    TransferableBlock dataBlock1 = getDummyDataBlock();
+    TransferableBlock dataBlock2 = getDummyDataBlock();
     TransferableBlock eosBlock =
         
TransferableBlockUtils.getEndOfStreamTransferableBlock(MultiStageQueryStats.emptyStats(SENDER_STAGE_ID));
     when(_input.nextBlock()).thenReturn(dataBlock1, dataBlock2, eosBlock);
@@ -183,8 +181,7 @@ public class MailboxSendOperatorTest {
   public void shouldEarlyTerminateWhenUpstreamWhenIndicated()
       throws Exception {
     // Given:
-    TransferableBlock dataBlock =
-        OperatorTestUtil.block(new DataSchema(new String[]{}, new 
DataSchema.ColumnDataType[]{}));
+    TransferableBlock dataBlock = getDummyDataBlock();
     when(_input.nextBlock()).thenReturn(dataBlock);
     doReturn(true).when(_exchange).send(any());
 
@@ -203,4 +200,9 @@ public class MailboxSendOperatorTest {
             null);
     return new MailboxSendOperator(context, _input, statMap -> _exchange);
   }
+
+  private static TransferableBlock getDummyDataBlock() {
+    return OperatorTestUtil.block(new DataSchema(new String[]{"intCol"}, new 
ColumnDataType[]{ColumnDataType.INT}),
+        new Object[]{1});
+  }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
index cd3ca26b6c..ec5d667254 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
@@ -27,7 +27,6 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.common.datatable.StatMap;
 import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.data.table.Key;
@@ -270,8 +269,7 @@ public class WindowAggregateOperatorTest {
   }
 
   @Test
-  public void testRankDenseRankRankingFunctions()
-      throws ProcessingException {
+  public void testRankDenseRankRankingFunctions() {
     // Given:
     DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, STRING});
     // Input should be in sorted order on the order by key as SortExchange 
will handle pre-sorting the data
@@ -294,7 +292,7 @@ public class WindowAggregateOperatorTest {
             Integer.MIN_VALUE, 0);
 
     // When:
-    List<Object[]> resultRows = operator.getNextBlock().getContainer();
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
     verifyResultRows(resultRows, keys,
@@ -308,8 +306,7 @@ public class WindowAggregateOperatorTest {
   }
 
   @Test
-  public void testRowNumberRankingFunction()
-      throws ProcessingException {
+  public void testRowNumberRankingFunction() {
     // Given:
     DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, STRING});
     // Input should be in sorted order on the order by key as SortExchange 
will handle pre-sorting the data
@@ -330,7 +327,7 @@ public class WindowAggregateOperatorTest {
             Integer.MIN_VALUE, 0);
 
     // When:
-    List<Object[]> resultRows = operator.getNextBlock().getContainer();
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
     verifyResultRows(resultRows, keys, Map.of(1, List.<Object[]>of(new 
Object[]{1, "foo", 1L}), 2,
@@ -340,8 +337,7 @@ public class WindowAggregateOperatorTest {
   }
 
   @Test
-  public void testNonEmptyOrderByKeysNotMatchingPartitionByKeys()
-      throws ProcessingException {
+  public void testNonEmptyOrderByKeysNotMatchingPartitionByKeys() {
     // Given:
     DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, STRING});
     // Input should be in sorted order on the order by key as SortExchange 
will handle pre-sorting the data
@@ -360,7 +356,7 @@ public class WindowAggregateOperatorTest {
             Integer.MIN_VALUE, Integer.MAX_VALUE);
 
     // When:
-    List<Object[]> resultRows = operator.getNextBlock().getContainer();
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
     verifyResultRows(resultRows, keys, Map.of(1, List.<Object[]>of(new 
Object[]{1, "foo", 1.0}), 2,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to