gortiz commented on code in PR #13303:
URL: https://github.com/apache/pinot/pull/13303#discussion_r1738697999
##########
pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java:
##########
@@ -187,200 +147,321 @@ public static RowDataBlock buildFromRows(List<Object[]>
rows, DataSchema dataSch
dataSchema.getColumnName(colId)));
}
}
- rowBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(),
0, byteBuffer.position());
}
+
+ CompoundDataBuffer.Builder varBufferBuilder = new
CompoundDataBuffer.Builder(ByteOrder.BIG_ENDIAN, true)
+ .addPagedOutputStream(varSize);
+
// Write null bitmaps after writing data.
- for (RoaringBitmap nullBitmap : nullBitmaps) {
- rowBuilder.setNullRowIds(nullBitmap);
- }
- return buildRowBlock(rowBuilder);
+ setNullRowIds(nullBitmaps, fixedSize, varBufferBuilder);
+ return buildRowBlock(numRows, dataSchema,
getReverseDictionary(dictionary), fixedSize, varBufferBuilder);
}
+
public static ColumnarDataBlock buildFromColumns(List<Object[]> columns,
DataSchema dataSchema)
throws IOException {
+ return buildFromColumns(columns, dataSchema,
PagedPinotOutputStream.HeapPageAllocator.createSmall());
+ }
+
+ public static ColumnarDataBlock buildFromColumns(List<Object[]> columns,
DataSchema dataSchema,
+ PagedPinotOutputStream.PageAllocator allocator)
+ throws IOException {
int numRows = columns.isEmpty() ? 0 : columns.get(0).length;
- DataBlockBuilder columnarBuilder = new DataBlockBuilder(dataSchema,
DataBlock.Type.COLUMNAR, numRows);
+
+ int fixedBytesPerRow = calculateBytesPerRow(dataSchema);
+ int nullFixedBytes = dataSchema.size() * Integer.BYTES * 2;
+ int fixedBytesRequired = fixedBytesPerRow * numRows + nullFixedBytes;
+
+ Object2IntOpenHashMap<String> dictionary = new Object2IntOpenHashMap<>();
+
// TODO: consolidate these null utils into data table utils.
// Selection / Agg / Distinct all have similar code.
- ColumnDataType[] storedTypes = dataSchema.getStoredColumnDataTypes();
- int numColumns = storedTypes.length;
+ int numColumns = dataSchema.size();
+
RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
- Object[] nullPlaceholders = new Object[numColumns];
- for (int colId = 0; colId < numColumns; colId++) {
- nullBitmaps[colId] = new RoaringBitmap();
- nullPlaceholders[colId] = storedTypes[colId].getNullPlaceholder();
+ ByteBuffer fixedSize = ByteBuffer.allocate(fixedBytesRequired);
+ CompoundDataBuffer.Builder varBufferBuilder = new
CompoundDataBuffer.Builder(ByteOrder.BIG_ENDIAN, true);
+
+ try (PagedPinotOutputStream varSize = new
PagedPinotOutputStream(allocator)) {
+ for (int colId = 0; colId < numColumns; colId++) {
+ RoaringBitmap nullBitmap = new RoaringBitmap();
+ nullBitmaps[colId] = nullBitmap;
+ serializeColumnData(columns, dataSchema, colId, fixedSize, varSize,
nullBitmap, dictionary);
+ }
+ varBufferBuilder.addPagedOutputStream(varSize);
}
- for (int colId = 0; colId < numColumns; colId++) {
- Object[] column = columns.get(colId);
- ByteBuffer byteBuffer = ByteBuffer.allocate(numRows *
columnarBuilder._columnSizeInBytes[colId]);
- Object value;
-
- // NOTE:
- // We intentionally make the type casting very strict here (e.g. only
accepting Integer for INT) to ensure the
- // rows conform to the data schema. This can help catch the unexpected
data type issues early.
- switch (storedTypes[colId]) {
- // Single-value column
- case INT:
- for (int rowId = 0; rowId < numRows; rowId++) {
- value = column[rowId];
- if (value == null) {
- nullBitmaps[colId].add(rowId);
- value = nullPlaceholders[colId];
- }
- byteBuffer.putInt((int) value);
+ // Write null bitmaps after writing data.
+ setNullRowIds(nullBitmaps, fixedSize, varBufferBuilder);
+ return buildColumnarBlock(numRows, dataSchema,
getReverseDictionary(dictionary), fixedSize, varBufferBuilder);
+ }
+
+ private static void serializeColumnData(List<Object[]> columns, DataSchema
dataSchema, int colId,
+ ByteBuffer fixedSize, PagedPinotOutputStream varSize, RoaringBitmap
nullBitmap,
+ Object2IntOpenHashMap<String> dictionary)
+ throws IOException {
+ ColumnDataType storedType =
dataSchema.getColumnDataType(colId).getStoredType();
+ int numRows = columns.get(colId).length;
+
+ Object[] column = columns.get(colId);
+
+ // NOTE:
+ // We intentionally make the type casting very strict here (e.g. only
accepting Integer for INT) to ensure the
+ // rows conform to the data schema. This can help catch the unexpected
data type issues early.
+ switch (storedType) {
+ // Single-value column
+ case INT: {
+ int nullPlaceholder = (int) storedType.getNullPlaceholder();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ fixedSize.putInt(nullPlaceholder);
+ } else {
+ fixedSize.putInt((int) value);
}
- break;
- case LONG:
- for (int rowId = 0; rowId < numRows; rowId++) {
- value = column[rowId];
- if (value == null) {
- nullBitmaps[colId].add(rowId);
- value = nullPlaceholders[colId];
- }
- byteBuffer.putLong((long) value);
+ }
+ break;
+ }
+ case LONG: {
+ long nullPlaceholder = (long) storedType.getNullPlaceholder();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ fixedSize.putLong(nullPlaceholder);
+ } else {
+ fixedSize.putLong((long) value);
}
- break;
- case FLOAT:
- for (int rowId = 0; rowId < numRows; rowId++) {
- value = column[rowId];
- if (value == null) {
- nullBitmaps[colId].add(rowId);
- value = nullPlaceholders[colId];
- }
- byteBuffer.putFloat((float) value);
+ }
+ break;
+ }
+ case FLOAT: {
+ float nullPlaceholder = (float) storedType.getNullPlaceholder();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ fixedSize.putFloat(nullPlaceholder);
+ } else {
+ fixedSize.putFloat((float) value);
}
- break;
- case DOUBLE:
- for (int rowId = 0; rowId < numRows; rowId++) {
- value = column[rowId];
- if (value == null) {
- nullBitmaps[colId].add(rowId);
- value = nullPlaceholders[colId];
- }
- byteBuffer.putDouble((double) value);
+ }
+ break;
+ }
+ case DOUBLE: {
+ double nullPlaceholder = (double) storedType.getNullPlaceholder();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ fixedSize.putDouble(nullPlaceholder);
+ } else {
+ fixedSize.putDouble((double) value);
}
- break;
- case BIG_DECIMAL:
- for (int rowId = 0; rowId < numRows; rowId++) {
- value = column[rowId];
- if (value == null) {
- nullBitmaps[colId].add(rowId);
- value = nullPlaceholders[colId];
- }
- setColumn(columnarBuilder, byteBuffer, (BigDecimal) value);
+ }
+ break;
+ }
+ case BIG_DECIMAL: {
+ BigDecimal nullPlaceholder = (BigDecimal)
storedType.getNullPlaceholder();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ setColumn(fixedSize, varSize, nullPlaceholder);
+ } else {
+ setColumn(fixedSize, varSize, (BigDecimal) value);
}
- break;
- case STRING:
- for (int rowId = 0; rowId < numRows; rowId++) {
- value = column[rowId];
- if (value == null) {
- nullBitmaps[colId].add(rowId);
- value = nullPlaceholders[colId];
- }
- setColumn(columnarBuilder, byteBuffer, (String) value);
+ }
+ break;
+ }
+ case STRING: {
+ ToIntFunction<String> didSupplier = k -> dictionary.size();
+ int nullPlaceHolder = dictionary.computeIfAbsent((String)
storedType.getNullPlaceholder(), didSupplier);
+
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ fixedSize.putInt(nullPlaceHolder);
+ } else {
+ int dictId = dictionary.computeIfAbsent((String) value,
didSupplier);
+ fixedSize.putInt(dictId);
}
- break;
- case BYTES:
- for (int rowId = 0; rowId < numRows; rowId++) {
- value = column[rowId];
- if (value == null) {
- nullBitmaps[colId].add(rowId);
- value = nullPlaceholders[colId];
- }
- setColumn(columnarBuilder, byteBuffer, (ByteArray) value);
+ }
+ break;
+ }
+ case BYTES: {
+ ByteArray nullPlaceholder = (ByteArray)
storedType.getNullPlaceholder();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ setColumn(fixedSize, varSize, nullPlaceholder);
+ } else {
+ setColumn(fixedSize, varSize, (ByteArray) value);
}
- break;
+ }
+ break;
+ }
- // Multi-value column
- case INT_ARRAY:
- for (int rowId = 0; rowId < numRows; rowId++) {
- value = column[rowId];
- if (value == null) {
- nullBitmaps[colId].add(rowId);
- value = nullPlaceholders[colId];
- }
- setColumn(columnarBuilder, byteBuffer, (int[]) value);
+ // Multi-value column
+ case INT_ARRAY: {
+ int[] nullPlaceholder = (int[]) storedType.getNullPlaceholder();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ setColumn(fixedSize, varSize, nullPlaceholder);
+ } else {
+ setColumn(fixedSize, varSize, (int[]) value);
}
- break;
- case LONG_ARRAY:
- for (int rowId = 0; rowId < numRows; rowId++) {
- value = column[rowId];
- if (value == null) {
- nullBitmaps[colId].add(rowId);
- value = nullPlaceholders[colId];
- }
- setColumn(columnarBuilder, byteBuffer, (long[]) value);
+ }
+ break;
+ }
+ case LONG_ARRAY: {
+ long[] nullPlaceholder = (long[]) storedType.getNullPlaceholder();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ setColumn(fixedSize, varSize, nullPlaceholder);
+ } else {
+ setColumn(fixedSize, varSize, (long[]) value);
}
- break;
- case FLOAT_ARRAY:
- for (int rowId = 0; rowId < numRows; rowId++) {
- value = column[rowId];
- if (value == null) {
- nullBitmaps[colId].add(rowId);
- value = nullPlaceholders[colId];
- }
- setColumn(columnarBuilder, byteBuffer, (float[]) value);
+ }
+ break;
+ }
+ case FLOAT_ARRAY: {
+ float[] nullPlaceholder = (float[]) storedType.getNullPlaceholder();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ setColumn(fixedSize, varSize, nullPlaceholder);
+ } else {
+ setColumn(fixedSize, varSize, (float[]) value);
}
- break;
- case DOUBLE_ARRAY:
- for (int rowId = 0; rowId < numRows; rowId++) {
- value = column[rowId];
- if (value == null) {
- nullBitmaps[colId].add(rowId);
- value = nullPlaceholders[colId];
- }
- setColumn(columnarBuilder, byteBuffer, (double[]) value);
+ }
+ break;
+ }
+ case DOUBLE_ARRAY: {
+ double[] nullPlaceholder = (double[]) storedType.getNullPlaceholder();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ setColumn(fixedSize, varSize, nullPlaceholder);
+ } else {
+ setColumn(fixedSize, varSize, (double[]) value);
}
- break;
- case STRING_ARRAY:
- for (int rowId = 0; rowId < numRows; rowId++) {
- value = column[rowId];
- if (value == null) {
- nullBitmaps[colId].add(rowId);
- value = nullPlaceholders[colId];
- }
- setColumn(columnarBuilder, byteBuffer, (String[]) value);
+ }
+ break;
+ }
+ case STRING_ARRAY: {
+ String[] nullPlaceholder = (String[]) storedType.getNullPlaceholder();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ setColumn(fixedSize, varSize, nullPlaceholder, dictionary);
+ } else {
+ setColumn(fixedSize, varSize, (String[]) value, dictionary);
}
- break;
+ }
+ break;
+ }
- // Special intermediate result for aggregation function
- case OBJECT:
- for (int rowId = 0; rowId < numRows; rowId++) {
- setColumn(columnarBuilder, byteBuffer, column[rowId]);
- }
- break;
+ // Special intermediate result for aggregation function
+ case OBJECT: {
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ setColumn(fixedSize, varSize, column[rowId]);
+ }
+ break;
+ }
+ // Null
+ case UNKNOWN:
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ setColumn(fixedSize, varSize, (Object) null);
+ }
+ break;
- // Null
- case UNKNOWN:
- for (int rowId = 0; rowId < numRows; rowId++) {
- setColumn(columnarBuilder, byteBuffer, (Object) null);
- }
- break;
+ default:
+ throw new IllegalStateException(
+ String.format("Unsupported stored type: %s for column: %s",
storedType,
+ dataSchema.getColumnName(colId)));
+ }
+ }
+ private static int calculateBytesPerRow(DataSchema dataSchema) {
+ int rowSizeInBytes = 0;
+ for (ColumnDataType columnDataType : dataSchema.getColumnDataTypes()) {
+ switch (columnDataType) {
+ case INT:
+ rowSizeInBytes += 4;
+ break;
+ case LONG:
+ rowSizeInBytes += 8;
+ break;
+ case FLOAT:
+ rowSizeInBytes += 4;
+ break;
+ case DOUBLE:
+ rowSizeInBytes += 8;
+ break;
+ case STRING:
+ rowSizeInBytes += 4;
+ break;
+ // Object and array. (POSITION|LENGTH)
default:
- throw new IllegalStateException(
- String.format("Unsupported stored type: %s for column: %s",
storedTypes[colId],
- dataSchema.getColumnName(colId)));
+ rowSizeInBytes += 8;
+ break;
}
-
columnarBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(),
0, byteBuffer.position());
}
- // Write null bitmaps after writing data.
- for (RoaringBitmap nullBitmap : nullBitmaps) {
- columnarBuilder.setNullRowIds(nullBitmap);
+ return rowSizeInBytes;
+ }
+
+ private static void writeVarOffsetInFixed(ByteBuffer fixedSize,
PagedPinotOutputStream varSize) {
+ long offsetInVar = varSize.getCurrentOffset();
+ Preconditions.checkState(offsetInVar <= Integer.MAX_VALUE,
+ "Cannot handle variable size output stream larger than 2GB");
+ fixedSize.putInt((int) offsetInVar);
+ }
+
+ private static void setNullRowIds(RoaringBitmap[] nullVectors, ByteBuffer
fixedSize,
+ CompoundDataBuffer.Builder varBufferBuilder)
+ throws IOException {
+ int varBufSize = Arrays.stream(nullVectors)
+ .mapToInt(bitmap -> bitmap == null ? 0 :
bitmap.serializedSizeInBytes())
+ .sum();
+ ByteBuffer variableSize = ByteBuffer.allocate(varBufSize)
+ .order(ByteOrder.BIG_ENDIAN);
+
+ long varWrittenBytes = varBufferBuilder.getWrittenBytes();
+ Preconditions.checkArgument(varWrittenBytes < Integer.MAX_VALUE,
+ "Cannot handle variable size output stream larger than 2GB but found
{} written bytes", varWrittenBytes);
+ int startVariableOffset = (int) varWrittenBytes;
+ for (RoaringBitmap nullRowIds : nullVectors) {
+ int writtenVarBytes = variableSize.position();
+ fixedSize.putInt(startVariableOffset + writtenVarBytes);
+ if (nullRowIds == null || nullRowIds.isEmpty()) {
+ fixedSize.putInt(0);
+ } else {
+ RoaringBitmapUtils.serialize(nullRowIds, variableSize);
+ fixedSize.putInt(variableSize.position() - writtenVarBytes);
+ }
}
- return buildColumnarBlock(columnarBuilder);
+ varBufferBuilder.addBuffer(variableSize);
}
- private static RowDataBlock buildRowBlock(DataBlockBuilder builder) {
- return new RowDataBlock(builder._numRows, builder._dataSchema,
getReverseDictionary(builder._dictionary),
- builder._fixedSizeDataByteArrayOutputStream.toByteArray(),
- builder._variableSizeDataByteArrayOutputStream.toByteArray());
+ private static RowDataBlock buildRowBlock(int numRows, DataSchema
dataSchema, String[] dictionary,
+ ByteBuffer fixedSize, CompoundDataBuffer.Builder varBufferBuilder) {
+ return new RowDataBlock(numRows, dataSchema, dictionary,
PinotByteBuffer.wrap(fixedSize), varBufferBuilder.build());
}
- private static ColumnarDataBlock buildColumnarBlock(DataBlockBuilder
builder) {
- return new ColumnarDataBlock(builder._numRows, builder._dataSchema,
getReverseDictionary(builder._dictionary),
- builder._fixedSizeDataByteArrayOutputStream.toByteArray(),
- builder._variableSizeDataByteArrayOutputStream.toByteArray());
+ private static ColumnarDataBlock buildColumnarBlock(int numRows, DataSchema
dataSchema, String[] dictionary,
+ ByteBuffer fixedSize, CompoundDataBuffer.Builder varBufferBuilder) {
+ return new ColumnarDataBlock(numRows, dataSchema, dictionary,
+ PinotByteBuffer.wrap(fixedSize), varBufferBuilder.build());
Review Comment:
What do you mean? Just to write it like:
```
return new ColumnarDataBlock(numRows, dataSchema, dictionary, PinotByteBuffer.wrap(fixedSize), varBufferBuilder.build());
```
In that case, we cannot do that, because that line would be 126 characters long.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]