This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e8f5e9b8ab Enforce max rows in join limit on joined rows with left input as well (#13922)
e8f5e9b8ab is described below
commit e8f5e9b8ab01c5fae6657eeb193c9e82f1eecf04
Author: Yash Mayya <[email protected]>
AuthorDate: Mon Sep 2 18:48:45 2024 +0530
Enforce max rows in join limit on joined rows with left input as well (#13922)
---
.../query/runtime/operator/HashJoinOperator.java | 141 ++++++++++++++++-----
.../runtime/operator/HashJoinOperatorTest.java | 77 ++++++++++-
2 files changed, 180 insertions(+), 38 deletions(-)
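Note on the change below: the cap previously guarded only the right-side hash table build; it now also bounds the rows emitted while joining each left-input block. A minimal sketch of the hint form referenced in the new error message, assuming the documented Pinot join hint keys max_rows_in_join and join_overflow_mode (table and column names here are hypothetical):

    -- With THROW (the default), the query fails with a SERVER_RESOURCE_LIMIT_EXCEEDED
    -- error once either the hash table build or the joined output for a left-input
    -- block would exceed the cap.
    SELECT /*+ joinOptions(max_rows_in_join='1000', join_overflow_mode='THROW') */
        a.userId, b.orderId
    FROM userEvents a
    JOIN orderEvents b
        ON a.userId = b.userId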
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index dd60ff523f..c18deb2ea4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -94,12 +94,14 @@ public class HashJoinOperator extends MultiStageOperator {
private final List<TransformOperand> _nonEquiEvaluators;
private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
- // Below are specific parameters to protect the hash table from growing too large.
+ // Below are specific parameters to protect the server from a very large join operation.
// Once the hash table reaches the limit, we will throw exception or break the right table build process.
+ // The limit also applies to the number of joined rows in blocks from the left table.
/**
- * Max rows allowed to build the right table hash collection.
+ * Max rows allowed to build the right table hash collection. Also max rows emitted in each join with a block from
+ * the left table.
*/
- private final int _maxRowsInHashTable;
+ private final int _maxRowsInJoin;
/**
* Mode when join overflow happens, supported values: THROW or BREAK.
* THROW(default): Break right table build process, and throw exception, no JOIN with left table performed.
@@ -109,6 +111,7 @@ public class HashJoinOperator extends MultiStageOperator {
private boolean _isHashTableBuilt;
private int _currentRowsInHashTable;
+ private int _currentJoinedRows;
private TransferableBlock _upstreamErrorBlock;
private MultiStageQueryStats _leftSideStats;
private MultiStageQueryStats _rightSideStats;
@@ -145,8 +148,10 @@ public class HashJoinOperator extends MultiStageOperator {
}
Map<String, String> metadata = context.getOpChainMetadata();
PlanNode.NodeHint nodeHint = node.getNodeHint();
- _maxRowsInHashTable = getMaxRowInJoin(metadata, nodeHint);
+ _maxRowsInJoin = getMaxRowsInJoin(metadata, nodeHint);
_joinOverflowMode = getJoinOverflowMode(metadata, nodeHint);
+ _currentRowsInHashTable = 0;
+ _currentJoinedRows = 0;
}
@Override
@@ -165,7 +170,7 @@ public class HashJoinOperator extends MultiStageOperator {
return LOGGER;
}
- private int getMaxRowInJoin(Map<String, String> opChainMetadata, @Nullable PlanNode.NodeHint nodeHint) {
+ private int getMaxRowsInJoin(Map<String, String> opChainMetadata, @Nullable PlanNode.NodeHint nodeHint) {
if (nodeHint != null) {
Map<String, String> joinOptions = nodeHint.getHintOptions().get(PinotHintOptions.JOIN_HINT_OPTIONS);
if (joinOptions != null) {
@@ -224,25 +229,13 @@ public class HashJoinOperator extends MultiStageOperator {
while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
List<Object[]> container = rightBlock.getContainer();
// Row based overflow check.
- if (container.size() + _currentRowsInHashTable > _maxRowsInHashTable) {
+ if (container.size() + _currentRowsInHashTable > _maxRowsInJoin) {
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
- ProcessingException resourceLimitExceededException =
- new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
- resourceLimitExceededException.setMessage(
- "Cannot build in memory hash table for join operator, reach number of rows limit: " + _maxRowsInHashTable
- + ". Consider increasing the limit for the maximum number of rows in a join either via the query "
- + "option '" + CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '"
- + PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the '"
- + PinotHintOptions.JOIN_HINT_OPTIONS + "'. Alternatively, if partial results are acceptable, the join"
- + " overflow mode can be set to '" + JoinOverFlowMode.BREAK.name() + "' either via the query option '"
- + CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE + "' or the '"
- + PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the '"
- + PinotHintOptions.JOIN_HINT_OPTIONS + "'. Furthermore, if there is a large disparity in the size of "
- + "the two tables being joined, use the smaller table as the right input instead of the left.");
- throw resourceLimitExceededException;
+ throwProcessingExceptionForJoinRowLimitExceeded("Cannot build in memory hash table for join operator, "
+ + "reached number of rows limit: " + _maxRowsInJoin);
} else {
// Just fill up the buffer.
- int remainingRows = _maxRowsInHashTable - _currentRowsInHashTable;
+ int remainingRows = _maxRowsInJoin - _currentRowsInHashTable;
container = container.subList(0, remainingRows);
_statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
// setting only the rightTableOperator to be early terminated and awaits EOS block next.
@@ -254,8 +247,8 @@ public class HashJoinOperator extends MultiStageOperator {
ArrayList<Object[]> hashCollection =
_broadcastRightTable.computeIfAbsent(_rightKeySelector.getKey(row),
k -> new ArrayList<>(INITIAL_HEURISTIC_SIZE));
int size = hashCollection.size();
- if ((size & size - 1) == 0 && size < _maxRowsInHashTable && size < Integer.MAX_VALUE / 2) { // is power of 2
- hashCollection.ensureCapacity(Math.min(size << 1, _maxRowsInHashTable));
+ if ((size & size - 1) == 0 && size < _maxRowsInJoin && size < Integer.MAX_VALUE / 2) { // is power of 2
+ hashCollection.ensureCapacity(Math.min(size << 1, _maxRowsInJoin));
}
hashCollection.add(row);
}
@@ -272,15 +265,18 @@ public class HashJoinOperator extends MultiStageOperator {
_statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS, System.currentTimeMillis() - startTime);
}
- private TransferableBlock buildJoinedDataBlock() {
- if (_isTerminated) {
- assert _leftSideStats != null;
- return TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats);
- }
-
+ private TransferableBlock buildJoinedDataBlock() throws ProcessingException {
// Keep reading the input blocks until we find a match row or all blocks are processed.
// TODO: Consider batching the rows to improve performance.
while (true) {
+ if (_upstreamErrorBlock != null) {
+ return _upstreamErrorBlock;
+ }
+ if (_isTerminated) {
+ assert _leftSideStats != null;
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats);
+ }
+
TransferableBlock leftBlock = _leftInput.nextBlock();
if (leftBlock.isErrorBlock()) {
return leftBlock;
@@ -307,7 +303,7 @@ public class HashJoinOperator extends MultiStageOperator {
}
}
- private List<Object[]> buildJoinedRows(TransferableBlock leftBlock) {
+ private List<Object[]> buildJoinedRows(TransferableBlock leftBlock) throws ProcessingException {
switch (_joinType) {
case SEMI:
return buildJoinedDataBlockSemi(leftBlock);
@@ -319,7 +315,7 @@ public class HashJoinOperator extends MultiStageOperator {
}
}
- private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) {
+ private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) throws ProcessingException {
List<Object[]> container = leftBlock.getContainer();
List<Object[]> rows = new ArrayList<>(container.size());
@@ -327,6 +323,9 @@ public class HashJoinOperator extends MultiStageOperator {
Object key = _leftKeySelector.getKey(leftRow);
// SEMI-JOIN only checks existence of the key
if (_broadcastRightTable.containsKey(key)) {
+ if (incrementJoinedRowsAndCheckLimit()) {
+ break;
+ }
rows.add(joinRow(leftRow, null));
}
}
@@ -334,7 +333,7 @@ public class HashJoinOperator extends MultiStageOperator {
return rows;
}
- private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock) {
+ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock) throws ProcessingException {
List<Object[]> container = leftBlock.getContainer();
ArrayList<Object[]> rows = new ArrayList<>(container.size());
@@ -344,6 +343,9 @@ public class HashJoinOperator extends MultiStageOperator {
List<Object[]> rightRows = _broadcastRightTable.get(key);
if (rightRows == null) {
if (needUnmatchedLeftRows()) {
+ if (incrementJoinedRowsAndCheckLimit()) {
+ break;
+ }
rows.add(joinRow(leftRow, null));
}
continue;
@@ -357,6 +359,9 @@ public class HashJoinOperator extends MultiStageOperator {
Object[] resultRow = joinRow(leftRow, rightRow);
if (_nonEquiEvaluators.isEmpty() || _nonEquiEvaluators.stream()
.allMatch(evaluator -> BooleanUtils.isTrueInternalValue(evaluator.apply(resultRow)))) {
+ if (incrementJoinedRowsAndCheckLimit()) {
+ break;
+ }
rows.add(resultRow);
hasMatchForLeftRow = true;
if (_matchedRightRows != null) {
@@ -364,7 +369,13 @@ public class HashJoinOperator extends MultiStageOperator {
}
}
}
+ if (_currentJoinedRows > _maxRowsInJoin) {
+ break;
+ }
if (!hasMatchForLeftRow && needUnmatchedLeftRows()) {
+ if (incrementJoinedRowsAndCheckLimit()) {
+ break;
+ }
rows.add(joinRow(leftRow, null));
}
}
@@ -372,7 +383,7 @@ public class HashJoinOperator extends MultiStageOperator {
return rows;
}
- private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock) {
+ private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock) throws ProcessingException {
List<Object[]> container = leftBlock.getContainer();
List<Object[]> rows = new ArrayList<>(container.size());
@@ -380,9 +391,13 @@ public class HashJoinOperator extends MultiStageOperator {
Object key = _leftKeySelector.getKey(leftRow);
// ANTI-JOIN only checks non-existence of the key
if (!_broadcastRightTable.containsKey(key)) {
+ if (incrementJoinedRowsAndCheckLimit()) {
+ break;
+ }
rows.add(joinRow(leftRow, null));
}
}
+
return rows;
}
@@ -432,6 +447,66 @@ public class HashJoinOperator extends MultiStageOperator {
return _joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL;
}
+ private void earlyTerminateLeftInput() {
+ _leftInput.earlyTerminate();
+ TransferableBlock leftBlock = _leftInput.nextBlock();
+
+ while (!leftBlock.isSuccessfulEndOfStreamBlock()) {
+ if (leftBlock.isErrorBlock()) {
+ _upstreamErrorBlock = leftBlock;
+ return;
+ }
+ leftBlock = _leftInput.nextBlock();
+ }
+
+ assert leftBlock.isSuccessfulEndOfStreamBlock();
+ assert _rightSideStats != null;
+ _leftSideStats = leftBlock.getQueryStats();
+ assert _leftSideStats != null;
+ _leftSideStats.mergeInOrder(_rightSideStats, getOperatorType(), _statMap);
+ _isTerminated = true;
+ }
+
+ /**
+ * Increments {@link #_currentJoinedRows} and checks if the limit has been exceeded. If the limit has been exceeded,
+ * either an exception is thrown or the left input is early terminated based on the {@link #_joinOverflowMode}.
+ *
+ * @return {@code true} if the limit has been exceeded, {@code false} otherwise
+ */
+ private boolean incrementJoinedRowsAndCheckLimit() throws ProcessingException {
+ _currentJoinedRows++;
+ if (_currentJoinedRows > _maxRowsInJoin) {
+ if (_joinOverflowMode == JoinOverFlowMode.THROW) {
+ throwProcessingExceptionForJoinRowLimitExceeded("Cannot process join, reached number of rows limit: "
+ + _maxRowsInJoin);
+ } else {
+ // Skip over remaining blocks until we reach the end of stream since we already breached the rows limit.
+ logger().info("Terminating join operator early as the maximum number of rows limit was reached: {}",
+ _maxRowsInJoin);
+ earlyTerminateLeftInput();
+ _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private void throwProcessingExceptionForJoinRowLimitExceeded(String reason) throws ProcessingException {
+ ProcessingException resourceLimitExceededException =
+ new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
+ resourceLimitExceededException.setMessage(
+ reason + ". Consider increasing the limit for the maximum number of rows in a join either via the query "
+ + "option '" + CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '"
+ + PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the '"
+ + PinotHintOptions.JOIN_HINT_OPTIONS + "'. Alternatively, if partial results are acceptable, the join"
+ + " overflow mode can be set to '" + JoinOverFlowMode.BREAK.name() + "' either via the query option '"
+ + CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE + "' or the '"
+ + PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the '"
+ + PinotHintOptions.JOIN_HINT_OPTIONS + "'.");
+ throw resourceLimitExceededException;
+ }
+
public enum StatKey implements StatMap.Key {
//@formatter:off
EXECUTION_TIME_MS(StatMap.Type.LONG) {
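For the BREAK path exercised by the new tests below: rather than failing, the operator emits a partial result, early-terminates the left input, and sets the MAX_ROWS_IN_JOIN_REACHED stat. A hedged sketch of the query-option form, assuming the standard option keys maxRowsInJoin and joinOverflowMode (the values behind CommonConstants.Broker.Request.QueryOptionKey; table and column names are hypothetical):

    SET maxRowsInJoin = 2;
    SET joinOverflowMode = 'BREAK';
    -- Returns at most 2 joined rows; remaining left-input blocks are skipped
    -- rather than joined, and the partial-result stat is set on the response.
    SELECT a.int_col, b.string_col
    FROM tableA a
    JOIN tableB b
        ON a.int_col = b.int_col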
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index f204c04c98..983b2bf937 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -110,7 +110,7 @@ public class HashJoinOperatorTest {
OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
DataSchema resultSchema =
- new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_co2"}, new ColumnDataType[]{
+ new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"}, new ColumnDataType[]{
ColumnDataType.INT, ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.STRING
});
HashJoinOperator operator =
@@ -136,7 +136,7 @@ public class HashJoinOperatorTest {
OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
DataSchema resultSchema =
- new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_co2"}, new ColumnDataType[]{
+ new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"}, new ColumnDataType[]{
ColumnDataType.INT, ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.STRING
});
HashJoinOperator operator =
@@ -468,7 +468,7 @@ public class HashJoinOperatorTest {
}
@Test
- public void shouldPropagateJoinLimitError() {
+ public void shouldPropagateRightInputJoinLimitError() {
DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new ColumnDataType[]{
ColumnDataType.INT, ColumnDataType.STRING
});
@@ -493,11 +493,13 @@ public class HashJoinOperatorTest {
TransferableBlock block = operator.nextBlock();
assertTrue(block.isErrorBlock());
assertTrue(block.getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
- .contains("reach number of rows limit"));
+ .contains("reached number of rows limit"));
+ assertTrue(block.getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
+ .contains("Cannot build in memory hash table"));
}
@Test
- public void shouldHandleJoinWithPartialResultsWhenHitDataRowsLimit() {
+ public void shouldHandleJoinWithPartialResultsWhenHitDataRowsLimitOnRightInput() {
DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new ColumnDataType[]{
ColumnDataType.INT, ColumnDataType.STRING
});
@@ -529,6 +531,71 @@ public class HashJoinOperatorTest {
"Max rows in join should be reached");
}
+ @Test
+ public void shouldPropagateLeftInputJoinLimitError() {
+ DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.STRING
+ });
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.STRING
+ });
+ when(_leftInput.nextBlock()).thenReturn(
+ OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
+ .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
+ when(_rightInput.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}))
+ .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
+ DataSchema resultSchema =
+ new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"}, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.STRING
+ });
+ PlanNode.NodeHint nodeHint = new PlanNode.NodeHint(Map.of(PinotHintOptions.JOIN_HINT_OPTIONS,
+ Map.of(PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE, "THROW",
+ PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "2")));
+ HashJoinOperator operator =
+ getOperator(leftSchema, resultSchema, JoinRelType.INNER, List.of(1), List.of(1), List.of(), nodeHint);
+ TransferableBlock block = operator.nextBlock();
+ assertTrue(block.isErrorBlock());
+ assertTrue(block.getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
+ .contains("reached number of rows limit"));
+ assertTrue(block.getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
+ .contains("Cannot process join"));
+ }
+
+ @Test
+ public void shouldHandleJoinWithPartialResultsWhenHitDataRowsLimitOnLeftInput() {
+ DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.STRING
+ });
+ DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.STRING
+ });
+ when(_leftInput.nextBlock()).thenReturn(
+ OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "Aa"}, new Object[]{3, "Aa"}))
+ .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{4, "Aa"}, new Object[]{5, "Aa"}))
+ .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
+ when(_rightInput.nextBlock()).thenReturn(
+ OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}))
+ .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
+ DataSchema resultSchema =
+ new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"}, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.STRING
+ });
+ PlanNode.NodeHint nodeHint = new PlanNode.NodeHint(Map.of(PinotHintOptions.JOIN_HINT_OPTIONS,
+ Map.of(PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE, "BREAK",
+ PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "2")));
+ HashJoinOperator operator =
+ getOperator(leftSchema, resultSchema, JoinRelType.INNER, List.of(1), List.of(1), List.of(), nodeHint);
+ List<Object[]> resultRows = operator.nextBlock().getContainer();
+ Mockito.verify(_leftInput).earlyTerminate();
+ assertEquals(resultRows.size(), 2);
+ TransferableBlock block2 = operator.nextBlock();
+ assertTrue(block2.isSuccessfulEndOfStreamBlock());
+ StatMap<HashJoinOperator.StatKey> statMap = OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class, block2);
+ assertTrue(statMap.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED),
+ "Max rows in join should be reached");
+ }
+
private HashJoinOperator getOperator(DataSchema leftSchema, DataSchema resultSchema, JoinRelType joinType,
List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression> nonEquiConditions,
PlanNode.NodeHint nodeHint) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]