This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f820daaa80 allow unpacked content in transferable block (#8963)
f820daaa80 is described below
commit f820daaa80c245373ac847e169624f1f7d55ae1c
Author: Rong Rong <[email protected]>
AuthorDate: Tue Jun 28 13:33:44 2022 -0700
allow unpacked content in transferable block (#8963)
* add container to the block so that extracted results don't need to be serialized
* transferable block to carry container instead of always-serialized format
* fix bug
Co-authored-by: Rong Rong <[email protected]>
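In short: TransferableBlock now holds either unpacked rows (the "container") or a serialized data block, and materializes each form lazily from the other on first access. A minimal standalone sketch of that pattern follows; the LazyBlock class and its serialize/deserialize helpers are hypothetical stand-ins for illustration, not Pinot APIs:

import java.util.List;

// Hypothetical sketch of the dual-representation pattern this commit applies
// to TransferableBlock; LazyBlock and its helpers are illustrative stand-ins,
// not Pinot classes.
final class LazyBlock {
  private List<Object[]> _container; // unpacked rows, may start null
  private byte[] _serialized;        // wire format, may start null

  LazyBlock(List<Object[]> container) {
    _container = container; // built locally: defer serialization
  }

  LazyBlock(byte[] serialized) {
    _serialized = serialized; // received from the wire: defer unpacking
  }

  // Local consumers (e.g. a join operator) read rows directly; the
  // deserialization cost is paid only if the block arrived serialized.
  List<Object[]> getContainer() {
    if (_container == null) {
      _container = deserialize(_serialized);
    }
    return _container;
  }

  // The transport layer asks for bytes; the serialization cost is paid only
  // if the block actually has to leave the process.
  byte[] getSerialized() {
    if (_serialized == null) {
      _serialized = serialize(_container);
    }
    return _serialized;
  }

  private static List<Object[]> deserialize(byte[] bytes) {
    return List.of(); // decoding elided in this sketch
  }

  private static byte[] serialize(List<Object[]> rows) {
    return new byte[0]; // encoding elided in this sketch
  }
}

This is why HashJoinOperator in the diff below can hand rows straight to the next operator and only pay the serialization cost when a block crosses the wire.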
---
.../core/common/datablock/DataBlockBuilder.java | 9 ++-
.../core/common/datablock/DataBlockUtils.java | 64 ++++++++++++++++++++++
.../pinot/core/common/datablock/DataBlockTest.java | 2 +-
.../query/runtime/blocks/TransferableBlock.java | 51 ++++++++++++++++-
.../query/runtime/operator/HashJoinOperator.java | 33 +++++------
5 files changed, 136 insertions(+), 23 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 788b68a91d..8a47f261ff 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -188,7 +188,8 @@ public class DataBlockBuilder {
return buildRowBlock(rowBuilder);
}
- public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, DataSchema dataSchema)
+ public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, @Nullable RoaringBitmap[] colNullBitmaps,
+ DataSchema dataSchema)
throws IOException {
DataBlockBuilder columnarBuilder = new DataBlockBuilder(dataSchema, BaseDataBlock.Type.COLUMNAR);
for (int i = 0; i < columns.size(); i++) {
@@ -303,6 +304,12 @@ public class DataBlockBuilder {
}
columnarBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position());
}
+ // Write null bitmaps after writing data.
+ if (colNullBitmaps != null) {
+ for (RoaringBitmap nullBitmap : colNullBitmaps) {
+ columnarBuilder.setNullRowIds(nullBitmap);
+ }
+ }
return buildColumnarBlock(columnarBuilder);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
index f1dedd49d0..1a26365210 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
@@ -20,6 +20,8 @@ package org.apache.pinot.core.common.datablock;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
@@ -74,6 +76,68 @@ public final class DataBlockUtils {
}
}
+ public static List<Object[]> extraRows(BaseDataBlock dataBlock) {
+ DataSchema dataSchema = dataBlock.getDataSchema();
+ DataSchema.ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+ int numRows = dataBlock.getNumberOfRows();
+ int numColumns = storedColumnDataTypes.length;
+
+ List<Object[]> rows = new ArrayList<>(numRows);
+ for (int i = 0; i < numRows; i++) {
+ Object[] row = new Object[numColumns];
+ for (int j = 0; j < numColumns; j++) {
+ switch (storedColumnDataTypes[j]) {
+ // Single-value column
+ case INT:
+ row[j] = dataBlock.getInt(i, j);
+ break;
+ case LONG:
+ row[j] = dataBlock.getLong(i, j);
+ break;
+ case FLOAT:
+ row[j] = dataBlock.getFloat(i, j);
+ break;
+ case DOUBLE:
+ row[j] = dataBlock.getDouble(i, j);
+ break;
+ case BIG_DECIMAL:
+ row[j] = dataBlock.getBigDecimal(i, j);
+ break;
+ case STRING:
+ row[j] = dataBlock.getString(i, j);
+ break;
+ case BYTES:
+ row[j] = dataBlock.getBytes(i, j);
+ break;
+
+ // Multi-value column
+ case INT_ARRAY:
+ row[j] = dataBlock.getIntArray(i, j);
+ break;
+ case LONG_ARRAY:
+ row[j] = dataBlock.getLongArray(i, j);
+ break;
+ case FLOAT_ARRAY:
+ row[j] = dataBlock.getFloatArray(i, j);
+ break;
+ case DOUBLE_ARRAY:
+ row[j] = dataBlock.getDoubleArray(i, j);
+ break;
+ case STRING_ARRAY:
+ row[j] = dataBlock.getStringArray(i, j);
+ break;
+
+ default:
+ throw new IllegalStateException(
+ String.format("Unsupported data type: %s for column: %s", storedColumnDataTypes[j],
+ dataSchema.getColumnName(j)));
+ }
+ }
+ rows.add(row);
+ }
+ return rows;
+ }
+
/**
* Given a {@link DataSchema}, compute each column's offset and fill them into the passed in array, then return the
* row size in bytes.
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index b9e816374c..c9b79218b6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -110,7 +110,7 @@ public class DataBlockTest {
List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT);
List<Object[]> columnars = DataBlockTestUtils.convertColumnar(dataSchema, rows);
RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, null, dataSchema);
- ColumnarDataBlock columnarBlock = DataBlockBuilder.buildFromColumns(columnars, dataSchema);
+ ColumnarDataBlock columnarBlock = DataBlockBuilder.buildFromColumns(columnars, null, dataSchema);
for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) {
DataSchema.ColumnDataType columnDataType = dataSchema.getColumnDataType(colId);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index ae64cc90bb..79470fed8c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -19,6 +19,8 @@
package org.apache.pinot.query.runtime.blocks;
import java.io.IOException;
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.common.BlockDocIdValueSet;
@@ -26,6 +28,8 @@ import org.apache.pinot.core.common.BlockMetadata;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
import org.apache.pinot.core.common.datablock.ColumnarDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
import org.apache.pinot.core.common.datablock.RowDataBlock;
@@ -35,16 +39,61 @@ import org.apache.pinot.core.common.datablock.RowDataBlock;
*/
public class TransferableBlock implements Block {
+ private final BaseDataBlock.Type _type;
+ private final DataSchema _dataSchema;
+
private BaseDataBlock _dataBlock;
- private BaseDataBlock.Type _type;
+
+ private List<Object[]> _container;
+
+ public TransferableBlock(List<Object[]> container, DataSchema dataSchema, BaseDataBlock.Type containerType) {
+ _container = container;
+ _dataSchema = dataSchema;
+ _type = containerType;
+ }
public TransferableBlock(BaseDataBlock dataBlock) {
_dataBlock = dataBlock;
+ _dataSchema = dataBlock.getDataSchema();
_type = dataBlock instanceof ColumnarDataBlock ? BaseDataBlock.Type.COLUMNAR
: dataBlock instanceof RowDataBlock ? BaseDataBlock.Type.ROW : BaseDataBlock.Type.METADATA;
}
+ public DataSchema getDataSchema() {
+ return _dataSchema;
+ }
+
+ public List<Object[]> getContainer() {
+ if (_container == null) {
+ switch (_type) {
+ case ROW:
+ _container = DataBlockUtils.extraRows(_dataBlock);
+ break;
+ case COLUMNAR:
+ default:
+ throw new UnsupportedOperationException("Unable to extract from
container with type: " + _type);
+ }
+ }
+ return _container;
+ }
+
public BaseDataBlock getDataBlock() {
+ if (_dataBlock == null) {
+ try {
+ switch (_type) {
+ case ROW:
+ _dataBlock = DataBlockBuilder.buildFromRows(_container, null, _dataSchema);
+ break;
+ case COLUMNAR:
+ _dataBlock = DataBlockBuilder.buildFromColumns(_container, null, _dataSchema);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unable to build from container with type: " + _type);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to create DataBlock", e);
+ }
+ }
return _dataBlock;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 5bbe81428f..839b3a901c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -26,10 +26,7 @@ import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
-import org.apache.pinot.core.common.datablock.DataBlockBuilder;
-import org.apache.pinot.core.common.datablock.DataBlockUtils;
import org.apache.pinot.core.operator.BaseOperator;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -85,7 +82,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
protected TransferableBlock getNextBlock() {
buildBroadcastHashTable();
try {
- return new TransferableBlock(buildJoinedDataBlock(_leftTableOperator.nextBlock()));
+ return buildJoinedDataBlock(_leftTableOperator.nextBlock());
} catch (Exception e) {
return TransferableBlockUtils.getErrorTransferableBlock(e);
}
@@ -95,15 +92,13 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
if (!_isHashTableBuilt) {
TransferableBlock rightBlock = _rightTableOperator.nextBlock();
while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
- BaseDataBlock dataBlock = rightBlock.getDataBlock();
- _rightTableSchema = dataBlock.getDataSchema();
- int numRows = dataBlock.getNumberOfRows();
+ _rightTableSchema = rightBlock.getDataSchema();
+ List<Object[]> container = rightBlock.getContainer();
// put all the rows into corresponding hash collections keyed by the key selector function.
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object[] objects = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+ for (Object[] row : container) {
List<Object[]> hashCollection =
- _broadcastHashTable.computeIfAbsent(_rightKeySelector.getKey(objects), k -> new ArrayList<>());
- hashCollection.add(objects);
+ _broadcastHashTable.computeIfAbsent(_rightKeySelector.getKey(row), k -> new ArrayList<>());
+ hashCollection.add(row);
}
rightBlock = _rightTableOperator.nextBlock();
}
@@ -111,25 +106,23 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
}
}
- private BaseDataBlock buildJoinedDataBlock(TransferableBlock block)
+ private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock)
throws Exception {
- if (TransferableBlockUtils.isEndOfStream(block)) {
- return DataBlockUtils.getEndOfStreamDataBlock();
+ if (TransferableBlockUtils.isEndOfStream(leftBlock)) {
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
List<Object[]> rows = new ArrayList<>();
- BaseDataBlock dataBlock = block.getDataBlock();
- _leftTableSchema = dataBlock.getDataSchema();
+ _leftTableSchema = leftBlock.getDataSchema();
_resultRowSize = _leftTableSchema.size() + _rightTableSchema.size();
- int numRows = dataBlock.getNumberOfRows();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object[] leftRow = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+ List<Object[]> container = leftBlock.getContainer();
+ for (Object[] leftRow : container) {
List<Object[]> hashCollection =
_broadcastHashTable.getOrDefault(_leftKeySelector.getKey(leftRow), Collections.emptyList());
for (Object[] rightRow : hashCollection) {
rows.add(joinRow(leftRow, rightRow));
}
}
- return DataBlockBuilder.buildFromRows(rows, null, computeSchema());
+ return new TransferableBlock(rows, computeSchema(), BaseDataBlock.Type.ROW);
}
private Object[] joinRow(Object[] leftRow, Object[] rightRow) {