cryptoe commented on code in PR #13952:
URL: https://github.com/apache/druid/pull/13952#discussion_r1175932725
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java:
##########
@@ -187,11 +206,129 @@ public RowSignature resultArraySignature(final ScanQuery
query)
}
}
+ /**
+ * This batches the fetched {@link ScanResultValue}s which have similar
signatures and are consecutives. In best case
+ * it would return a single frame, and in the worst case, it would return as
many frames as the number of {@link ScanResultValue}
+ * passed.
+ */
+ @Override
+ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
+ final ScanQuery query,
+ final Sequence<ScanResultValue> resultSequence,
+ @Nullable Long memoryLimitBytes
+ )
+ {
+ final AtomicLong memoryLimitAccumulator = memoryLimitBytes != null ? new
AtomicLong(memoryLimitBytes) : null;
+ Iterator<ScanResultValue> resultSequenceIterator = new
Iterator<ScanResultValue>()
+ {
+ Yielder<ScanResultValue> yielder = Yielders.each(resultSequence);
+
+ @Override
+ public boolean hasNext()
+ {
+ return !yielder.isDone();
+ }
+
+ @Override
+ public ScanResultValue next()
+ {
+ ScanResultValue scanResultValue = yielder.get();
+ yielder = yielder.next(null);
+ return scanResultValue;
+ }
+ };
+
+ Iterable<FrameSignaturePair> retVal = () -> new
Iterator<FrameSignaturePair>()
+ {
+ PeekingIterator<ScanResultValue> scanResultValuePeekingIterator =
Iterators.peekingIterator(resultSequenceIterator);
+
+ @Override
+ public boolean hasNext()
+ {
+ return scanResultValuePeekingIterator.hasNext();
+ }
+
+ @Override
+ public FrameSignaturePair next()
+ {
+ final List<ScanResultValue> batch = new ArrayList<>();
+ final ScanResultValue scanResultValue =
scanResultValuePeekingIterator.next();
+ batch.add(scanResultValue);
+ final RowSignature rowSignature = scanResultValue.getRowSignature();
+ while (scanResultValuePeekingIterator.hasNext()) {
+ final RowSignature nextRowSignature =
scanResultValuePeekingIterator.peek().getRowSignature();
+ if (nextRowSignature != null &&
nextRowSignature.equals(rowSignature)) {
+ batch.add(scanResultValuePeekingIterator.next());
+ }
Review Comment:
Please add tests cases in `ScanQueryQueryToolChestTest` for this method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]