This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5758aef1ce Optimize lookup table in join operator (#14972)
5758aef1ce is described below
commit 5758aef1ce41124a97d4ba0f6467400978ca4dd2
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Feb 27 11:21:57 2025 -0700
Optimize lookup table in join operator (#14972)
---
.../query/runtime/operator/BaseJoinOperator.java | 1 +
.../query/runtime/operator/HashJoinOperator.java | 203 ++++++++++++++-------
.../runtime/operator/join/DoubleLookupTable.java | 65 +++++++
.../runtime/operator/join/FloatLookupTable.java | 65 +++++++
.../runtime/operator/join/IntLookupTable.java | 65 +++++++
.../runtime/operator/join/LongLookupTable.java | 65 +++++++
.../query/runtime/operator/join/LookupTable.java | 100 ++++++++++
.../runtime/operator/join/ObjectLookupTable.java | 64 +++++++
8 files changed, 557 insertions(+), 71 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
index 23d7c29710..3eee997c20 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
@@ -228,6 +228,7 @@ public abstract class BaseJoinOperator extends
MultiStageOperator {
protected abstract List<Object[]> buildNonMatchRightRows();
+ // TODO: Optimize this to avoid unnecessary object copy.
protected Object[] joinRow(@Nullable Object[] leftRow, @Nullable Object[]
rightRow) {
Object[] resultRow = new Object[_resultColumnSize];
if (leftRow != null) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index b51fc216ec..3a42e5da3b 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -31,8 +31,13 @@ import
org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.join.DoubleLookupTable;
+import org.apache.pinot.query.runtime.operator.join.FloatLookupTable;
+import org.apache.pinot.query.runtime.operator.join.IntLookupTable;
+import org.apache.pinot.query.runtime.operator.join.LongLookupTable;
+import org.apache.pinot.query.runtime.operator.join.LookupTable;
+import org.apache.pinot.query.runtime.operator.join.ObjectLookupTable;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.spi.utils.BooleanUtils;
import
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
@@ -40,27 +45,50 @@ import
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOver
* This {@code HashJoinOperator} join algorithm with join keys. Right table is
materialized into a hash table.
*/
// TODO: Support memory size based resource limit.
+@SuppressWarnings("unchecked")
public class HashJoinOperator extends BaseJoinOperator {
private static final String EXPLAIN_NAME = "HASH_JOIN";
- private static final int INITIAL_HEURISTIC_SIZE = 16;
+
+ // Placeholder for BitSet in _matchedRightRows when all keys are unique in
the right table.
+ private static final BitSet BIT_SET_PLACEHOLDER = new BitSet(0);
private final KeySelector<?> _leftKeySelector;
private final KeySelector<?> _rightKeySelector;
- private final Map<Object, ArrayList<Object[]>> _rightTable;
+ private final LookupTable _rightTable;
// Track matched right rows for right join and full join to output
non-matched right rows.
// TODO: Revisit whether we should use IntList or RoaringBitmap for smaller
memory footprint.
+ // TODO: Optimize this
private final Map<Object, BitSet> _matchedRightRows;
public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator
leftInput, DataSchema leftSchema,
MultiStageOperator rightInput, JoinNode node) {
super(context, leftInput, leftSchema, rightInput, node);
- Preconditions.checkState(!node.getLeftKeys().isEmpty(), "Hash join
operator requires join keys");
- _leftKeySelector = KeySelectorFactory.getKeySelector(node.getLeftKeys());
+ List<Integer> leftKeys = node.getLeftKeys();
+ Preconditions.checkState(!leftKeys.isEmpty(), "Hash join operator requires
join keys");
+ _leftKeySelector = KeySelectorFactory.getKeySelector(leftKeys);
_rightKeySelector = KeySelectorFactory.getKeySelector(node.getRightKeys());
- _rightTable = new HashMap<>();
+ _rightTable = createLookupTable(leftKeys, leftSchema);
_matchedRightRows = needUnmatchedRightRows() ? new HashMap<>() : null;
}
+ private static LookupTable createLookupTable(List<Integer> joinKeys,
DataSchema schema) { // NOTE(review): the constructor passes the LEFT keys/schema while addRow() is fed RIGHT keys; this assumes both sides' key columns share a stored type - confirm.
+ if (joinKeys.size() > 1) { // Multi-column keys always use the generic Object-keyed table.
+ return new ObjectLookupTable();
+ }
+ switch (schema.getColumnDataType(joinKeys.get(0)).getStoredType()) { // Single key: specialize on the stored type; the chosen table's casts then require keys of the matching boxed type.
+ case INT:
+ return new IntLookupTable();
+ case LONG:
+ return new LongLookupTable();
+ case FLOAT:
+ return new FloatLookupTable();
+ case DOUBLE:
+ return new DoubleLookupTable();
+ default: // Any other stored type falls back to the generic table.
+ return new ObjectLookupTable();
+ }
+ }
+
@Override
public String toExplainString() {
return EXPLAIN_NAME;
@@ -71,41 +99,35 @@ public class HashJoinOperator extends BaseJoinOperator {
throws ProcessingException {
LOGGER.trace("Building hash table for join operator");
long startTime = System.currentTimeMillis();
- int numRowsInHashTable = 0;
+ int numRows = 0;
TransferableBlock rightBlock = _rightInput.nextBlock();
while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
- List<Object[]> container = rightBlock.getContainer();
+ List<Object[]> rows = rightBlock.getContainer();
// Row based overflow check.
- if (container.size() + numRowsInHashTable > _maxRowsInJoin) {
+ if (rows.size() + numRows > _maxRowsInJoin) {
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
throwProcessingExceptionForJoinRowLimitExceeded(
"Cannot build in memory hash table for join operator, reached
number of rows limit: " + _maxRowsInJoin);
} else {
// Just fill up the buffer.
- int remainingRows = _maxRowsInJoin - numRowsInHashTable;
- container = container.subList(0, remainingRows);
+ int remainingRows = _maxRowsInJoin - numRows;
+ rows = rows.subList(0, remainingRows);
_statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
// setting only the rightTableOperator to be early terminated and
awaits EOS block next.
_rightInput.earlyTerminate();
}
}
- // put all the rows into corresponding hash collections keyed by the key
selector function.
- for (Object[] row : container) {
- ArrayList<Object[]> hashCollection =
- _rightTable.computeIfAbsent(_rightKeySelector.getKey(row), k ->
new ArrayList<>(INITIAL_HEURISTIC_SIZE));
- int size = hashCollection.size();
- if ((size & size - 1) == 0 && size < _maxRowsInJoin && size <
Integer.MAX_VALUE / 2) { // is power of 2
- hashCollection.ensureCapacity(Math.min(size << 1, _maxRowsInJoin));
- }
- hashCollection.add(row);
+ for (Object[] row : rows) {
+ _rightTable.addRow(_rightKeySelector.getKey(row), row);
}
- numRowsInHashTable += container.size();
+ numRows += rows.size();
sampleAndCheckInterruption();
rightBlock = _rightInput.nextBlock();
}
if (rightBlock.isErrorBlock()) {
_upstreamErrorBlock = rightBlock;
} else {
+ _rightTable.finish();
_isRightTableBuilt = true;
_rightSideStats = rightBlock.getQueryStats();
assert _rightSideStats != null;
@@ -123,69 +145,99 @@ public class HashJoinOperator extends BaseJoinOperator {
case ANTI:
return buildJoinedDataBlockAnti(leftBlock);
default: { // INNER, LEFT, RIGHT, FULL
- return buildJoinedDataBlockDefault(leftBlock);
+ if (_rightTable.isKeysUnique()) {
+ return buildJoinedDataBlockUniqueKeys(leftBlock);
+ } else {
+ return buildJoinedDataBlockDuplicateKeys(leftBlock);
+ }
}
}
}
- private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock
leftBlock)
+ private List<Object[]> buildJoinedDataBlockUniqueKeys(TransferableBlock
leftBlock)
throws ProcessingException {
- List<Object[]> container = leftBlock.getContainer();
- ArrayList<Object[]> rows = new ArrayList<>(container.size());
+ List<Object[]> leftRows = leftBlock.getContainer();
+ ArrayList<Object[]> rows = new ArrayList<>(leftRows.size());
- for (Object[] leftRow : container) {
+ for (Object[] leftRow : leftRows) {
Object key = _leftKeySelector.getKey(leftRow);
- // NOTE: Empty key selector will always give same hash code.
- List<Object[]> rightRows = _rightTable.get(key);
- if (rightRows == null) {
- if (needUnmatchedLeftRows()) {
- if (isMaxRowsLimitReached(rows.size())) {
- break;
- }
- rows.add(joinRow(leftRow, null));
- }
- continue;
- }
- boolean hasMatchForLeftRow = false;
- int numRightRows = rightRows.size();
- rows.ensureCapacity(rows.size() + numRightRows);
- boolean maxRowsLimitReached = false;
- for (int i = 0; i < numRightRows; i++) {
- Object[] rightRow = rightRows.get(i);
- // TODO: Optimize this to avoid unnecessary object copy.
+ Object[] rightRow = (Object[]) _rightTable.lookup(key);
+ if (rightRow == null) {
+ handleUnmatchedLeftRow(leftRow, rows);
+ } else {
Object[] resultRow = joinRow(leftRow, rightRow);
- if (_nonEquiEvaluators.isEmpty() || _nonEquiEvaluators.stream()
- .allMatch(evaluator ->
BooleanUtils.isTrueInternalValue(evaluator.apply(resultRow)))) {
+ if (matchNonEquiConditions(resultRow)) {
if (isMaxRowsLimitReached(rows.size())) {
- maxRowsLimitReached = true;
break;
}
rows.add(resultRow);
- hasMatchForLeftRow = true;
if (_matchedRightRows != null) {
- _matchedRightRows.computeIfAbsent(key, k -> new
BitSet(numRightRows)).set(i);
+ _matchedRightRows.put(key, BIT_SET_PLACEHOLDER);
}
+ } else {
+ handleUnmatchedLeftRow(leftRow, rows);
}
}
- if (maxRowsLimitReached) {
- break;
- }
- if (!hasMatchForLeftRow && needUnmatchedLeftRows()) {
- if (isMaxRowsLimitReached(rows.size())) {
+ }
+
+ return rows;
+ }
+
+ private List<Object[]> buildJoinedDataBlockDuplicateKeys(TransferableBlock
leftBlock)
+ throws ProcessingException {
+ List<Object[]> leftRows = leftBlock.getContainer();
+ List<Object[]> rows = new ArrayList<>(leftRows.size());
+
+ for (Object[] leftRow : leftRows) {
+ Object key = _leftKeySelector.getKey(leftRow);
+ List<Object[]> rightRows = (List<Object[]>) _rightTable.lookup(key); // Keys are not unique, so after finish() every value is a List<Object[]>.
+ if (rightRows == null) {
+ handleUnmatchedLeftRow(leftRow, rows);
+ } else {
+ boolean maxRowsLimitReached = false;
+ boolean hasMatchForLeftRow = false;
+ int numRightRows = rightRows.size();
+ for (int i = 0; i < numRightRows; i++) {
+ Object[] resultRow = joinRow(leftRow, rightRows.get(i));
+ if (matchNonEquiConditions(resultRow)) {
+ if (isMaxRowsLimitReached(rows.size())) {
+ maxRowsLimitReached = true;
+ break;
+ }
+ rows.add(resultRow);
+ hasMatchForLeftRow = true;
+ if (_matchedRightRows != null) { // Record the matched right-row index under this key for buildNonMatchRightRows().
+ _matchedRightRows.computeIfAbsent(key, k -> new
BitSet(numRightRows)).set(i);
+ }
+ }
+ }
+ if (maxRowsLimitReached) { // Output limit hit: stop processing the remaining left rows.
break;
}
- rows.add(joinRow(leftRow, null));
+ if (!hasMatchForLeftRow) {
+ handleUnmatchedLeftRow(leftRow, rows);
+ }
}
}
return rows;
}
+ private void handleUnmatchedLeftRow(Object[] leftRow, List<Object[]> rows) // Emits leftRow joined with a null right side when needUnmatchedLeftRows() is true.
+ throws ProcessingException {
+ if (needUnmatchedLeftRows()) {
+ if (isMaxRowsLimitReached(rows.size())) {
+ return; // NOTE(review): the removed inline code did `break` here; returning does not stop the caller's loop, so later left rows keep re-checking the limit. Consider returning a flag the caller can break on.
+ }
+ rows.add(joinRow(leftRow, null));
+ }
+ }
+
private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock)
{
- List<Object[]> container = leftBlock.getContainer();
- List<Object[]> rows = new ArrayList<>(container.size());
+ List<Object[]> leftRows = leftBlock.getContainer();
+ List<Object[]> rows = new ArrayList<>(leftRows.size());
- for (Object[] leftRow : container) {
+ for (Object[] leftRow : leftRows) {
Object key = _leftKeySelector.getKey(leftRow);
// SEMI-JOIN only checks existence of the key
if (_rightTable.containsKey(key)) {
@@ -197,10 +249,10 @@ public class HashJoinOperator extends BaseJoinOperator {
}
private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock)
{
- List<Object[]> container = leftBlock.getContainer();
- List<Object[]> rows = new ArrayList<>(container.size());
+ List<Object[]> leftRows = leftBlock.getContainer();
+ List<Object[]> rows = new ArrayList<>(leftRows.size());
- for (Object[] leftRow : container) {
+ for (Object[] leftRow : leftRows) {
Object key = _leftKeySelector.getKey(leftRow);
// ANTI-JOIN only checks non-existence of the key
if (!_rightTable.containsKey(key)) {
@@ -214,18 +266,27 @@ public class HashJoinOperator extends BaseJoinOperator {
@Override
protected List<Object[]> buildNonMatchRightRows() {
List<Object[]> rows = new ArrayList<>();
- for (Map.Entry<Object, ArrayList<Object[]>> entry :
_rightTable.entrySet()) {
- List<Object[]> rightRows = entry.getValue();
- BitSet matchedIndices = _matchedRightRows.get(entry.getKey());
- if (matchedIndices == null) {
- for (Object[] rightRow : rightRows) {
+ if (_rightTable.isKeysUnique()) {
+ for (Map.Entry<Object, Object[]> entry : _rightTable.entrySet()) {
+ Object[] rightRow = entry.getValue();
+ if (!_matchedRightRows.containsKey(entry.getKey())) {
rows.add(joinRow(null, rightRow));
}
- } else {
- int numRightRows = rightRows.size();
- int unmatchedIndex = 0;
- while ((unmatchedIndex = matchedIndices.nextClearBit(unmatchedIndex))
< numRightRows) {
- rows.add(joinRow(null, rightRows.get(unmatchedIndex++)));
+ }
+ } else {
+ for (Map.Entry<Object, ArrayList<Object[]>> entry :
_rightTable.entrySet()) {
+ List<Object[]> rightRows = entry.getValue();
+ BitSet matchedIndices = _matchedRightRows.get(entry.getKey());
+ if (matchedIndices == null) {
+ for (Object[] rightRow : rightRows) {
+ rows.add(joinRow(null, rightRow));
+ }
+ } else {
+ int numRightRows = rightRows.size();
+ int unmatchedIndex = 0;
+ while ((unmatchedIndex =
matchedIndices.nextClearBit(unmatchedIndex)) < numRightRows) {
+ rows.add(joinRow(null, rightRows.get(unmatchedIndex++)));
+ }
}
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/DoubleLookupTable.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/DoubleLookupTable.java
new file mode 100644
index 0000000000..77c9266d39
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/DoubleLookupTable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.join;
+
+import it.unimi.dsi.fastutil.doubles.Double2ObjectMap;
+import it.unimi.dsi.fastutil.doubles.Double2ObjectOpenHashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code DoubleLookupTable} is a lookup table for double keys, stored unboxed in a fastutil {@code Double2ObjectOpenHashMap}. NOTE(review): key equality presumably follows fastutil's bitwise double comparison (NaN matches NaN, 0.0 and -0.0 differ) - confirm this is the intended join semantics.
+ */
+@SuppressWarnings("unchecked")
+public class DoubleLookupTable extends LookupTable {
+ private final Double2ObjectOpenHashMap<Object> _lookupTable = new
Double2ObjectOpenHashMap<>(INITIAL_CAPACITY);
+
+ @Override
+ public void addRow(Object key, Object[] row) {
+ _lookupTable.compute((double) key, (k, v) -> computeNewValue(row, v)); // (double) unboxes: a null or non-Double key throws NullPointerException/ClassCastException. NOTE(review): confirm null join keys cannot reach here.
+ }
+
+ @Override
+ public void finish() {
+ if (!_keysUnique) { // Some key repeated: normalize every value to a List<Object[]>.
+ for (Double2ObjectMap.Entry<Object> entry :
_lookupTable.double2ObjectEntrySet()) {
+ convertValueToList(entry);
+ }
+ }
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return _lookupTable.containsKey((double) key);
+ }
+
+ @Nullable
+ @Override
+ public Object lookup(Object key) {
+ return _lookupTable.get((double) key); // Object[] or List<Object[]> depending on isKeysUnique(); null when absent.
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Set<Map.Entry> entrySet() {
+ return (Set) _lookupTable.double2ObjectEntrySet();
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/FloatLookupTable.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/FloatLookupTable.java
new file mode 100644
index 0000000000..437b3f8547
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/FloatLookupTable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.join;
+
+import it.unimi.dsi.fastutil.floats.Float2ObjectMap;
+import it.unimi.dsi.fastutil.floats.Float2ObjectOpenHashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code FloatLookupTable} is a lookup table for float keys, stored unboxed in a fastutil {@code Float2ObjectOpenHashMap}. NOTE(review): key equality presumably follows fastutil's bitwise float comparison (NaN matches NaN, 0.0f and -0.0f differ) - confirm this is the intended join semantics.
+ */
+@SuppressWarnings("unchecked")
+public class FloatLookupTable extends LookupTable {
+ private final Float2ObjectOpenHashMap<Object> _lookupTable = new
Float2ObjectOpenHashMap<>(INITIAL_CAPACITY);
+
+ @Override
+ public void addRow(Object key, Object[] row) {
+ _lookupTable.compute((float) key, (k, v) -> computeNewValue(row, v)); // (float) unboxes: a null or non-Float key throws NullPointerException/ClassCastException. NOTE(review): confirm null join keys cannot reach here.
+ }
+
+ @Override
+ public void finish() {
+ if (!_keysUnique) { // Some key repeated: normalize every value to a List<Object[]>.
+ for (Float2ObjectMap.Entry<Object> entry :
_lookupTable.float2ObjectEntrySet()) {
+ convertValueToList(entry);
+ }
+ }
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return _lookupTable.containsKey((float) key);
+ }
+
+ @Nullable
+ @Override
+ public Object lookup(Object key) {
+ return _lookupTable.get((float) key); // Object[] or List<Object[]> depending on isKeysUnique(); null when absent.
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Set<Map.Entry> entrySet() {
+ return (Set) _lookupTable.float2ObjectEntrySet();
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/IntLookupTable.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/IntLookupTable.java
new file mode 100644
index 0000000000..688192b6cc
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/IntLookupTable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.join;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code IntLookupTable} is a lookup table for int keys, stored unboxed in a fastutil {@code Int2ObjectOpenHashMap}.
+ */
+@SuppressWarnings("unchecked")
+public class IntLookupTable extends LookupTable {
+ private final Int2ObjectOpenHashMap<Object> _lookupTable = new
Int2ObjectOpenHashMap<>(INITIAL_CAPACITY);
+
+ @Override
+ public void addRow(Object key, Object[] row) {
+ _lookupTable.compute((int) key, (k, v) -> computeNewValue(row, v)); // (int) unboxes: a null or non-Integer key throws NullPointerException/ClassCastException. NOTE(review): confirm null join keys cannot reach here.
+ }
+
+ @Override
+ public void finish() {
+ if (!_keysUnique) { // Some key repeated: normalize every value to a List<Object[]>.
+ for (Int2ObjectMap.Entry<Object> entry :
_lookupTable.int2ObjectEntrySet()) {
+ convertValueToList(entry);
+ }
+ }
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return _lookupTable.containsKey((int) key);
+ }
+
+ @Nullable
+ @Override
+ public Object lookup(Object key) {
+ return _lookupTable.get((int) key); // Object[] or List<Object[]> depending on isKeysUnique(); null when absent.
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Set<Map.Entry> entrySet() {
+ return (Set) _lookupTable.int2ObjectEntrySet();
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LongLookupTable.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LongLookupTable.java
new file mode 100644
index 0000000000..5e393f4647
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LongLookupTable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.join;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code LongLookupTable} is a lookup table for long keys, stored unboxed in a fastutil {@code Long2ObjectOpenHashMap}.
+ */
+@SuppressWarnings("unchecked")
+public class LongLookupTable extends LookupTable {
+ private final Long2ObjectOpenHashMap<Object> _lookupTable = new
Long2ObjectOpenHashMap<>(INITIAL_CAPACITY);
+
+ @Override
+ public void addRow(Object key, Object[] row) {
+ _lookupTable.compute((long) key, (k, v) -> computeNewValue(row, v)); // (long) unboxes: a null or non-Long key throws NullPointerException/ClassCastException. NOTE(review): confirm null join keys cannot reach here.
+ }
+
+ @Override
+ public void finish() {
+ if (!_keysUnique) { // Some key repeated: normalize every value to a List<Object[]>.
+ for (Long2ObjectMap.Entry<Object> entry :
_lookupTable.long2ObjectEntrySet()) {
+ convertValueToList(entry);
+ }
+ }
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return _lookupTable.containsKey((long) key);
+ }
+
+ @Nullable
+ @Override
+ public Object lookup(Object key) {
+ return _lookupTable.get((long) key); // Object[] or List<Object[]> depending on isKeysUnique(); null when absent.
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Set<Map.Entry> entrySet() {
+ return (Set) _lookupTable.long2ObjectEntrySet();
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LookupTable.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LookupTable.java
new file mode 100644
index 0000000000..0b62092bbe
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/LookupTable.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.join;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+/** Base class for the hash-join lookup tables: maps a join key to its right-table row(s). Rows are added with {@link #addRow}; {@link #finish} must be called after the last row and before any lookup. */
+public abstract class LookupTable {
+ // Initial capacity passed to the backing hash map of every subclass. TODO: Make it configurable
+ protected static final int INITIAL_CAPACITY = 10000;
+
+ protected boolean _keysUnique = true; // Cleared by computeNewValue() the first time a key is added twice.
+
+ /**
+ * Adds a row to the lookup table.
+ */
+ public abstract void addRow(Object key, Object[] row);
+
+ @SuppressWarnings("unchecked")
+ protected Object computeNewValue(Object[] row, @Nullable Object
currentValue) { // Merge function for addRow(): a key's first row is stored as a bare Object[]; a repeated key turns the value into an ArrayList<Object[]>.
+ if (currentValue == null) {
+ return row;
+ } else {
+ _keysUnique = false;
+ if (currentValue instanceof List) {
+ ((List<Object[]>) currentValue).add(row); // Before finish(), lists only come from this method (ArrayList), so the cast and add() are safe.
+ return currentValue;
+ } else {
+ List<Object[]> rows = new ArrayList<>();
+ rows.add((Object[]) currentValue);
+ rows.add(row);
+ return rows;
+ }
+ }
+ }
+
+ /**
+ * Finishes adding rows to the lookup table. This method should be called
after all rows are added to the lookup
+ * table, and before looking up rows.
+ */
+ public abstract void finish();
+
+ protected static void convertValueToList(Map.Entry<?, Object> entry) { // Used by finish(): wraps a lone Object[] value into an immutable single-element list.
+ Object value = entry.getValue();
+ if (value instanceof Object[]) {
+ entry.setValue(Collections.singletonList(value));
+ }
+ }
+
+ /**
+ * Returns {@code true} when all the keys added to the lookup table are
unique.
+ * When all keys are unique, the value of the lookup table is a single row
({@code Object[]}). When keys are not
+ * unique, the value of the lookup table is a list of rows ({@code
List<Object[]>}) once {@link #finish} has been called.
+ */
+ public boolean isKeysUnique() {
+ return _keysUnique;
+ }
+
+ /**
+ * Returns {@code true} if the lookup table contains the given key.
+ */
+ public abstract boolean containsKey(Object key);
+
+ /**
+ * Returns the row/rows for the given key. When {@link #isKeysUnique}
returns {@code true}, this method returns a
+ * single row ({@code Object[]}). When {@link #isKeysUnique} returns {@code
false}, this method returns a list of rows
+ * ({@code List<Object[]>}). Returns {@code null} if the key does not exist
in the lookup table. Only meaningful after {@link #finish} has been called.
+ */
+ @Nullable
+ public abstract Object lookup(Object key);
+
+ /**
+ * Returns all the entries in the lookup table. When {@link #isKeysUnique}
returns {@code true}, the value of the
+ * entries is a single row ({@code Object[]}). When {@link #isKeysUnique}
returns {@code false}, the value of the
+ * entries is a list of rows ({@code List<Object[]>}).
+ */
+ @SuppressWarnings("rawtypes")
+ public abstract Set<Map.Entry> entrySet();
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java
new file mode 100644
index 0000000000..f455b1a8c3
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.join;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code ObjectLookupTable} is a lookup table for non-primitive and multi-column keys, backed by a {@link java.util.HashMap} (keys compared via {@code equals}/{@code hashCode}).
+ */
+@SuppressWarnings("unchecked")
+public class ObjectLookupTable extends LookupTable {
+ private final Map<Object, Object> _lookupTable =
Maps.newHashMapWithExpectedSize(INITIAL_CAPACITY);
+
+ @Override
+ public void addRow(Object key, Object[] row) {
+ _lookupTable.compute(key, (k, v) -> computeNewValue(row, v));
+ }
+
+ @Override
+ public void finish() {
+ if (!_keysUnique) { // Some key repeated: normalize every value to a List<Object[]>.
+ for (Map.Entry<Object, Object> entry : _lookupTable.entrySet()) {
+ convertValueToList(entry);
+ }
+ }
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return _lookupTable.containsKey(key);
+ }
+
+ @Nullable
+ @Override
+ public Object lookup(Object key) {
+ return _lookupTable.get(key); // Object[] or List<Object[]> depending on isKeysUnique(); null when absent.
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Set<Map.Entry> entrySet() {
+ return (Set) _lookupTable.entrySet();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]