This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new eb7a3dbf43 Prevent multi-stage operator from returning empty TransferableBlock (#13591)
eb7a3dbf43 is described below
commit eb7a3dbf43c5f17356ac763fa65c02cb549145b9
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Jul 15 14:49:36 2024 -0700
Prevent multi-stage operator from returning empty TransferableBlock (#13591)
---
.../query/runtime/blocks/TransferableBlock.java | 3 +
.../query/runtime/operator/FilterOperator.java | 27 +++--
.../query/runtime/operator/HashJoinOperator.java | 121 +++++++++++----------
.../runtime/operator/LiteralValueOperator.java | 16 +--
.../runtime/operator/MailboxReceiveOperator.java | 2 +-
.../query/runtime/operator/MultiStageOperator.java | 5 +
.../pinot/query/runtime/operator/SetOperator.java | 49 +++++----
.../operator/SortedMailboxReceiveOperator.java | 2 +-
.../runtime/operator/WindowAggregateOperator.java | 4 +-
.../runtime/operator/AggregateOperatorTest.java | 8 +-
.../query/runtime/operator/FilterOperatorTest.java | 24 ++--
.../runtime/operator/HashJoinOperatorTest.java | 3 +-
.../runtime/operator/MailboxSendOperatorTest.java | 18 +--
.../operator/WindowAggregateOperatorTest.java | 16 +--
14 files changed, 159 insertions(+), 139 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index f42fed4ecc..527510937d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.common.datablock.DataBlockBuilder;
import org.apache.pinot.core.util.DataBlockExtractUtils;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
+
/**
* A {@code TransferableBlock} is a wrapper around {@link DataBlock} for
transferring data using
* {@link org.apache.pinot.common.proto.Mailbox}.
@@ -61,6 +62,8 @@ public class TransferableBlock implements Block {
"Container cannot be used to construct block of type: %s", type);
_type = type;
_numRows = _container.size();
+ // NOTE: Use assert to avoid breaking production code.
+ assert _numRows > 0 : "Container should not be empty";
_errCodeToExceptionMap = new HashMap<>();
_queryStats = null;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index e8a9a7e025..fb764c2201 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -95,21 +95,28 @@ public class FilterOperator extends MultiStageOperator {
@Override
protected TransferableBlock getNextBlock() {
- TransferableBlock block = _input.nextBlock();
- if (block.isEndOfStreamBlock()) {
+ // Keep reading the input blocks until we find a match row or all blocks
are processed.
+ // TODO: Consider batching the rows to improve performance.
+ while (true) {
+ TransferableBlock block = _input.nextBlock();
if (block.isErrorBlock()) {
return block;
}
- return updateEosBlock(block, _statMap);
- }
- List<Object[]> resultRows = new ArrayList<>();
- for (Object[] row : block.getContainer()) {
- Object filterResult = _filterOperand.apply(row);
- if (BooleanUtils.isTrueInternalValue(filterResult)) {
- resultRows.add(row);
+ if (block.isSuccessfulEndOfStreamBlock()) {
+ return updateEosBlock(block, _statMap);
+ }
+ assert block.isDataBlock();
+ List<Object[]> rows = new ArrayList<>();
+ for (Object[] row : block.getContainer()) {
+ Object filterResult = _filterOperand.apply(row);
+ if (BooleanUtils.isTrueInternalValue(filterResult)) {
+ rows.add(row);
+ }
+ }
+ if (!rows.isEmpty()) {
+ return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
}
}
- return new TransferableBlock(resultRows, _dataSchema, DataBlock.Type.ROW);
}
public enum StatKey implements StatMap.Key {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index f224a69aa5..dd60ff523f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -51,6 +51,7 @@ import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOver
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* This basic {@code BroadcastJoinOperator} implement a basic broadcast join
algorithm.
* This algorithm assumes that the broadcast table has to fit in memory since
we are not supporting any spilling.
@@ -85,21 +86,14 @@ public class HashJoinOperator extends MultiStageOperator {
private final MultiStageOperator _leftInput;
private final MultiStageOperator _rightInput;
private final JoinRelType _joinType;
+ private final KeySelector<?> _leftKeySelector;
+ private final KeySelector<?> _rightKeySelector;
private final DataSchema _resultSchema;
private final int _leftColumnSize;
private final int _resultColumnSize;
private final List<TransformOperand> _nonEquiEvaluators;
- private boolean _isHashTableBuilt;
private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
- // Used by non-inner join.
- // Needed to indicate we have finished processing all results after
returning last block.
- // TODO: Remove this special handling by fixing data block EOS abstraction
or operator's invariant.
- private boolean _isTerminated;
- private TransferableBlock _upstreamErrorBlock;
- private final KeySelector<?> _leftKeySelector;
- private final KeySelector<?> _rightKeySelector;
-
// Below are specific parameters to protect the hash table from growing too
large.
// Once the hash table reaches the limit, we will throw exception or break
the right table build process.
/**
@@ -113,9 +107,14 @@ public class HashJoinOperator extends MultiStageOperator {
*/
private final JoinOverFlowMode _joinOverflowMode;
+ private boolean _isHashTableBuilt;
private int _currentRowsInHashTable;
+ private TransferableBlock _upstreamErrorBlock;
private MultiStageQueryStats _leftSideStats;
private MultiStageQueryStats _rightSideStats;
+ // Used by non-inner join.
+ // Needed to indicate we have finished processing all results after
returning last block.
+ private boolean _isTerminated;
public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator
leftInput, DataSchema leftSchema,
MultiStageOperator rightInput, JoinNode node) {
@@ -138,7 +137,6 @@ public class HashJoinOperator extends MultiStageOperator {
for (RexExpression nonEquiCondition : nonEquiConditions) {
_nonEquiEvaluators.add(TransformOperandFactory.getTransformOperand(nonEquiCondition,
_resultSchema));
}
- _isHashTableBuilt = false;
_broadcastRightTable = new HashMap<>();
if (needUnmatchedRightRows()) {
_matchedRightRows = new HashMap<>();
@@ -209,10 +207,6 @@ public class HashJoinOperator extends MultiStageOperator {
@Override
protected TransferableBlock getNextBlock()
throws ProcessingException {
- if (_isTerminated) {
- assert _leftSideStats != null;
- return
TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats);
- }
if (!_isHashTableBuilt) {
// Build JOIN hash table
buildBroadcastHashTable();
@@ -220,9 +214,7 @@ public class HashJoinOperator extends MultiStageOperator {
if (_upstreamErrorBlock != null) {
return _upstreamErrorBlock;
}
- TransferableBlock leftBlock = _leftInput.nextBlock();
- // JOIN each left block with the constructed right hash table.
- return buildJoinedDataBlock(leftBlock);
+ return buildJoinedDataBlock();
}
private void buildBroadcastHashTable()
@@ -280,58 +272,51 @@ public class HashJoinOperator extends MultiStageOperator {
_statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS,
System.currentTimeMillis() - startTime);
}
- private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock) {
- if (leftBlock.isErrorBlock()) {
- _upstreamErrorBlock = leftBlock;
- return _upstreamErrorBlock;
- }
- if (leftBlock.isSuccessfulEndOfStreamBlock()) {
- assert _rightSideStats != null;
- _leftSideStats = leftBlock.getQueryStats();
+ private TransferableBlock buildJoinedDataBlock() {
+ if (_isTerminated) {
assert _leftSideStats != null;
- _leftSideStats.mergeInOrder(_rightSideStats, getOperatorType(),
_statMap);
+ return
TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats);
+ }
- if (!needUnmatchedRightRows()) {
- return
TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats);
+ // Keep reading the input blocks until we find a match row or all blocks
are processed.
+ // TODO: Consider batching the rows to improve performance.
+ while (true) {
+ TransferableBlock leftBlock = _leftInput.nextBlock();
+ if (leftBlock.isErrorBlock()) {
+ return leftBlock;
}
- // TODO: Moved to a different function.
- // Return remaining non-matched rows for non-inner join.
- List<Object[]> returnRows = new ArrayList<>();
- for (Map.Entry<Object, ArrayList<Object[]>> entry :
_broadcastRightTable.entrySet()) {
- List<Object[]> rightRows = entry.getValue();
- BitSet matchedIndices = _matchedRightRows.get(entry.getKey());
- if (matchedIndices == null) {
- for (Object[] rightRow : rightRows) {
- returnRows.add(joinRow(null, rightRow));
- }
- } else {
- int numRightRows = rightRows.size();
- int unmatchedIndex = 0;
- while ((unmatchedIndex =
matchedIndices.nextClearBit(unmatchedIndex)) < numRightRows) {
- returnRows.add(joinRow(null, rightRows.get(unmatchedIndex++)));
+ if (leftBlock.isSuccessfulEndOfStreamBlock()) {
+ assert _rightSideStats != null;
+ _leftSideStats = leftBlock.getQueryStats();
+ assert _leftSideStats != null;
+ _leftSideStats.mergeInOrder(_rightSideStats, getOperatorType(),
_statMap);
+ if (needUnmatchedRightRows()) {
+ List<Object[]> rows = buildNonMatchRightRows();
+ if (!rows.isEmpty()) {
+ _isTerminated = true;
+ return new TransferableBlock(rows, _resultSchema,
DataBlock.Type.ROW);
}
}
+ return
TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats);
+ }
+ assert leftBlock.isDataBlock();
+ List<Object[]> rows = buildJoinedRows(leftBlock);
+ if (!rows.isEmpty()) {
+ return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
}
- _isTerminated = true;
- return new TransferableBlock(returnRows, _resultSchema,
DataBlock.Type.ROW);
}
- List<Object[]> rows;
+ }
+
+ private List<Object[]> buildJoinedRows(TransferableBlock leftBlock) {
switch (_joinType) {
- case SEMI: {
- rows = buildJoinedDataBlockSemi(leftBlock);
- break;
- }
- case ANTI: {
- rows = buildJoinedDataBlockAnti(leftBlock);
- break;
- }
+ case SEMI:
+ return buildJoinedDataBlockSemi(leftBlock);
+ case ANTI:
+ return buildJoinedDataBlockAnti(leftBlock);
default: { // INNER, LEFT, RIGHT, FULL
- rows = buildJoinedDataBlockDefault(leftBlock);
- break;
+ return buildJoinedDataBlockDefault(leftBlock);
}
}
- // TODO: Rows can be empty here. Consider fetching another left block
instead of returning empty block.
- return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
}
private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock)
{
@@ -401,6 +386,26 @@ public class HashJoinOperator extends MultiStageOperator {
return rows;
}
+ private List<Object[]> buildNonMatchRightRows() {
+ List<Object[]> rows = new ArrayList<>();
+ for (Map.Entry<Object, ArrayList<Object[]>> entry :
_broadcastRightTable.entrySet()) {
+ List<Object[]> rightRows = entry.getValue();
+ BitSet matchedIndices = _matchedRightRows.get(entry.getKey());
+ if (matchedIndices == null) {
+ for (Object[] rightRow : rightRows) {
+ rows.add(joinRow(null, rightRow));
+ }
+ } else {
+ int numRightRows = rightRows.size();
+ int unmatchedIndex = 0;
+ while ((unmatchedIndex = matchedIndices.nextClearBit(unmatchedIndex))
< numRightRows) {
+ rows.add(joinRow(null, rightRows.get(unmatchedIndex++)));
+ }
+ }
+ }
+ return rows;
+ }
+
private Object[] joinRow(@Nullable Object[] leftRow, @Nullable Object[]
rightRow) {
Object[] resultRow = new Object[_resultColumnSize];
int idx = 0;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index e8824ca5e3..fc9b5b5169 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -38,15 +38,15 @@ public class LiteralValueOperator extends MultiStageOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(LiteralValueOperator.class);
private final DataSchema _dataSchema;
- private final TransferableBlock _rexLiteralBlock;
+ private final List<List<RexExpression.Literal>> _literalRows;
private boolean _isLiteralBlockReturned;
private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
public LiteralValueOperator(OpChainExecutionContext context, ValueNode node)
{
super(context);
_dataSchema = node.getDataSchema();
- _rexLiteralBlock = constructBlock(node.getLiteralRows());
- // only return a single literal block when it is the 1st virtual server.
otherwise, result will be duplicated.
+ _literalRows = node.getLiteralRows();
+ // Only return a single literal block when it is the 1st virtual server.
Otherwise, result will be duplicated.
_isLiteralBlockReturned = context.getId().getVirtualServerId() != 0;
}
@@ -73,9 +73,9 @@ public class LiteralValueOperator extends MultiStageOperator {
@Override
protected TransferableBlock getNextBlock() {
- if (!_isLiteralBlockReturned && !_isEarlyTerminated) {
+ if (!_isLiteralBlockReturned && !_isEarlyTerminated &&
!_literalRows.isEmpty()) {
_isLiteralBlockReturned = true;
- return _rexLiteralBlock;
+ return constructBlock();
} else {
return createEosBlock();
}
@@ -91,9 +91,9 @@ public class LiteralValueOperator extends MultiStageOperator {
return Type.LITERAL;
}
- private TransferableBlock constructBlock(List<List<RexExpression.Literal>>
literalRows) {
- List<Object[]> blockContent = new ArrayList<>();
- for (List<RexExpression.Literal> row : literalRows) {
+ private TransferableBlock constructBlock() {
+ List<Object[]> blockContent = new ArrayList<>(_literalRows.size());
+ for (List<RexExpression.Literal> row : _literalRows) {
Object[] values = new Object[_dataSchema.size()];
for (int i = 0; i < _dataSchema.size(); i++) {
values[i] = row.get(i).getValue();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index e804b4b408..c1c7647a1e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
/**
* This {@code MailboxReceiveOperator} receives data from a {@link
ReceivingMailbox} and serve it out from the
- * {@link MultiStageOperator#getNextBlock()} API.
+ * {@link #nextBlock()} API.
*/
public class MailboxReceiveOperator extends BaseMailboxReceiveOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(MailboxReceiveOperator.class);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 50e68a47a6..0321bedc1b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -66,6 +66,11 @@ public abstract class MultiStageOperator
public abstract void registerExecution(long time, int numRows);
+ /**
+ * Returns the next block from the operator. It should return non-empty data
blocks followed by an end-of-stream (EOS)
+ * block when all the data is processed, or an error block if an error
occurred. After it returns EOS or error block,
+ * no more call should be made.
+ */
@Override
public TransferableBlock nextBlock() {
if (Tracing.ThreadAccountantOps.isInterrupted()) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
index fb8ef505e5..ea5cf046d7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
@@ -108,9 +108,7 @@ public abstract class SetOperator extends MultiStageOperator {
if (_upstreamErrorBlock != null) {
return _upstreamErrorBlock;
}
- // UNION each left block with the constructed right block set.
- TransferableBlock leftBlock = _leftChildOperator.nextBlock();
- return constructResultBlockSet(leftBlock);
+ return constructResultBlockSet();
}
protected void constructRightBlockSet() {
@@ -132,28 +130,33 @@ public abstract class SetOperator extends MultiStageOperator {
}
}
- protected TransferableBlock constructResultBlockSet(TransferableBlock
leftBlock) {
- List<Object[]> rows = new ArrayList<>();
- // TODO: Other operators keep the first erroneous block, while this keep
the last.
- // We should decide what is what we want to do and be consistent with
that.
- if (_upstreamErrorBlock != null || leftBlock.isErrorBlock()) {
- _upstreamErrorBlock = leftBlock;
- return _upstreamErrorBlock;
- }
- if (leftBlock.isSuccessfulEndOfStreamBlock()) {
- assert _rightQueryStats != null;
- MultiStageQueryStats leftQueryStats = leftBlock.getQueryStats();
- assert leftQueryStats != null;
- _rightQueryStats.mergeInOrder(leftQueryStats, getOperatorType(),
_statMap);
-
_rightQueryStats.getCurrentStats().concat(leftQueryStats.getCurrentStats());
- return
TransferableBlockUtils.getEndOfStreamTransferableBlock(_rightQueryStats);
- }
- for (Object[] row : leftBlock.getContainer()) {
- if (handleRowMatched(row)) {
- rows.add(row);
+ protected TransferableBlock constructResultBlockSet() {
+ // Keep reading the input blocks until we find a match row or all blocks
are processed.
+ // TODO: Consider batching the rows to improve performance.
+ while (true) {
+ TransferableBlock leftBlock = _leftChildOperator.nextBlock();
+ if (leftBlock.isErrorBlock()) {
+ return leftBlock;
+ }
+ if (leftBlock.isSuccessfulEndOfStreamBlock()) {
+ assert _rightQueryStats != null;
+ MultiStageQueryStats leftQueryStats = leftBlock.getQueryStats();
+ assert leftQueryStats != null;
+ _rightQueryStats.mergeInOrder(leftQueryStats, getOperatorType(),
_statMap);
+
_rightQueryStats.getCurrentStats().concat(leftQueryStats.getCurrentStats());
+ return
TransferableBlockUtils.getEndOfStreamTransferableBlock(_rightQueryStats);
+ }
+ assert leftBlock.isDataBlock();
+ List<Object[]> rows = new ArrayList<>();
+ for (Object[] row : leftBlock.getContainer()) {
+ if (handleRowMatched(row)) {
+ rows.add(row);
+ }
+ }
+ if (!rows.isEmpty()) {
+ return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
}
}
- return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
}
/**
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
index 95e6cf2d4f..35e0ac8503 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
/**
* This {@code SortedMailboxReceiveOperator} receives data from a {@link
ReceivingMailbox} and serve it out from the
- * {@link MultiStageOperator#getNextBlock()}()} API in a sorted manner.
+ * {@link #nextBlock()} API in a sorted manner.
*
* TODO: Once sorting on the {@code MailboxSendOperator} is available, modify
this to use a k-way merge instead of
* resorting via the PriorityQueue.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index 4272873a79..27001778d4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -40,7 +40,6 @@ import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.planner.plannode.WindowNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.utils.AggregationUtils;
import org.apache.pinot.query.runtime.operator.utils.TypeUtils;
import org.apache.pinot.query.runtime.operator.window.WindowFunction;
@@ -244,7 +243,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
private TransferableBlock computeBlocks()
throws ProcessingException {
TransferableBlock block = _input.nextBlock();
- while (!TransferableBlockUtils.isEndOfStream(block)) {
+ while (block.isDataBlock()) {
List<Object[]> container = block.getContainer();
int containerSize = container.size();
if (_numRows + containerSize > _maxRowsInWindowCache) {
@@ -276,6 +275,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
if (block.isErrorBlock()) {
return block;
}
+ assert block.isSuccessfulEndOfStreamBlock();
_eosBlock = updateEosBlock(block, _statMap);
ColumnDataType[] resultStoredTypes =
_resultSchema.getStoredColumnDataTypes();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 03c5cc0e95..1c7edcae6c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -118,7 +118,7 @@ public class AggregateOperatorTest {
AggregateOperator operator = getOperator(resultSchema, aggCalls,
filterArgs, groupKeys);
// When:
- List<Object[]> resultRows = operator.getNextBlock().getContainer();
+ List<Object[]> resultRows = operator.nextBlock().getContainer();
// Then:
assertEquals(resultRows.size(), 1);
@@ -141,7 +141,7 @@ public class AggregateOperatorTest {
AggregateOperator operator = getOperator(resultSchema, aggCalls,
filterArgs, groupKeys);
// When:
- List<Object[]> resultRows = operator.getNextBlock().getContainer();
+ List<Object[]> resultRows = operator.nextBlock().getContainer();
// Then:
assertEquals(resultRows.size(), 1);
@@ -168,7 +168,7 @@ public class AggregateOperatorTest {
AggregateOperator operator = getOperator(resultSchema, aggCalls,
filterArgs, groupKeys);
// When:
- List<Object[]> resultRows = operator.getNextBlock().getContainer();
+ List<Object[]> resultRows = operator.nextBlock().getContainer();
// Then:
assertEquals(resultRows.size(), 1);
@@ -188,7 +188,7 @@ public class AggregateOperatorTest {
DataSchema resultSchema = new DataSchema(new String[]{"group", "sum"}, new
ColumnDataType[]{STRING, DOUBLE});
AggregateOperator operator = getOperator(resultSchema, aggCalls,
filterArgs, groupKeys);
- List<Object[]> resultRows = operator.getNextBlock().getContainer();
+ List<Object[]> resultRows = operator.nextBlock().getContainer();
assertEquals(resultRows.size(), 2);
if (resultRows.get(0)[0].equals("Aa")) {
assertEquals(resultRows.get(0), new Object[]{"Aa", 1.0});
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
index 462cd2ddbf..1fb29c79f0 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
@@ -63,7 +63,7 @@ public class FilterOperatorTest {
ColumnDataType.BOOLEAN
});
FilterOperator operator = getOperator(inputSchema,
RexExpression.Literal.TRUE);
- TransferableBlock block = operator.getNextBlock();
+ TransferableBlock block = operator.nextBlock();
assertTrue(block.isErrorBlock());
assertTrue(block.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("filterError"));
}
@@ -75,7 +75,7 @@ public class FilterOperatorTest {
});
when(_input.nextBlock()).thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
FilterOperator operator = getOperator(inputSchema,
RexExpression.Literal.TRUE);
- TransferableBlock block = operator.getNextBlock();
+ TransferableBlock block = operator.nextBlock();
assertTrue(block.isEndOfStreamBlock());
}
@@ -87,7 +87,7 @@ public class FilterOperatorTest {
when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema,
new Object[]{0}, new Object[]{1}))
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
FilterOperator operator = getOperator(inputSchema,
RexExpression.Literal.TRUE);
- List<Object[]> resultRows = operator.getNextBlock().getContainer();
+ List<Object[]> resultRows = operator.nextBlock().getContainer();
assertEquals(resultRows.size(), 2);
assertEquals(resultRows.get(0), new Object[]{0});
assertEquals(resultRows.get(1), new Object[]{1});
@@ -98,10 +98,10 @@ public class FilterOperatorTest {
DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new
ColumnDataType[]{
ColumnDataType.INT
});
- when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema,
new Object[]{1}, new Object[]{2}));
+ when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema,
new Object[]{1}, new Object[]{2}))
+
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
FilterOperator operator = getOperator(inputSchema,
RexExpression.Literal.FALSE);
- List<Object[]> resultRows = operator.getNextBlock().getContainer();
- assertTrue(resultRows.isEmpty());
+ assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock());
}
@Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = "Filter operand must "
@@ -134,7 +134,7 @@ public class FilterOperatorTest {
});
when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema,
new Object[]{1, 1}, new Object[]{2, 0}));
FilterOperator operator = getOperator(inputSchema, ref1);
- List<Object[]> resultRows = operator.getNextBlock().getContainer();
+ List<Object[]> resultRows = operator.nextBlock().getContainer();
assertEquals(resultRows.size(), 1);
assertEquals(resultRows.get(0), new Object[]{1, 1});
}
@@ -149,7 +149,7 @@ public class FilterOperatorTest {
RexExpression.FunctionCall andCall = new
RexExpression.FunctionCall(ColumnDataType.BOOLEAN, SqlKind.AND.name(),
List.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
FilterOperator operator = getOperator(inputSchema, andCall);
- List<Object[]> resultRows = operator.getNextBlock().getContainer();
+ List<Object[]> resultRows = operator.nextBlock().getContainer();
assertEquals(resultRows.size(), 1);
assertEquals(resultRows.get(0), new Object[]{1, 1});
}
@@ -164,7 +164,7 @@ public class FilterOperatorTest {
RexExpression.FunctionCall orCall = new
RexExpression.FunctionCall(ColumnDataType.BOOLEAN, SqlKind.OR.name(),
List.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
FilterOperator operator = getOperator(inputSchema, orCall);
- List<Object[]> resultRows = operator.getNextBlock().getContainer();
+ List<Object[]> resultRows = operator.nextBlock().getContainer();
assertEquals(resultRows.size(), 2);
assertEquals(resultRows.get(0), new Object[]{1, 1});
assertEquals(resultRows.get(1), new Object[]{1, 0});
@@ -180,7 +180,7 @@ public class FilterOperatorTest {
RexExpression.FunctionCall notCall = new
RexExpression.FunctionCall(ColumnDataType.BOOLEAN, SqlKind.NOT.name(),
List.of(new RexExpression.InputRef(0)));
FilterOperator operator = getOperator(inputSchema, notCall);
- List<Object[]> resultRows = operator.getNextBlock().getContainer();
+ List<Object[]> resultRows = operator.nextBlock().getContainer();
assertEquals(resultRows.size(), 1);
assertEquals(resultRows.get(0)[0], 0);
assertEquals(resultRows.get(0)[1], 0);
@@ -197,7 +197,7 @@ public class FilterOperatorTest {
new RexExpression.FunctionCall(ColumnDataType.BOOLEAN,
SqlKind.GREATER_THAN.name(),
List.of(new RexExpression.InputRef(0), new
RexExpression.InputRef(1)));
FilterOperator operator = getOperator(inputSchema, greaterThan);
- List<Object[]> resultRows = operator.getNextBlock().getContainer();
+ List<Object[]> resultRows = operator.nextBlock().getContainer();
assertEquals(resultRows.size(), 1);
assertEquals(resultRows.get(0), new Object[]{3, 2});
}
@@ -213,7 +213,7 @@ public class FilterOperatorTest {
new RexExpression.FunctionCall(ColumnDataType.BOOLEAN,
SqlKind.STARTS_WITH.name(),
List.of(new RexExpression.InputRef(0), new
RexExpression.Literal(ColumnDataType.STRING, "star")));
FilterOperator operator = getOperator(inputSchema, startsWith);
- List<Object[]> resultRows = operator.getNextBlock().getContainer();
+ List<Object[]> resultRows = operator.nextBlock().getContainer();
assertEquals(resultRows.size(), 1);
assertEquals(resultRows.get(0), new Object[]{"starTree"});
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index e03a2f86f8..f204c04c98 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -242,8 +242,7 @@ public class HashJoinOperatorTest {
});
HashJoinOperator operator =
getOperator(leftSchema, resultSchema, JoinRelType.INNER, List.of(0),
List.of(0), List.of());
- List<Object[]> resultRows = operator.nextBlock().getContainer();
- assertTrue(resultRows.isEmpty());
+ assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock());
}
@Test
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 56108f8af4..cc92873c94 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.WorkerMetadata;
@@ -104,8 +105,7 @@ public class MailboxSendOperatorTest {
public void shouldNotSendErrorBlockWhenTimedOut()
throws Exception {
// Given:
- TransferableBlock dataBlock =
- OperatorTestUtil.block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{}));
+ TransferableBlock dataBlock = getDummyDataBlock();
when(_input.nextBlock()).thenReturn(dataBlock);
doThrow(new TimeoutException()).when(_exchange).send(any());
@@ -141,10 +141,8 @@ public class MailboxSendOperatorTest {
public void shouldSendDataBlock()
throws Exception {
// Given:
- TransferableBlock dataBlock1 =
- OperatorTestUtil.block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{}));
- TransferableBlock dataBlock2 =
- OperatorTestUtil.block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{}));
+ TransferableBlock dataBlock1 = getDummyDataBlock();
+ TransferableBlock dataBlock2 = getDummyDataBlock();
TransferableBlock eosBlock =
TransferableBlockUtils.getEndOfStreamTransferableBlock(MultiStageQueryStats.emptyStats(SENDER_STAGE_ID));
when(_input.nextBlock()).thenReturn(dataBlock1, dataBlock2, eosBlock);
@@ -183,8 +181,7 @@ public class MailboxSendOperatorTest {
public void shouldEarlyTerminateWhenUpstreamWhenIndicated()
throws Exception {
// Given:
- TransferableBlock dataBlock =
- OperatorTestUtil.block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{}));
+ TransferableBlock dataBlock = getDummyDataBlock();
when(_input.nextBlock()).thenReturn(dataBlock);
doReturn(true).when(_exchange).send(any());
@@ -203,4 +200,9 @@ public class MailboxSendOperatorTest {
null);
return new MailboxSendOperator(context, _input, statMap -> _exchange);
}
+
+ private static TransferableBlock getDummyDataBlock() {
+ return OperatorTestUtil.block(new DataSchema(new String[]{"intCol"}, new ColumnDataType[]{ColumnDataType.INT}),
+ new Object[]{1});
+ }
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
index cd3ca26b6c..ec5d667254 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
@@ -27,7 +27,6 @@ import org.apache.calcite.sql.SqlKind;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.data.table.Key;
@@ -270,8 +269,7 @@ public class WindowAggregateOperatorTest {
}
@Test
- public void testRankDenseRankRankingFunctions()
- throws ProcessingException {
+ public void testRankDenseRankRankingFunctions() {
// Given:
DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, STRING});
// Input should be in sorted order on the order by key as SortExchange will handle pre-sorting the data
@@ -294,7 +292,7 @@ public class WindowAggregateOperatorTest {
Integer.MIN_VALUE, 0);
// When:
- List<Object[]> resultRows = operator.getNextBlock().getContainer();
+ List<Object[]> resultRows = operator.nextBlock().getContainer();
// Then:
verifyResultRows(resultRows, keys,
@@ -308,8 +306,7 @@ public class WindowAggregateOperatorTest {
}
@Test
- public void testRowNumberRankingFunction()
- throws ProcessingException {
+ public void testRowNumberRankingFunction() {
// Given:
DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, STRING});
// Input should be in sorted order on the order by key as SortExchange will handle pre-sorting the data
@@ -330,7 +327,7 @@ public class WindowAggregateOperatorTest {
Integer.MIN_VALUE, 0);
// When:
- List<Object[]> resultRows = operator.getNextBlock().getContainer();
+ List<Object[]> resultRows = operator.nextBlock().getContainer();
// Then:
verifyResultRows(resultRows, keys, Map.of(1, List.<Object[]>of(new Object[]{1, "foo", 1L}), 2,
@@ -340,8 +337,7 @@ public class WindowAggregateOperatorTest {
}
@Test
- public void testNonEmptyOrderByKeysNotMatchingPartitionByKeys()
- throws ProcessingException {
+ public void testNonEmptyOrderByKeysNotMatchingPartitionByKeys() {
// Given:
DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, STRING});
// Input should be in sorted order on the order by key as SortExchange will handle pre-sorting the data
@@ -360,7 +356,7 @@ public class WindowAggregateOperatorTest {
Integer.MIN_VALUE, Integer.MAX_VALUE);
// When:
- List<Object[]> resultRows = operator.getNextBlock().getContainer();
+ List<Object[]> resultRows = operator.nextBlock().getContainer();
// Then:
verifyResultRows(resultRows, keys, Map.of(1, List.<Object[]>of(new Object[]{1, "foo", 1.0}), 2,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]