This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ad26013ae7 V2 allocation optimizations (#11112)
ad26013ae7 is described below
commit ad26013ae7f894bfeaa1ea21119ea88766df4142
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed Jul 19 08:18:53 2023 +0200
V2 allocation optimizations (#11112)
---
.../core/operator/query/SelectionOnlyOperator.java | 3 +-
.../query/runtime/operator/HashJoinOperator.java | 136 ++++++++++++++-------
.../query/runtime/operator/TransformOperator.java | 2 +-
.../runtime/operator/exchange/HashExchange.java | 5 +-
4 files changed, 96 insertions(+), 50 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
index 613512fb1d..77ba15b6c2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
@@ -49,7 +49,7 @@ public class SelectionOnlyOperator extends BaseOperator<SelectionResultsBlock> {
private final BlockValSet[] _blockValSets;
private final DataSchema _dataSchema;
private final int _numRowsToKeep;
- private final List<Object[]> _rows;
+ private final ArrayList<Object[]> _rows;
private final RoaringBitmap[] _nullBitmaps;
private int _numDocsScanned = 0;
@@ -102,6 +102,7 @@ public class SelectionOnlyOperator extends BaseOperator<SelectionResultsBlock> {
RowBasedBlockValueFetcher blockValueFetcher = new
RowBasedBlockValueFetcher(_blockValSets);
int numDocsToAdd = Math.min(_numRowsToKeep - _rows.size(),
valueBlock.getNumDocs());
+ _rows.ensureCapacity(_rows.size() + numDocsToAdd);
_numDocsScanned += numDocsToAdd;
if (_nullHandlingEnabled) {
for (int i = 0; i < numExpressions; i++) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 8d5e5b1a06..f9ade5a99c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -59,12 +59,13 @@ import org.slf4j.LoggerFactory;
// TODO: Move inequi out of hashjoin. (https://github.com/apache/pinot/issues/9728)
public class HashJoinOperator extends MultiStageOperator {
private static final String EXPLAIN_NAME = "HASH_JOIN";
+ private static final int INITIAL_HEURISTIC_SIZE = 16;
private static final Logger LOGGER =
LoggerFactory.getLogger(AggregateOperator.class);
private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES = ImmutableSet.of(
JoinRelType.INNER, JoinRelType.LEFT, JoinRelType.RIGHT,
JoinRelType.FULL, JoinRelType.SEMI, JoinRelType.ANTI);
- private final HashMap<Key, List<Object[]>> _broadcastRightTable;
+ private final HashMap<Key, ArrayList<Object[]>> _broadcastRightTable;
// Used to track matched right rows.
// Only used for right join and full join to output non-matched right rows.
@@ -170,8 +171,12 @@ public class HashJoinOperator extends MultiStageOperator {
List<Object[]> container = rightBlock.getContainer();
// put all the rows into corresponding hash collections keyed by the key selector function.
for (Object[] row : container) {
- List<Object[]> hashCollection =
- _broadcastRightTable.computeIfAbsent(new
Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>());
+ ArrayList<Object[]> hashCollection =
_broadcastRightTable.computeIfAbsent(
+ new Key(_rightKeySelector.getKey(row)), k -> new
ArrayList<>(INITIAL_HEURISTIC_SIZE));
+ int size = hashCollection.size();
+ if ((size & size - 1) == 0 && size < Integer.MAX_VALUE / 2) { // is power of 2
+ hashCollection.ensureCapacity(size << 1);
+ }
hashCollection.add(row);
}
rightBlock = _rightTableOperator.nextBlock();
@@ -198,7 +203,7 @@ public class HashJoinOperator extends MultiStageOperator {
if (leftBlock.isSuccessfulEndOfStreamBlock() && needUnmatchedRightRows()) {
// Return remaining non-matched rows for non-inner join.
List<Object[]> returnRows = new ArrayList<>();
- for (Map.Entry<Key, List<Object[]>> entry :
_broadcastRightTable.entrySet()) {
+ for (Map.Entry<Key, ArrayList<Object[]>> entry :
_broadcastRightTable.entrySet()) {
Set<Integer> matchedIdx =
_matchedRightRows.getOrDefault(entry.getKey(), new HashSet<>());
List<Object[]> rightRows = entry.getValue();
if (rightRows.size() == matchedIdx.size()) {
@@ -213,57 +218,96 @@ public class HashJoinOperator extends MultiStageOperator {
_isTerminated = true;
return new TransferableBlock(returnRows, _resultSchema,
DataBlock.Type.ROW);
}
- List<Object[]> rows = new ArrayList<>();
- List<Object[]> container = leftBlock.isEndOfStreamBlock() ? new
ArrayList<>() : leftBlock.getContainer();
- for (Object[] leftRow : container) {
- Key key = new Key(_leftKeySelector.getKey(leftRow));
+ List<Object[]> rows;
+ if (leftBlock.isEndOfStreamBlock()) {
+ rows = new ArrayList<>();
+ } else {
switch (_joinType) {
- case SEMI:
- // SEMI-JOIN only checks existence of the key
- if (_broadcastRightTable.containsKey(key)) {
- rows.add(joinRow(leftRow, null));
- }
+ case SEMI: {
+ rows = buildJoinedDataBlockSemi(leftBlock);
break;
- case ANTI:
- // ANTI-JOIN only checks non-existence of the key
- if (!_broadcastRightTable.containsKey(key)) {
- rows.add(joinRow(leftRow, null));
- }
+ }
+ case ANTI: {
+ rows = buildJoinedDataBlockAnti(leftBlock);
break;
- default: // INNER, LEFT, RIGHT, FULL
- // NOTE: Empty key selector will always give same hash code.
- List<Object[]> matchedRightRows =
_broadcastRightTable.getOrDefault(key, null);
- if (matchedRightRows == null) {
- if (needUnmatchedLeftRows()) {
- rows.add(joinRow(leftRow, null));
- }
- continue;
- }
- boolean hasMatchForLeftRow = false;
- for (int i = 0; i < matchedRightRows.size(); i++) {
- Object[] rightRow = matchedRightRows.get(i);
- // TODO: Optimize this to avoid unnecessary object copy.
- Object[] resultRow = joinRow(leftRow, rightRow);
- if (_joinClauseEvaluators.isEmpty() ||
_joinClauseEvaluators.stream().allMatch(
- evaluator -> (Boolean)
TypeUtils.convert(evaluator.apply(resultRow),
- DataSchema.ColumnDataType.BOOLEAN))) {
- rows.add(resultRow);
- hasMatchForLeftRow = true;
- if (_matchedRightRows != null) {
- HashSet<Integer> matchedRows =
_matchedRightRows.computeIfAbsent(key, k -> new HashSet<>());
- matchedRows.add(i);
- }
- }
- }
- if (!hasMatchForLeftRow && needUnmatchedLeftRows()) {
- rows.add(joinRow(leftRow, null));
- }
+ }
+ default: { // INNER, LEFT, RIGHT, FULL
+ rows = buildJoinedDataBlockDefault(leftBlock);
break;
+ }
}
}
return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
}
+ private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock)
{
+ List<Object[]> container = leftBlock.getContainer();
+ List<Object[]> rows = new ArrayList<>(container.size());
+
+ for (Object[] leftRow : container) {
+ Key key = new Key(_leftKeySelector.getKey(leftRow));
+ // SEMI-JOIN only checks existence of the key
+ if (_broadcastRightTable.containsKey(key)) {
+ rows.add(joinRow(leftRow, null));
+ }
+ }
+
+ return rows;
+ }
+
+ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock
leftBlock) {
+ List<Object[]> container = leftBlock.getContainer();
+ ArrayList<Object[]> rows = new ArrayList<>(container.size());
+
+ for (Object[] leftRow : container) {
+ Key key = new Key(_leftKeySelector.getKey(leftRow));
+ // NOTE: Empty key selector will always give same hash code.
+ List<Object[]> matchedRightRows = _broadcastRightTable.getOrDefault(key,
null);
+ if (matchedRightRows == null) {
+ if (needUnmatchedLeftRows()) {
+ rows.add(joinRow(leftRow, null));
+ }
+ continue;
+ }
+ boolean hasMatchForLeftRow = false;
+ rows.ensureCapacity(rows.size() + matchedRightRows.size());
+ for (int i = 0; i < matchedRightRows.size(); i++) {
+ Object[] rightRow = matchedRightRows.get(i);
+ // TODO: Optimize this to avoid unnecessary object copy.
+ Object[] resultRow = joinRow(leftRow, rightRow);
+ if (_joinClauseEvaluators.isEmpty() ||
_joinClauseEvaluators.stream().allMatch(
+ evaluator -> (Boolean)
TypeUtils.convert(evaluator.apply(resultRow),
+ DataSchema.ColumnDataType.BOOLEAN))) {
+ rows.add(resultRow);
+ hasMatchForLeftRow = true;
+ if (_matchedRightRows != null) {
+ HashSet<Integer> matchedRows =
_matchedRightRows.computeIfAbsent(key, k -> new HashSet<>());
+ matchedRows.add(i);
+ }
+ }
+ }
+ if (!hasMatchForLeftRow && needUnmatchedLeftRows()) {
+ rows.add(joinRow(leftRow, null));
+ }
+ }
+
+ return rows;
+ }
+
+ private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock)
{
+ List<Object[]> container = leftBlock.getContainer();
+ List<Object[]> rows = new ArrayList<>(container.size());
+
+ for (Object[] leftRow : container) {
+ Key key = new Key(_leftKeySelector.getKey(leftRow));
+ // ANTI-JOIN only checks non-existence of the key
+ if (!_broadcastRightTable.containsKey(key)) {
+ rows.add(joinRow(leftRow, null));
+ }
+ }
+ return rows;
+ }
+
private Object[] joinRow(@Nullable Object[] leftRow, @Nullable Object[]
rightRow) {
Object[] resultRow = new Object[_resultColumnSize];
int idx = 0;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index a913d95295..4aff16138f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -108,8 +108,8 @@ public class TransformOperator extends MultiStageOperator {
return block;
}
- List<Object[]> resultRows = new ArrayList<>();
List<Object[]> container = block.getContainer();
+ List<Object[]> resultRows = new ArrayList<>(container.size());
for (Object[] row : container) {
Object[] resultRow = new Object[_resultColumnSize];
for (int i = 0; i < _resultColumnSize; i++) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
index b5c0857107..2f14d1445a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
@@ -48,10 +48,11 @@ class HashExchange extends BlockExchange {
protected void route(List<SendingMailbox> destinations, TransferableBlock
block)
throws Exception {
List<Object[]>[] destIdxToRows = new List[destinations.size()];
- for (Object[] row : block.getContainer()) {
+ List<Object[]> container = block.getContainer();
+ for (Object[] row : container) {
int partition = _keySelector.computeHash(row) % destinations.size();
if (destIdxToRows[partition] == null) {
- destIdxToRows[partition] = new ArrayList<>();
+ destIdxToRows[partition] = new ArrayList<>(container.size());
}
destIdxToRows[partition].add(row);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]