kgyrtkirk commented on code in PR #16619:
URL: https://github.com/apache/druid/pull/16619#discussion_r1644503397
##########
processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java:
##########
@@ -157,10 +161,35 @@ private static class ScanResultValueFramesIterator implements Iterator<FrameSign
Cursor currentCursor = null;
/**
- * Row signature of the current row
+ * Rows in the List form. The {@link #currentCursor} is a wrapper over these rows
+ */
+ List<Object[]> currentRows = null;
+
+ /**
+ * Row index pointing to the current row in {@link #currentRows}. This is the exact same row that the {@link #currentCursor}
+ * is also pointing at. Therefore {@link #currentRows} + {@link #currentCursor} represent the same information as presented
+ * by {@link #currentCursor}.
+ */
+ int currentRowIndex = -1;
+
+ /**
+ * Row signature of the current cursor. This is used to create the cursor out of the ScanResultValue. We have to use
+ * the full signature because the ScanResultValue will have
*/
RowSignature currentRowSignature = null;
+ /**
+ * Row signature of the current cursor, with columns having unknown (null) types trimmed out. This is used to write
+ * the rows onto the frame. There's an implicit assumption (that we verify), that columns with null typed only
+ * contain null values, because the underlying segment didn't have the column.
+ */
+ RowSignature trimmedRowSignature = null;
Review Comment:
I feel like the name `outputRowSignature` or `writtenRowSignature` would better describe this field.
Similarly, `currentRowSignature` could be `inputRowSignature`; that way these fields wouldn't even need that much apidoc... input and output can be different :D
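A minimal sketch of the naming idea, assuming a simplified model where a signature is an ordered list of (name, type) pairs and a `null` type means "unknown". The `inputRowSignature`/`outputRowSignature` names and the helpers below are hypothetical and are not Druid's `RowSignature` API. It also shows why `scanResultValue3` and `scanResultValue4` can share a frame: their input signatures differ, but their output signatures match.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SignatureTrimmingSketch {
  // Hypothetical column model: (name, type), where a null type means "unknown".
  // SimpleImmutableEntry is used because, unlike Map.entry, it accepts a null value.
  static Map.Entry<String, String> column(String name, String type) {
    return new SimpleImmutableEntry<>(name, type);
  }

  // Hypothetical: derives the signature written to the frame ("output") from the signature
  // of the incoming ScanResultValue ("input") by dropping columns with an unknown type.
  // Column order of the remaining columns is preserved.
  static List<Map.Entry<String, String>> outputRowSignature(
      List<Map.Entry<String, String>> inputRowSignature) {
    List<Map.Entry<String, String>> output = new ArrayList<>();
    for (Map.Entry<String, String> col : inputRowSignature) {
      if (col.getValue() != null) {
        output.add(col);
      }
    }
    return output;
  }

  public static void main(String[] args) {
    // Mirrors SIGNATURE3 (col1, col2, col3) and SIGNATURE4 (col1, col3, col2) from the test.
    List<Map.Entry<String, String>> input3 =
        List.of(column("col1", "DOUBLE"), column("col2", "LONG"), column("col3", null));
    List<Map.Entry<String, String>> input4 =
        List.of(column("col1", "DOUBLE"), column("col3", null), column("col2", "LONG"));
    // Different inputs, same output: prints true.
    System.out.println(outputRowSignature(input3).equals(outputRowSignature(input4)));
  }
}
```

With the input/output naming, the "which signature is this?" question answers itself at each use site.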
##########
processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java:
##########
@@ -200,26 +229,33 @@ public FrameSignaturePair next()
// start all the processing
populateCursor();
boolean firstRowWritten = false;
- // While calling populateCursor() repeatedly, currentRowSignature might change. Therefore we store the signature
+ // While calling populateCursor() repeatedly, currentRowSignature might change. Therefore, we store the signature
// with which we have written the frames
- final RowSignature writtenSignature = currentRowSignature;
+ final RowSignature writtenSignature = trimmedRowSignature;
Review Comment:
Instead of introducing this variable, why not use `frameWriterFactory.signature()`?
##########
processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java:
##########
@@ -200,26 +229,33 @@ public FrameSignaturePair next()
// start all the processing
populateCursor();
boolean firstRowWritten = false;
- // While calling populateCursor() repeatedly, currentRowSignature might change. Therefore we store the signature
+ // While calling populateCursor() repeatedly, currentRowSignature might change. Therefore, we store the signature
Review Comment:
...What if the signature changes? Is that a problem? Shouldn't that be an exception?
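If a change within a single frame should never happen, a guard along these lines would make it fail loudly. This is a sketch with a hypothetical `ensureSignatureUnchanged` helper; signatures are simplified to lists of `name:TYPE` strings rather than Druid's `RowSignature`.

```java
import java.util.List;
import java.util.Objects;

final class SignatureGuardSketch {
  // Hypothetical guard: throws if the signature of the row about to be written differs from the
  // signature the frame writer was created with. Signatures are modeled as "name:TYPE" lists.
  static void ensureSignatureUnchanged(List<String> writtenSignature, List<String> currentSignature) {
    if (!Objects.equals(writtenSignature, currentSignature)) {
      throw new IllegalStateException(
          "Row signature changed while writing a frame: expected "
              + writtenSignature + " but got " + currentSignature);
    }
  }

  public static void main(String[] args) {
    List<String> written = List.of("col1:DOUBLE", "col2:LONG");
    // Same signature: no exception.
    ensureSignatureUnchanged(written, List.of("col1:DOUBLE", "col2:LONG"));
    try {
      // Reordered columns: the guard rejects them.
      ensureSignatureUnchanged(written, List.of("col2:LONG", "col1:DOUBLE"));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Per the loop comment in `next()`, `populateCursor()` already stops when the next row isn't compatible, so a check like this would only assert that contract rather than change behavior.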
##########
processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java:
##########
@@ -267,4 +372,37 @@ private static ScanResultValue scanResultValue2(int numRows)
SIGNATURE2
);
}
+
+ // Signature: col1: DOUBLE, col2: LONG, col3: null
+ private static ScanResultValue scanResultValue3(int numRows)
+ {
+ return new ScanResultValue(
+ "dummy",
+ ImmutableList.of("col1", "col2", "col3"),
+ IntStream.range(5, 5 + numRows).mapToObj(i -> new Object[]{(double) i, i, null}).collect(Collectors.toList()),
+ SIGNATURE3
+ );
+ }
+
+ // Signature: col1: DOUBLE, col3: null, col2: LONG
+ private static ScanResultValue scanResultValue4(int numRows)
+ {
+ return new ScanResultValue(
+ "dummy",
+ ImmutableList.of("col1", "col3", "col2"),
+ IntStream.range(7, 7 + numRows).mapToObj(i -> new Object[]{(double) i, null, i}).collect(Collectors.toList()),
+ SIGNATURE4
+ );
+ }
+
+ // Contains ScanResultValue with incomplete type, and non-null row
+ private static ScanResultValue incompleteTypeScanResultValue(int numRows)
+ {
+ return new ScanResultValue(
+ "dummy",
+ ImmutableList.of("col1", "col3", "col2"),
+ IntStream.range(7, 7 + numRows).mapToObj(i -> new Object[]{(double) i, i, i}).collect(Collectors.toList()),
Review Comment:
All these methods seem to return rows that are not sensitive to column order: the columns hold the same value, so a swapped column would go unnoticed.
Can we have some diversity in the rows? Instead of `(1,1,1)`, maybe `(i, 10+i, 20+i)` or something similar.
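Something like this hypothetical helper would give each column a distinct value per row, so a swapped or misplaced column changes the expected output. For `scanResultValue3`/`scanResultValue4`, the null-typed `col3` would still need a `null` in its position.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class DiverseRowsSketch {
  // Hypothetical helper: each column gets a distinct value per row (i, 10 + i, 20 + i), so an
  // assertion fails if two columns are swapped or a value lands in the wrong position.
  static List<Object[]> diverseRows(int start, int numRows) {
    return IntStream.range(start, start + numRows)
        .mapToObj(i -> new Object[]{(double) i, 10 + i, 20 + i})
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    for (Object[] row : diverseRows(7, 2)) {
      System.out.println(Arrays.toString(row)); // [7.0, 17, 27] then [8.0, 18, 28]
    }
  }
}
```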
##########
processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java:
##########
@@ -200,26 +229,33 @@ public FrameSignaturePair next()
// start all the processing
populateCursor();
boolean firstRowWritten = false;
- // While calling populateCursor() repeatedly, currentRowSignature might change. Therefore we store the signature
+ // While calling populateCursor() repeatedly, currentRowSignature might change. Therefore, we store the signature
// with which we have written the frames
- final RowSignature writtenSignature = currentRowSignature;
+ final RowSignature writtenSignature = trimmedRowSignature;
FrameWriterFactory frameWriterFactory =
FrameWriters.makeFrameWriterFactory(
FrameType.COLUMNAR,
memoryAllocatorFactory,
- currentRowSignature,
+ trimmedRowSignature,
Collections.emptyList()
);
Frame frame;
- try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(new SettableCursorColumnSelectorFactory(
- () -> currentCursor,
- currentRowSignature
- ))) {
+ try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(
+ new SettableCursorColumnSelectorFactory(() -> currentCursor, currentRowSignature))) {
while (populateCursor()) { // Do till we don't have any more rows, or the next row isn't compatible with the current row
if (!frameWriter.addSelection()) { // Add the cursor's row to the frame, till the frame is full
break;
}
+
+ for (Integer columnNumber : nullTypedColumns) {
Review Comment:
Note: I wonder why a fastutil `IntList` is used if it gets iterated with a `foreach` (which boxes each element to an `Integer`); why not a plain `get`?
This could also be moved into a method like `validateRow`; that would naturally do a CSE (common-subexpression elimination) of `currentRows.get(currentRowIndex)` so that it is evaluated only once.
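A sketch of the suggested `validateRow`, assuming the null-typed column indices are held in a plain `int[]` as a stand-in for the `IntList` (with fastutil, `getInt(i)` in an indexed loop would avoid boxing the same way). The method name and parameters are hypothetical. The row is fetched once, and the check mirrors the assumption stated in the `trimmedRowSignature` javadoc: null-typed columns may only contain nulls.

```java
import java.util.ArrayList;
import java.util.List;

final class RowValidationSketch {
  // Hypothetical helper: every column with an unknown (null) type must hold a null value;
  // otherwise trimming that column from the written signature would drop real data.
  static void validateRow(List<Object[]> currentRows, int currentRowIndex, int[] nullTypedColumns) {
    // Fetched once (manual CSE) instead of once per null-typed column.
    final Object[] row = currentRows.get(currentRowIndex);
    for (int columnNumber : nullTypedColumns) { // iterating an int[] does not box
      if (row[columnNumber] != null) {
        throw new IllegalStateException(
            "Column " + columnNumber + " has an unknown type but a non-null value: " + row[columnNumber]);
      }
    }
  }

  public static void main(String[] args) {
    List<Object[]> rows = new ArrayList<>();
    rows.add(new Object[]{5.0, 5, null}); // col3 is null-typed and null: valid
    rows.add(new Object[]{7.0, 7, 7});    // col3 is null-typed but non-null: invalid
    validateRow(rows, 0, new int[]{2});
    try {
      validateRow(rows, 1, new int[]{2});
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```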
##########
processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java:
##########
@@ -222,6 +262,71 @@ public void testBatchingWithHeterogenousAndEmptyScanResultValues()
);
}
+ @Test
+ public void testBatchingWithHeterogenousAndEmptyScanResultValuesAndNullTypes()
+ {
+ List<FrameSignaturePair> frames = Lists.newArrayList(
+ createIterable(
+ scanResultValue1(0),
+ scanResultValue2(0),
+ scanResultValue1(2),
+ scanResultValue1(0),
+ scanResultValue2(2),
+ scanResultValue2(0),
+ scanResultValue2(0)
+ )
+ );
+ Assert.assertEquals(2, frames.size());
+ QueryToolChestTestHelper.assertArrayResultsEquals(
+ ImmutableList.of(
+ new Object[]{1L, 1.0D},
+ new Object[]{2L, 2.0D}
+ ),
+ new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE1).getRowsAsSequence()
+ );
+ QueryToolChestTestHelper.assertArrayResultsEquals(
+ ImmutableList.of(
+ new Object[]{3.0D, 3L},
+ new Object[]{4.0D, 4L}
+ ),
+ new FrameBasedInlineDataSource(Collections.singletonList(frames.get(1)), SIGNATURE2).getRowsAsSequence()
+ );
+ }
+
+ @Test
+ public void testBatchingWithDifferentRowSignaturesButSameTrimmedRowSignature()
+ {
+ List<FrameSignaturePair> frames = Lists.newArrayList(
+ createIterable(
+ scanResultValue3(0),
+ scanResultValue4(0),
+ scanResultValue3(2),
+ scanResultValue3(0),
+ scanResultValue4(2),
+ scanResultValue4(0),
+ scanResultValue3(0)
+ )
+ );
+ Assert.assertEquals(1, frames.size());
+ QueryToolChestTestHelper.assertArrayResultsEquals(
+ ImmutableList.of(
+ new Object[]{5.0D, 5L},
+ new Object[]{6.0D, 6L},
+ new Object[]{7.0D, 7L},
+ new Object[]{8.0D, 8L}
+ ),
+ new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE2).getRowsAsSequence()
Review Comment:
It's very odd to see `Assert.assertEquals(1, frames.size())` and then, a few lines later, `Collections.singletonList(frames.get(0))`; why not just `frames`?