walterddr commented on code in PR #9886:
URL: https://github.com/apache/pinot/pull/9886#discussion_r1037667080
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -116,23 +128,107 @@ private static Object[] canonicalizeRow(Object[] row,
DataSchema dataSchema) {
return resultRow;
}
- private static List<Object[]> toList(Collection<Object[]> collection,
DataSchema dataSchema) {
- if (collection == null || collection.isEmpty()) {
- return new ArrayList<>();
- }
- List<Object[]> resultRows = new ArrayList<>(collection.size());
- if (collection instanceof List) {
- for (Object[] orgRow : collection) {
- resultRows.add(canonicalizeRow(orgRow, dataSchema));
- }
- } else if (collection instanceof PriorityQueue) {
- PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>)
collection;
- while (!priorityQueue.isEmpty()) {
- resultRows.add(canonicalizeRow(priorityQueue.poll(), dataSchema));
+ /**
+ * we re-arrange columns to match the projection in the case of order by -
this is to ensure
+ * that V1 results match what the expected projection schema in the calcite
logical operator; if
+ * we realize that there are other situations where we need to post-process
v1 results to adhere to
+ * the expected results we should factor this out and also apply the
canonicalization of the data
+ * types during this post-process step (also see
LeafStageTransferableBlockOperator#canonicalizeRow)
+ *
+ * @param serverResultsBlock result block from leaf stage
+ * @param dataSchema the desired schema for send operator
+ * @return conformed collection of rows.
+ */
+ @SuppressWarnings("ConstantConditions")
+ private static List<Object[]> cleanUpDataBlock(InstanceResponseBlock
serverResultsBlock, DataSchema dataSchema,
+ boolean requiresCleanUp) {
+ // Extract the result rows
+ Collection<Object[]> resultRows = serverResultsBlock.getRows();
+ List<Object[]> extractedRows = new ArrayList<>(resultRows.size());
+ if (requiresCleanUp) {
+ DataSchema resultSchema = serverResultsBlock.getDataSchema();
+ List<String> selectionColumns =
+
SelectionOperatorUtils.getSelectionColumns(serverResultsBlock.getQueryContext(),
resultSchema);
+ int[] columnIndices =
SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+ DataSchema adjustedDataSchema =
SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+
Preconditions.checkState(isDataSchemaColumnTypesCompatible(dataSchema.getColumnDataTypes(),
+ adjustedDataSchema.getColumnDataTypes()),
+ "Incompatible result data schema: " + "Expecting: " + dataSchema + "
Actual: " + adjustedDataSchema);
+ int numColumns = columnIndices.length;
+
+ if (serverResultsBlock.getQueryContext().getOrderByExpressions() !=
null) {
+ // extract result row in ordered fashion
+ PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>)
resultRows;
+ while (!priorityQueue.isEmpty()) {
+ Object[] row = priorityQueue.poll();
+ assert row != null;
+ Object[] extractedRow = new Object[numColumns];
+ for (int colId = 0; colId < numColumns; colId++) {
+ Object value = row[columnIndices[colId]];
+ if (value != null) {
+ extractedRow[colId] =
dataSchema.getColumnDataType(colId).convert(value);
+ }
+ }
+ extractedRows.add(extractedRow);
+ }
+ } else {
+ // extract result row in non-ordered fashion
+ for (Object[] row : resultRows) {
+ assert row != null;
+ Object[] extractedRow = new Object[numColumns];
+ for (int colId = 0; colId < numColumns; colId++) {
+ Object value = row[columnIndices[colId]];
+ if (value != null) {
+ extractedRow[colId] =
dataSchema.getColumnDataType(colId).convert(value);
+ }
+ }
+ extractedRows.add(extractedRow);
+ }
}
} else {
- throw new UnsupportedOperationException("Unsupported collection type: "
+ collection.getClass());
+ if (resultRows instanceof List) {
+ for (Object[] orgRow : resultRows) {
+ extractedRows.add(canonicalizeRow(orgRow, dataSchema));
+ }
+ } else if (resultRows instanceof PriorityQueue) {
+ PriorityQueue<Object[]> priorityQueue = (PriorityQueue<Object[]>)
resultRows;
+ while (!priorityQueue.isEmpty()) {
+ extractedRows.add(canonicalizeRow(priorityQueue.poll(), dataSchema));
+ }
+ } else {
+ throw new UnsupportedOperationException("Unsupported collection type:
" + resultRows.getClass());
+ }
+ }
+ return extractedRows;
+ }
+
+ /**
+ * @see
LeafStageTransferableBlockOperator#cleanUpDataBlock(InstanceResponseBlock,
DataSchema, boolean)
+ */
+ @SuppressWarnings("ConstantConditions")
+ private static DataSchema cleanUpDataSchema(InstanceResponseBlock
serverResultsBlock, DataSchema desiredDataSchema) {
+ DataSchema resultSchema = serverResultsBlock.getDataSchema();
+ List<String> selectionColumns =
+
SelectionOperatorUtils.getSelectionColumns(serverResultsBlock.getQueryContext(),
resultSchema);
+
+ int[] columnIndices =
SelectionOperatorUtils.getColumnIndices(selectionColumns, resultSchema);
+ DataSchema adjustedResultSchema =
SelectionOperatorUtils.getSchemaForProjection(resultSchema, columnIndices);
+
Preconditions.checkState(isDataSchemaColumnTypesCompatible(desiredDataSchema.getColumnDataTypes(),
+ adjustedResultSchema.getColumnDataTypes()),
+ "Incompatible result data schema: " + "Expecting: " +
desiredDataSchema + " Actual: " + adjustedResultSchema);
+ return adjustedResultSchema;
+ }
+
+ private static boolean
isDataSchemaColumnTypesCompatible(DataSchema.ColumnDataType[] desiredTypes,
+ DataSchema.ColumnDataType[] givenTypes) {
+ if (desiredTypes.length != givenTypes.length) {
+ return false;
+ }
+ for (int i = 0; i < desiredTypes.length; i++) {
+ if (desiredTypes[i] != givenTypes[i] &&
!givenTypes[i].isSuperTypeOf(desiredTypes[i])) {
Review Comment:
I believe `isSuperTypeOf` already returns true when the types are equal; see
`DataSchemaTest#testSuperTypeCheckers`.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -84,11 +89,18 @@ protected TransferableBlock getNextBlock() {
} else {
if (_currentIndex < _baseResultBlock.size()) {
InstanceResponseBlock responseBlock =
_baseResultBlock.get(_currentIndex++);
- BaseResultsBlock resultsBlock = responseBlock.getResultsBlock();
- if (resultsBlock != null) {
- List<Object[]> rows =
- toList(resultsBlock.getRows(responseBlock.getQueryContext()),
responseBlock.getDataSchema());
- return new TransferableBlock(rows, responseBlock.getDataSchema(),
DataBlock.Type.ROW);
+ if (responseBlock.getResultsBlock() != null) {
+ DataSchema dataSchema = responseBlock.getDataSchema();
+ boolean requiresCleanup = responseBlock.getResultsBlock() instanceof
SelectionResultsBlock
Review Comment:
Yes, agreed with you; this is refactored out to one per base result type.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]