This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f820daaa80 allow unpacked content in transferable block (#8963)
f820daaa80 is described below

commit f820daaa80c245373ac847e169624f1f7d55ae1c
Author: Rong Rong <[email protected]>
AuthorDate: Tue Jun 28 13:33:44 2022 -0700

    allow unpacked content in transferable block (#8963)
    
    * add container to the block so that extracted results don't need to be serialized
    
    * transferable block to carry a container instead of the always-serialized format
    
    * fix bug
    
    Co-authored-by: Rong Rong <[email protected]>
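
    The core idea of this change is to let a block carry its rows unpacked and
    pay the serialization cost only when the block actually crosses the wire.
    A minimal sketch of that lazy dual-representation pattern (the class and
    serialize() helper below are hypothetical stand-ins, not Pinot APIs):

        import java.util.List;

        final class LazyBlockSketch {
          private List<Object[]> _rows;   // unpacked content, cheap to iterate locally
          private byte[] _serialized;     // wire format, built only on first request

          LazyBlockSketch(List<Object[]> rows) {
            _rows = rows;
          }

          byte[] getSerialized() {
            if (_serialized == null) {
              _serialized = serialize(_rows);  // lazy: skipped if the block never ships
            }
            return _serialized;
          }

          private static byte[] serialize(List<Object[]> rows) {
            return new byte[0];  // stand-in for building a data block and encoding it
          }
        }
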
---
 .../core/common/datablock/DataBlockBuilder.java    |  9 ++-
 .../core/common/datablock/DataBlockUtils.java      | 64 ++++++++++++++++++++++
 .../pinot/core/common/datablock/DataBlockTest.java |  2 +-
 .../query/runtime/blocks/TransferableBlock.java    | 51 ++++++++++++++++-
 .../query/runtime/operator/HashJoinOperator.java   | 33 +++++------
 5 files changed, 136 insertions(+), 23 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 788b68a91d..8a47f261ff 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -188,7 +188,8 @@ public class DataBlockBuilder {
     return buildRowBlock(rowBuilder);
   }
 
-  public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, DataSchema dataSchema)
+  public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, @Nullable RoaringBitmap[] colNullBitmaps,
+      DataSchema dataSchema)
       throws IOException {
     DataBlockBuilder columnarBuilder = new DataBlockBuilder(dataSchema, BaseDataBlock.Type.COLUMNAR);
     for (int i = 0; i < columns.size(); i++) {
@@ -303,6 +304,12 @@ public class DataBlockBuilder {
      }
      columnarBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position());
     }
+    // Write null bitmaps after writing data.
+    if (colNullBitmaps != null) {
+      for (RoaringBitmap nullBitmap : colNullBitmaps) {
+        columnarBuilder.setNullRowIds(nullBitmap);
+      }
+    }
     return buildColumnarBlock(columnarBuilder);
   }
 
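A minimal usage sketch of the widened buildFromColumns signature (schema and
column values are illustrative; passing null for colNullBitmaps keeps the
previous no-null-vector behavior; imports for DataSchema, Collections, etc.
assumed):

    DataSchema dataSchema = new DataSchema(new String[]{"col1"},
        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
    // One entry per column; each Object[] holds that column's values for all rows.
    List<Object[]> columns = Collections.singletonList(new Object[]{1, 2, 3});
    ColumnarDataBlock block = DataBlockBuilder.buildFromColumns(columns, null, dataSchema);
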
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
index f1dedd49d0..1a26365210 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
@@ -20,6 +20,8 @@ package org.apache.pinot.core.common.datablock;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
@@ -74,6 +76,68 @@ public final class DataBlockUtils {
     }
   }
 
+  public static List<Object[]> extraRows(BaseDataBlock dataBlock) {
+    DataSchema dataSchema = dataBlock.getDataSchema();
+    DataSchema.ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+    int numRows = dataBlock.getNumberOfRows();
+    int numColumns = storedColumnDataTypes.length;
+
+    List<Object[]> rows = new ArrayList<>(numRows);
+    for (int i = 0; i < numRows; i++) {
+      Object[] row = new Object[numColumns];
+      for (int j = 0; j < numColumns; j++) {
+        switch (storedColumnDataTypes[j]) {
+          // Single-value column
+          case INT:
+            row[j] = dataBlock.getInt(i, j);
+            break;
+          case LONG:
+            row[j] = dataBlock.getLong(i, j);
+            break;
+          case FLOAT:
+            row[j] = dataBlock.getFloat(i, j);
+            break;
+          case DOUBLE:
+            row[j] = dataBlock.getDouble(i, j);
+            break;
+          case BIG_DECIMAL:
+            row[j] = dataBlock.getBigDecimal(i, j);
+            break;
+          case STRING:
+            row[j] = dataBlock.getString(i, j);
+            break;
+          case BYTES:
+            row[j] = dataBlock.getBytes(i, j);
+            break;
+
+          // Multi-value column
+          case INT_ARRAY:
+            row[j] = dataBlock.getIntArray(i, j);
+            break;
+          case LONG_ARRAY:
+            row[j] = dataBlock.getLongArray(i, j);
+            break;
+          case FLOAT_ARRAY:
+            row[j] = dataBlock.getFloatArray(i, j);
+            break;
+          case DOUBLE_ARRAY:
+            row[j] = dataBlock.getDoubleArray(i, j);
+            break;
+          case STRING_ARRAY:
+            row[j] = dataBlock.getStringArray(i, j);
+            break;
+
+          default:
+            throw new IllegalStateException(
+                String.format("Unsupported data type: %s for column: %s", storedColumnDataTypes[j],
+                    dataSchema.getColumnName(j)));
+        }
+      }
+      rows.add(row);
+    }
+    return rows;
+  }
+
   /**
   * Given a {@link DataSchema}, compute each column's offset and fill them into the passed in array, then return the
    * row size in bytes.
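
The new extraRows helper above eagerly unpacks a data block back into row
objects; a minimal sketch of the intended call pattern (sample schema and rows
are made up; imports for DataSchema, Arrays, etc. assumed):

    DataSchema dataSchema = new DataSchema(new String[]{"id", "name"},
        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.STRING});
    List<Object[]> rows = Arrays.asList(new Object[]{1L, "a"}, new Object[]{2L, "b"});
    RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, null, dataSchema);
    // Walks the stored column types and rebuilds each row eagerly;
    // unpacked.get(i)[j] holds the stored value for row i, column j.
    List<Object[]> unpacked = DataBlockUtils.extraRows(rowBlock);
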
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index b9e816374c..c9b79218b6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -110,7 +110,7 @@ public class DataBlockTest {
     List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT);
     List<Object[]> columnars = DataBlockTestUtils.convertColumnar(dataSchema, rows);
     RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, null, dataSchema);
-    ColumnarDataBlock columnarBlock = DataBlockBuilder.buildFromColumns(columnars, dataSchema);
+    ColumnarDataBlock columnarBlock = DataBlockBuilder.buildFromColumns(columnars, null, dataSchema);
 
     for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) {
       DataSchema.ColumnDataType columnDataType = dataSchema.getColumnDataType(colId);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index ae64cc90bb..79470fed8c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -19,6 +19,8 @@
 package org.apache.pinot.query.runtime.blocks;
 
 import java.io.IOException;
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.common.BlockDocIdValueSet;
@@ -26,6 +28,8 @@ import org.apache.pinot.core.common.BlockMetadata;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.datablock.BaseDataBlock;
 import org.apache.pinot.core.common.datablock.ColumnarDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.core.common.datablock.RowDataBlock;
 
 
@@ -35,16 +39,61 @@ import org.apache.pinot.core.common.datablock.RowDataBlock;
  */
 public class TransferableBlock implements Block {
 
+  private final BaseDataBlock.Type _type;
+  private final DataSchema _dataSchema;
+
   private BaseDataBlock _dataBlock;
-  private BaseDataBlock.Type _type;
+
+  private List<Object[]> _container;
+
+  public TransferableBlock(List<Object[]> container, DataSchema dataSchema, BaseDataBlock.Type containerType) {
+    _container = container;
+    _dataSchema = dataSchema;
+    _type = containerType;
+  }
 
   public TransferableBlock(BaseDataBlock dataBlock) {
     _dataBlock = dataBlock;
+    _dataSchema = dataBlock.getDataSchema();
     _type = dataBlock instanceof ColumnarDataBlock ? BaseDataBlock.Type.COLUMNAR
         : dataBlock instanceof RowDataBlock ? BaseDataBlock.Type.ROW : BaseDataBlock.Type.METADATA;
   }
 
+  public DataSchema getDataSchema() {
+    return _dataSchema;
+  }
+
+  public List<Object[]> getContainer() {
+    if (_container == null) {
+      switch (_type) {
+        case ROW:
+          _container = DataBlockUtils.extraRows(_dataBlock);
+          break;
+        case COLUMNAR:
+        default:
+          throw new UnsupportedOperationException("Unable to extract from 
container with type: " + _type);
+      }
+    }
+    return _container;
+  }
+
   public BaseDataBlock getDataBlock() {
+    if (_dataBlock == null) {
+      try {
+        switch (_type) {
+          case ROW:
+            _dataBlock = DataBlockBuilder.buildFromRows(_container, null, _dataSchema);
+            break;
+          case COLUMNAR:
+            _dataBlock = DataBlockBuilder.buildFromColumns(_container, null, _dataSchema);
+            break;
+          default:
+            throw new UnsupportedOperationException("Unable to build from container with type: " + _type);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Unable to create DataBlock", e);
+      }
+    }
     return _dataBlock;
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 5bbe81428f..839b3a901c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -26,10 +26,7 @@ import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.common.datablock.BaseDataBlock;
-import org.apache.pinot.core.common.datablock.DataBlockBuilder;
-import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.core.operator.BaseOperator;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -85,7 +82,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
   protected TransferableBlock getNextBlock() {
     buildBroadcastHashTable();
     try {
-      return new TransferableBlock(buildJoinedDataBlock(_leftTableOperator.nextBlock()));
+      return buildJoinedDataBlock(_leftTableOperator.nextBlock());
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
     }
@@ -95,15 +92,13 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
     if (!_isHashTableBuilt) {
       TransferableBlock rightBlock = _rightTableOperator.nextBlock();
       while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
-        BaseDataBlock dataBlock = rightBlock.getDataBlock();
-        _rightTableSchema = dataBlock.getDataSchema();
-        int numRows = dataBlock.getNumberOfRows();
+        _rightTableSchema = rightBlock.getDataSchema();
+        List<Object[]> container = rightBlock.getContainer();
         // put all the rows into corresponding hash collections keyed by the key selector function.
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object[] objects = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+        for (Object[] row : container) {
           List<Object[]> hashCollection =
-              _broadcastHashTable.computeIfAbsent(_rightKeySelector.getKey(objects), k -> new ArrayList<>());
-          hashCollection.add(objects);
+              _broadcastHashTable.computeIfAbsent(_rightKeySelector.getKey(row), k -> new ArrayList<>());
+          hashCollection.add(row);
+          hashCollection.add(row);
         }
         rightBlock = _rightTableOperator.nextBlock();
       }
@@ -111,25 +106,23 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
     }
   }
 
-  private BaseDataBlock buildJoinedDataBlock(TransferableBlock block)
+  private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock)
       throws Exception {
-    if (TransferableBlockUtils.isEndOfStream(block)) {
-      return DataBlockUtils.getEndOfStreamDataBlock();
+    if (TransferableBlockUtils.isEndOfStream(leftBlock)) {
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
     }
     List<Object[]> rows = new ArrayList<>();
-    BaseDataBlock dataBlock = block.getDataBlock();
-    _leftTableSchema = dataBlock.getDataSchema();
+    _leftTableSchema = leftBlock.getDataSchema();
     _resultRowSize = _leftTableSchema.size() + _rightTableSchema.size();
-    int numRows = dataBlock.getNumberOfRows();
-    for (int rowId = 0; rowId < numRows; rowId++) {
-      Object[] leftRow = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+    List<Object[]> container = leftBlock.getContainer();
+    for (Object[] leftRow : container) {
       List<Object[]> hashCollection =
           _broadcastHashTable.getOrDefault(_leftKeySelector.getKey(leftRow), Collections.emptyList());
       for (Object[] rightRow : hashCollection) {
         rows.add(joinRow(leftRow, rightRow));
       }
     }
-    return DataBlockBuilder.buildFromRows(rows, null, computeSchema());
+    return new TransferableBlock(rows, computeSchema(), BaseDataBlock.Type.ROW);
   }
 
   private Object[] joinRow(Object[] leftRow, Object[] rightRow) {
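
The operator changes above are the standard broadcast hash-join loops, now
running over unpacked row containers instead of per-row DataTable extraction.
A self-contained sketch of that pattern with the join key simplified to
column 0 (all names here are hypothetical, not Pinot APIs):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class BroadcastHashJoinSketch {
      // Build phase: index every right-side row by its join key.
      static Map<Object, List<Object[]>> build(List<Object[]> rightRows) {
        Map<Object, List<Object[]>> table = new HashMap<>();
        for (Object[] row : rightRows) {
          table.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
        return table;
      }

      // Probe phase: emit one concatenated row per (left, right) match.
      static List<Object[]> probe(List<Object[]> leftRows, Map<Object, List<Object[]>> table) {
        List<Object[]> joined = new ArrayList<>();
        for (Object[] left : leftRows) {
          for (Object[] right : table.getOrDefault(left[0], Collections.emptyList())) {
            Object[] row = new Object[left.length + right.length];
            System.arraycopy(left, 0, row, 0, left.length);
            System.arraycopy(right, 0, row, left.length, right.length);
            joined.add(row);
          }
        }
        return joined;
      }
    }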

