This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fae135e33f5 Be sure block serialization can be interrupted (#16595)
fae135e33f5 is described below
commit fae135e33f5e57564ecfc6c3ffc13b373f7daf0a
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Aug 26 08:58:18 2025 +0200
Be sure block serialization can be interrupted (#16595)
---
.../core/common/datablock/DataBlockBuilder.java | 440 ++++++++++++---------
.../runtime/operator/MailboxSendOperator.java | 2 +-
.../query/runtime/operator/MultiStageOperator.java | 1 +
3 files changed, 247 insertions(+), 196 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 3ade560b0b3..1aaebb5625e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -91,93 +91,95 @@ public class DataBlockBuilder {
PagedPinotOutputStream varSize = new PagedPinotOutputStream(allocator);
Object2IntOpenHashMap<String> dictionary = new Object2IntOpenHashMap<>();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object[] row = rows.get(rowId);
- for (int colId = 0; colId < numColumns; colId++) {
- Object value = row[colId];
- ColumnDataType storedType = storedTypes[colId];
+ interruptableLoop(0, numRows, 1000, (start, end) -> {
+ for (int rowId = start; rowId < end; rowId++) {
+ Object[] row = rows.get(rowId);
+ for (int colId = 0; colId < numColumns; colId++) {
+ Object value = row[colId];
+ ColumnDataType storedType = storedTypes[colId];
+
+ if (storedType == ColumnDataType.OBJECT) {
+ // Custom intermediate result for aggregation function
+ assert aggFunctions != null;
+ if (value == null) {
+ setNull(fixedSize, varSize);
+ } else {
+ // NOTE: The first (numColumns - numAggFunctions) columns are
key columns
+ int numAggFunctions = aggFunctions.length;
+ AggregationFunction aggFunction = aggFunctions[colId +
numAggFunctions - numColumns];
+ setColumn(fixedSize, varSize,
aggFunction.serializeIntermediateResult(value));
+ }
+ continue;
+ }
- if (storedType == ColumnDataType.OBJECT) {
- // Custom intermediate result for aggregation function
- assert aggFunctions != null;
if (value == null) {
- setNull(fixedSize, varSize);
- } else {
- // NOTE: The first (numColumns - numAggFunctions) columns are key
columns
- int numAggFunctions = aggFunctions.length;
- AggregationFunction aggFunction = aggFunctions[colId +
numAggFunctions - numColumns];
- setColumn(fixedSize, varSize,
aggFunction.serializeIntermediateResult(value));
+ if (storedType == ColumnDataType.UNKNOWN) {
+ setNull(fixedSize, varSize);
+ continue;
+ } else {
+ nullBitmaps[colId].add(rowId);
+ value = nullPlaceholders[colId];
+ }
}
- continue;
- }
- if (value == null) {
- if (storedType == ColumnDataType.UNKNOWN) {
- setNull(fixedSize, varSize);
- continue;
- } else {
- nullBitmaps[colId].add(rowId);
- value = nullPlaceholders[colId];
+ // NOTE:
+ // We intentionally make the type casting very strict here (e.g.
only accepting Integer for INT) to ensure the
+ // rows conform to the data schema. This can help catch the
unexpected data type issues early.
+ switch (storedType) {
+ // Single-value column
+ case INT:
+ fixedSize.putInt((int) value);
+ break;
+ case LONG:
+ fixedSize.putLong((long) value);
+ break;
+ case FLOAT:
+ fixedSize.putFloat((float) value);
+ break;
+ case DOUBLE:
+ fixedSize.putDouble((double) value);
+ break;
+ case BIG_DECIMAL:
+ setColumn(fixedSize, varSize, (BigDecimal) value);
+ break;
+ case STRING:
+ int dictId = dictionary.computeIfAbsent((String) value, k ->
dictionary.size());
+ fixedSize.putInt(dictId);
+ break;
+ case BYTES:
+ setColumn(fixedSize, varSize, (ByteArray) value);
+ break;
+ case MAP:
+ setColumn(fixedSize, varSize, (Map) value);
+ break;
+ // Multi-value column
+ case INT_ARRAY:
+ setColumn(fixedSize, varSize, (int[]) value);
+ break;
+ case LONG_ARRAY:
+ setColumn(fixedSize, varSize, (long[]) value);
+ break;
+ case FLOAT_ARRAY:
+ setColumn(fixedSize, varSize, (float[]) value);
+ break;
+ case DOUBLE_ARRAY:
+ setColumn(fixedSize, varSize, (double[]) value);
+ break;
+ case STRING_ARRAY:
+ setColumn(fixedSize, varSize, (String[]) value, dictionary);
+ break;
+ // Null
+ case UNKNOWN:
+ setNull(fixedSize, varSize);
+ break;
+
+ default:
+ throw new IllegalStateException(
+ "Unsupported stored type: " + storedType + " for column: " +
dataSchema.getColumnName(colId));
}
}
-
- // NOTE:
- // We intentionally make the type casting very strict here (e.g. only
accepting Integer for INT) to ensure the
- // rows conform to the data schema. This can help catch the unexpected
data type issues early.
- switch (storedType) {
- // Single-value column
- case INT:
- fixedSize.putInt((int) value);
- break;
- case LONG:
- fixedSize.putLong((long) value);
- break;
- case FLOAT:
- fixedSize.putFloat((float) value);
- break;
- case DOUBLE:
- fixedSize.putDouble((double) value);
- break;
- case BIG_DECIMAL:
- setColumn(fixedSize, varSize, (BigDecimal) value);
- break;
- case STRING:
- int dictId = dictionary.computeIfAbsent((String) value, k ->
dictionary.size());
- fixedSize.putInt(dictId);
- break;
- case BYTES:
- setColumn(fixedSize, varSize, (ByteArray) value);
- break;
- case MAP:
- setColumn(fixedSize, varSize, (Map) value);
- break;
- // Multi-value column
- case INT_ARRAY:
- setColumn(fixedSize, varSize, (int[]) value);
- break;
- case LONG_ARRAY:
- setColumn(fixedSize, varSize, (long[]) value);
- break;
- case FLOAT_ARRAY:
- setColumn(fixedSize, varSize, (float[]) value);
- break;
- case DOUBLE_ARRAY:
- setColumn(fixedSize, varSize, (double[]) value);
- break;
- case STRING_ARRAY:
- setColumn(fixedSize, varSize, (String[]) value, dictionary);
- break;
- // Null
- case UNKNOWN:
- setNull(fixedSize, varSize);
- break;
-
- default:
- throw new IllegalStateException(
- "Unsupported stored type: " + storedType + " for column: " +
dataSchema.getColumnName(colId));
- }
}
- }
+ });
CompoundDataBuffer.Builder varBufferBuilder =
new CompoundDataBuffer.Builder(ByteOrder.BIG_ENDIAN,
true).addPagedOutputStream(varSize);
@@ -250,6 +252,7 @@ public class DataBlockBuilder {
int numRows = columns.get(colId).length;
Object[] column = columns.get(colId);
+ int interruptableLoopStep = 10000;
// NOTE:
// We intentionally make the type casting very strict here (e.g. only
accepting Integer for INT) to ensure the
@@ -258,195 +261,225 @@ public class DataBlockBuilder {
// Single-value column
case INT: {
int nullPlaceholder = (int) storedType.getNullPlaceholder();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object value = column[rowId];
- if (value == null) {
- nullBitmap.add(rowId);
- fixedSize.putInt(nullPlaceholder);
- } else {
- fixedSize.putInt((int) value);
+ interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+ for (int rowId = start; rowId < end; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ fixedSize.putInt(nullPlaceholder);
+ } else {
+ fixedSize.putInt((int) value);
+ }
}
- }
+ });
break;
}
case LONG: {
long nullPlaceholder = (long) storedType.getNullPlaceholder();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object value = column[rowId];
- if (value == null) {
- nullBitmap.add(rowId);
- fixedSize.putLong(nullPlaceholder);
- } else {
- fixedSize.putLong((long) value);
+ interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+ for (int rowId = start; rowId < end; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ fixedSize.putLong(nullPlaceholder);
+ } else {
+ fixedSize.putLong((long) value);
+ }
}
- }
+ });
break;
}
case FLOAT: {
float nullPlaceholder = (float) storedType.getNullPlaceholder();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object value = column[rowId];
- if (value == null) {
- nullBitmap.add(rowId);
- fixedSize.putFloat(nullPlaceholder);
- } else {
- fixedSize.putFloat((float) value);
+ interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+ for (int rowId = start; rowId < end; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ fixedSize.putFloat(nullPlaceholder);
+ } else {
+ fixedSize.putFloat((float) value);
+ }
}
- }
+ });
break;
}
case DOUBLE: {
double nullPlaceholder = (double) storedType.getNullPlaceholder();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object value = column[rowId];
- if (value == null) {
- nullBitmap.add(rowId);
- fixedSize.putDouble(nullPlaceholder);
- } else {
- fixedSize.putDouble((double) value);
+ interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+ for (int rowId = start; rowId < end; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ fixedSize.putDouble(nullPlaceholder);
+ } else {
+ fixedSize.putDouble((double) value);
+ }
}
- }
+ });
break;
}
case BIG_DECIMAL: {
BigDecimal nullPlaceholder = (BigDecimal)
storedType.getNullPlaceholder();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object value = column[rowId];
- if (value == null) {
- nullBitmap.add(rowId);
- setColumn(fixedSize, varSize, nullPlaceholder);
- } else {
- setColumn(fixedSize, varSize, (BigDecimal) value);
+ interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+ for (int rowId = start; rowId < end; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ setColumn(fixedSize, varSize, nullPlaceholder);
+ } else {
+ setColumn(fixedSize, varSize, (BigDecimal) value);
+ }
}
- }
+ });
break;
}
case STRING: {
ToIntFunction<String> didSupplier = k -> dictionary.size();
int nullPlaceHolder = dictionary.computeIfAbsent((String)
storedType.getNullPlaceholder(), didSupplier);
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object value = column[rowId];
- if (value == null) {
- nullBitmap.add(rowId);
- fixedSize.putInt(nullPlaceHolder);
- } else {
- int dictId = dictionary.computeIfAbsent((String) value,
didSupplier);
- fixedSize.putInt(dictId);
+ interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+ // The lambda runs once per chunk, so only rows in [start, end) may be written here.
+ for (int rowId = start; rowId < end; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ fixedSize.putInt(nullPlaceHolder);
+ } else {
+ int dictId = dictionary.computeIfAbsent((String) value,
didSupplier);
+ fixedSize.putInt(dictId);
+ }
}
- }
+ });
break;
}
case BYTES: {
ByteArray nullPlaceholder = (ByteArray)
storedType.getNullPlaceholder();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object value = column[rowId];
- if (value == null) {
- nullBitmap.add(rowId);
- setColumn(fixedSize, varSize, nullPlaceholder);
- } else {
- setColumn(fixedSize, varSize, (ByteArray) value);
+ interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+ // The lambda runs once per chunk, so only rows in [start, end) may be written here.
+ for (int rowId = start; rowId < end; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ setColumn(fixedSize, varSize, nullPlaceholder);
+ } else {
+ setColumn(fixedSize, varSize, (ByteArray) value);
+ }
}
- }
+ });
break;
}
case MAP: {
Map nullPlaceholder = (Map) storedType.getNullPlaceholder();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object value = column[rowId];
- if (value == null) {
- nullBitmap.add(rowId);
- setColumn(fixedSize, varSize, nullPlaceholder);
- } else {
- setColumn(fixedSize, varSize, (Map) value);
+ interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+ // The lambda runs once per chunk, so only rows in [start, end) may be written here.
+ for (int rowId = start; rowId < end; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ setColumn(fixedSize, varSize, nullPlaceholder);
+ } else {
+ setColumn(fixedSize, varSize, (Map) value);
+ }
}
- }
+ });
break;
}
// Multi-value column
case INT_ARRAY: {
int[] nullPlaceholder = (int[]) storedType.getNullPlaceholder();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object value = column[rowId];
- if (value == null) {
- nullBitmap.add(rowId);
- setColumn(fixedSize, varSize, nullPlaceholder);
- } else {
- setColumn(fixedSize, varSize, (int[]) value);
+ interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+ // The lambda runs once per chunk, so only rows in [start, end) may be written here.
+ for (int rowId = start; rowId < end; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ setColumn(fixedSize, varSize, nullPlaceholder);
+ } else {
+ setColumn(fixedSize, varSize, (int[]) value);
+ }
}
- }
+ });
break;
}
case LONG_ARRAY: {
long[] nullPlaceholder = (long[]) storedType.getNullPlaceholder();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object value = column[rowId];
- if (value == null) {
- nullBitmap.add(rowId);
- setColumn(fixedSize, varSize, nullPlaceholder);
- } else {
- setColumn(fixedSize, varSize, (long[]) value);
+ interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+ // The lambda runs once per chunk, so only rows in [start, end) may be written here.
+ for (int rowId = start; rowId < end; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ setColumn(fixedSize, varSize, nullPlaceholder);
+ } else {
+ setColumn(fixedSize, varSize, (long[]) value);
+ }
}
- }
+ });
break;
}
case FLOAT_ARRAY: {
float[] nullPlaceholder = (float[]) storedType.getNullPlaceholder();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object value = column[rowId];
- if (value == null) {
- nullBitmap.add(rowId);
- setColumn(fixedSize, varSize, nullPlaceholder);
- } else {
- setColumn(fixedSize, varSize, (float[]) value);
+ interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+ // The lambda runs once per chunk, so only rows in [start, end) may be written here.
+ for (int rowId = start; rowId < end; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ setColumn(fixedSize, varSize, nullPlaceholder);
+ } else {
+ setColumn(fixedSize, varSize, (float[]) value);
+ }
}
- }
+ });
break;
}
case DOUBLE_ARRAY: {
double[] nullPlaceholder = (double[]) storedType.getNullPlaceholder();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object value = column[rowId];
- if (value == null) {
- nullBitmap.add(rowId);
- setColumn(fixedSize, varSize, nullPlaceholder);
- } else {
- setColumn(fixedSize, varSize, (double[]) value);
+ interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+ // The lambda runs once per chunk, so only rows in [start, end) may be written here.
+ for (int rowId = start; rowId < end; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ setColumn(fixedSize, varSize, nullPlaceholder);
+ } else {
+ setColumn(fixedSize, varSize, (double[]) value);
+ }
}
- }
+ });
break;
}
case STRING_ARRAY: {
String[] nullPlaceholder = (String[]) storedType.getNullPlaceholder();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object value = column[rowId];
- if (value == null) {
- nullBitmap.add(rowId);
- setColumn(fixedSize, varSize, nullPlaceholder, dictionary);
- } else {
- setColumn(fixedSize, varSize, (String[]) value, dictionary);
+ interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+ // The lambda runs once per chunk, so only rows in [start, end) may be written here.
+ for (int rowId = start; rowId < end; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ nullBitmap.add(rowId);
+ setColumn(fixedSize, varSize, nullPlaceholder, dictionary);
+ } else {
+ setColumn(fixedSize, varSize, (String[]) value, dictionary);
+ }
}
- }
+ });
break;
}
// Custom intermediate result for aggregation function
case OBJECT: {
assert aggFunction != null;
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object value = column[rowId];
- if (value == null) {
- setNull(fixedSize, varSize);
- } else {
- setColumn(fixedSize, varSize,
aggFunction.serializeIntermediateResult(value));
+ interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+ // The lambda runs once per chunk, so only rows in [start, end) may be written here.
+ for (int rowId = start; rowId < end; rowId++) {
+ Object value = column[rowId];
+ if (value == null) {
+ setNull(fixedSize, varSize);
+ } else {
+ setColumn(fixedSize, varSize,
aggFunction.serializeIntermediateResult(value));
+ }
}
- }
+ });
break;
}
// Null
case UNKNOWN:
- for (int rowId = 0; rowId < numRows; rowId++) {
- setNull(fixedSize, varSize);
- }
+ interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+ // The lambda runs once per chunk, so write exactly one null per row in [start, end).
+ for (int rowId = start; rowId < end; rowId++) {
+ setNull(fixedSize, varSize);
+ }
+ });
break;
default:
@@ -622,4 +655,21 @@ public class DataBlockBuilder {
fixedSize.putInt(0);
varSize.writeInt(CustomObject.NULL_TYPE_VALUE);
}
+
+ /// Runs `loop` over the half-open range `[start, max)` in chunks of at most `step` rows.
+ ///
+ /// The outer loop checks the current thread's interruption flag once before each chunk. The inner loop is then
+ /// called with the chunk bounds and processes those rows without checking the flag.
+ ///
+ /// @param start first index to process (inclusive)
+ /// @param max end of the range (exclusive); the callback is never invoked when `max <= start`
+ /// @param step maximum number of rows per chunk; must be positive
+ /// @param loop callback invoked once per chunk with that chunk's `[from, to)` bounds
+ /// @throws IllegalArgumentException if `step` is not positive
+ /// @throws RuntimeException if the current thread is found interrupted before a chunk starts
+ /// @throws IOException if `loop` throws it
+ static void interruptableLoop(int start, int max, int step, InnerLoop loop) throws IOException {
+ if (step <= 0) {
+ // A non-positive step would never advance the cursor towards max and the loop would not terminate.
+ throw new IllegalArgumentException("step must be positive: " + step);
+ }
+ int i = start;
+ while (i < max) {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new RuntimeException("Thread interrupted while processing rows. Rows processed so far: " + i);
+ }
+ // Compare the remaining distance (computed as long) with step instead of evaluating i + step up front,
+ // which could overflow int when max is close to Integer.MAX_VALUE.
+ int end = (long) max - i <= step ? max : i + step;
+ loop.run(i, end);
+ i = end;
+ }
+ }
+
+ /// Callback used by `interruptableLoop` to process one chunk of rows.
+ @FunctionalInterface
+ private interface InnerLoop {
+ /// Processes the rows in the half-open range `[from, to)`.
+ ///
+ /// @param from first row index of the chunk (inclusive)
+ /// @param to end row index of the chunk (exclusive)
+ /// @throws IOException if writing the rows fails
+ void run(int from, int to) throws IOException;
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index e2a1009d53f..c1a0fcf6e95 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -225,7 +225,7 @@ public class MailboxSendOperator extends MultiStageOperator
{
} catch (Exception e) {
ErrorMseBlock errorBlock = ErrorMseBlock.fromException(e);
try {
- LOGGER.error("Exception while transferring data on opChain: {}",
_context.getId());
+ LOGGER.error("Exception while transferring data on opChain: {}",
_context.getId(), e);
sendEos(errorBlock);
} catch (Exception e2) {
LOGGER.error("Exception while sending error block.", e2);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 013aa705331..abd61d2ea81 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -125,6 +125,7 @@ public abstract class MultiStageOperator
try {
nextBlock = getNextBlock();
} catch (Exception e) {
+ logger().warn("Operator {}: Exception while processing next block",
_operatorId, e);
nextBlock = ErrorMseBlock.fromException(e);
}
int numRows = nextBlock instanceof MseBlock.Data ? ((MseBlock.Data)
nextBlock).getNumRows() : 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]