cryptoe commented on code in PR #15987:
URL: https://github.com/apache/druid/pull/15987#discussion_r1512655681
##########
processing/src/main/java/org/apache/druid/frame/allocation/HeapMemoryAllocator.java:
##########
@@ -53,7 +53,7 @@ public static HeapMemoryAllocator unlimited()
@Override
public Optional<ResourceHolder<WritableMemory>> allocate(final long size)
{
- if (bytesAllocated < capacity - size) {
+ if (size <= capacity - bytesAllocated) {
Review Comment:
Hmm, can we have some test cases which check this?
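Something like this, perhaps. This is a self-contained toy model of the check, not the real `HeapMemoryAllocator` API; names are illustrative, but the boundary cases are the ones a test should cover:

```java
// Toy model of the allocation check, to illustrate the boundary cases a test
// could cover. Not the real HeapMemoryAllocator; names are illustrative.
class ToyAllocator
{
  private final long capacity;
  private long bytesAllocated;

  ToyAllocator(long capacity)
  {
    this.capacity = capacity;
  }

  boolean allocate(long size)
  {
    // Fixed condition: a request that exactly fills the remaining capacity succeeds.
    if (size <= capacity - bytesAllocated) {
      bytesAllocated += size;
      return true;
    }
    return false;
  }
}

class ToyAllocatorDemo
{
  public static void main(String[] args)
  {
    ToyAllocator allocator = new ToyAllocator(10);
    // The old condition (bytesAllocated < capacity - size) rejects this
    // exact-fit request (0 < 10 - 10 is false); the fixed condition accepts it.
    if (!allocator.allocate(10)) {
      throw new AssertionError("exact-fit allocation should succeed");
    }
    // Capacity is now exhausted, so any positive-size request must fail.
    if (allocator.allocate(1)) {
      throw new AssertionError("over-capacity allocation should fail");
    }
  }
}
```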
##########
processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java:
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.allocation.MemoryAllocatorFactory;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.frame.write.FrameWriterUtils;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.FrameSignaturePair;
+import org.apache.druid.query.IterableRowsCursorHelper;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Returns a thread-unsafe iterable that converts a sequence of {@link ScanResultValue} to an iterable of {@link FrameSignaturePair}.
+ * ScanResultValues can have heterogeneous row signatures, and the returned sequence will have batched
+ * them into frames appropriately.
+ * <p>
+ * The batching process greedily merges the values from the scan result values that have the same signature, while
+ * still maintaining manageable frame sizes, as determined by the memory allocator, by splitting the rows
+ * whenever necessary.
+ * <p>
+ * It is necessary that we don't batch and store the ScanResultValues somewhere (like a List) while we do this
+ * processing, to prevent the heap from being exhausted. It has to be done online - as the scan result values get
+ * materialized, we produce frames. A few ScanResultValues might be stored, however, if a frame got cut off in the
+ * middle of one.
+ * <p>
+ * Assume that we have a sequence of scan result values like:
+ * <p>
+ * ScanResultValue1 - RowSignatureA - 3 rows
+ * ScanResultValue2 - RowSignatureB - 2 rows
+ * ScanResultValue3 - RowSignatureA - 1 row
+ * ScanResultValue4 - RowSignatureA - 4 rows
+ * ScanResultValue5 - RowSignatureB - 3 rows
+ * <p>
+ * Also, assume that each individual frame can hold two rows (in practice, this is determined by the row size and
+ * the memory block allocated by the memory allocator factory).
+ * <p>
+ * The output would be a sequence like:
+ * Frame1 - RowSignatureA - rows 1-2 from ScanResultValue1
+ * Frame2 - RowSignatureA - row 3 from ScanResultValue1
+ * Frame3 - RowSignatureB - rows 1-2 from ScanResultValue2
+ * Frame4 - RowSignatureA - row 1 from ScanResultValue3, row 1 from ScanResultValue4
+ * Frame5 - RowSignatureA - rows 2-3 from ScanResultValue4
+ * Frame6 - RowSignatureA - row 4 from ScanResultValue4
+ * Frame7 - RowSignatureB - rows 1-2 from ScanResultValue5
+ * Frame8 - RowSignatureB - row 3 from ScanResultValue5
+ * <p>
+ * TODO(laksh): What if single scan result value, and empty
Review Comment:
Hey, found a TODO here. In case of a single ScanResultValue with a null value, we should return a nil frame, no?
##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -831,6 +826,31 @@ private static <T, QueryType extends Query<T>> DataSource materializeResultsAsAr
return InlineDataSource.fromIterable(resultList, signature);
}
+ private static String byteLimitExceededMessage(final long memoryLimit)
+ {
+ return org.apache.druid.java.util.common.StringUtils.format(
+ "Cannot issue the query, subqueries generated results beyond maximum[%d] bytes. Increase the "
+ + "JVM's memory or set the '%s' in the query context carefully to increase the space allocated for subqueries to "
Review Comment:
Same comment here as above: please clarify what "carefully" means.
##########
processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java:
##########
@@ -136,7 +136,7 @@ public boolean reserveAdditional(final int bytes)
// Allocation needed.
// Math.max(allocationSize, bytes) in case "bytes" is greater than SOFT_MAXIMUM_ALLOCATION_SIZE.
final Optional<ResourceHolder<WritableMemory>> newMemory =
- allocator.allocate(Math.max(nextAllocationSize, bytes));
+ allocator.allocate(Math.min(allocator.available(), Math.max(nextAllocationSize, bytes)));
Review Comment:
* Could you please explain this statement. If you look at line 129, we are 100% sure that `bytes <= allocator.available()`, so why would we allocate a new chunk which is exactly the same size as `allocator.available()`?
We should never allocate more than the bytes required, right?
`nextAllocationSize` is more of a min check.
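To make the question concrete, here is a toy walk-through of the two expressions. The numbers are made up; `available`, `next`, and `bytes` stand for `allocator.available()`, `nextAllocationSize`, and the requested bytes:

```java
// Toy comparison of the old and new allocation-size expressions.
// Values are illustrative, not taken from the actual code path.
class AllocationSizeDemo
{
  static long oldSize(long next, long bytes)
  {
    return Math.max(next, bytes);
  }

  static long newSize(long available, long next, long bytes)
  {
    return Math.min(available, Math.max(next, bytes));
  }

  public static void main(String[] args)
  {
    // Plenty of room left: both expressions request the same chunk (100 bytes).
    if (oldSize(64, 100) != 100 || newSize(1000, 64, 100) != 100) {
      throw new AssertionError("expected both to allocate 100");
    }
    // nextAllocationSize exceeds what is left: the old expression asks for more
    // than the allocator has (128 > 80, so the allocate would fail), while the
    // new one clamps to available (80) - which is still more than the 50 bytes
    // actually needed. That over-allocation is what the question is about.
    if (oldSize(128, 50) != 128 || newSize(80, 128, 50) != 80) {
      throw new AssertionError("expected old=128, new=80");
    }
  }
}
```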
##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -670,8 +671,14 @@ private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
if (limitAccumulator.get() >= rowLimitToUse) {
subqueryStatsProvider.incrementQueriesExceedingRowLimit();
throw ResourceLimitExceededException.withMessage(
- "Cannot issue the query, subqueries generated results beyond maximum[%d] rows",
- rowLimitToUse
+ "Cannot issue the query, subqueries generated results beyond maximum[%d] rows. Try setting the "
+ + "%s in the query context to '%s' for enabling byte based limit, which chooses an optimal limit based on "
+ + "memory size and result's heap usage or manually configure the values of either %s or %s in the query context",
Review Comment:
We should also mention that this limit should be set carefully, since it can affect other queries on the broker and can potentially lead to an OOM.
##########
processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java:
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.allocation.MemoryAllocatorFactory;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.frame.write.FrameWriterUtils;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.FrameSignaturePair;
+import org.apache.druid.query.IterableRowsCursorHelper;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Returns a thread-unsafe iterable that converts a sequence of {@link ScanResultValue} to an iterable of {@link FrameSignaturePair}.
+ * ScanResultValues can have heterogeneous row signatures, and the returned sequence will have batched
+ * them into frames appropriately.
+ * <p>
+ * The batching process greedily merges the values from the scan result values that have the same signature, while
+ * still maintaining manageable frame sizes, as determined by the memory allocator, by splitting the rows
+ * whenever necessary.
+ * <p>
+ * It is necessary that we don't batch and store the ScanResultValues somewhere (like a List) while we do this
+ * processing, to prevent the heap from being exhausted. It has to be done online - as the scan result values get
+ * materialized, we produce frames. A few ScanResultValues might be stored, however, if a frame got cut off in the
+ * middle of one.
+ * <p>
+ * Assume that we have a sequence of scan result values like:
+ * <p>
+ * ScanResultValue1 - RowSignatureA - 3 rows
+ * ScanResultValue2 - RowSignatureB - 2 rows
+ * ScanResultValue3 - RowSignatureA - 1 row
+ * ScanResultValue4 - RowSignatureA - 4 rows
+ * ScanResultValue5 - RowSignatureB - 3 rows
+ * <p>
+ * Also, assume that each individual frame can hold two rows (in practice, this is determined by the row size and
+ * the memory block allocated by the memory allocator factory).
+ * <p>
+ * The output would be a sequence like:
+ * Frame1 - RowSignatureA - rows 1-2 from ScanResultValue1
+ * Frame2 - RowSignatureA - row 3 from ScanResultValue1
+ * Frame3 - RowSignatureB - rows 1-2 from ScanResultValue2
+ * Frame4 - RowSignatureA - row 1 from ScanResultValue3, row 1 from ScanResultValue4
+ * Frame5 - RowSignatureA - rows 2-3 from ScanResultValue4
+ * Frame6 - RowSignatureA - row 4 from ScanResultValue4
+ * Frame7 - RowSignatureB - rows 1-2 from ScanResultValue5
+ * Frame8 - RowSignatureB - row 3 from ScanResultValue5
+ * <p>
+ * TODO(laksh): What if single scan result value, and empty
+ */
+
+public class ScanResultValueFramesIterable implements Iterable<FrameSignaturePair>
+{
+
+ final Sequence<ScanResultValue> resultSequence;
+ final MemoryAllocatorFactory memoryAllocatorFactory;
+ final boolean useNestedForUnknownTypes;
+ final RowSignature defaultRowSignature;
+ final Function<RowSignature, Function<?, Object[]>> resultFormatMapper;
+
+ public ScanResultValueFramesIterable(
+ Sequence<ScanResultValue> resultSequence,
+ MemoryAllocatorFactory memoryAllocatorFactory,
+ boolean useNestedForUnknownTypes,
+ RowSignature defaultRowSignature,
+ Function<RowSignature, Function<?, Object[]>> resultFormatMapper
+ )
+ {
+ this.resultSequence = resultSequence;
+ this.memoryAllocatorFactory = memoryAllocatorFactory;
+ this.useNestedForUnknownTypes = useNestedForUnknownTypes;
+ this.defaultRowSignature = defaultRowSignature;
+ this.resultFormatMapper = resultFormatMapper;
+ }
+
+ @Override
+ public Iterator<FrameSignaturePair> iterator()
+ {
+ return new ScanResultValueFramesIterator(
+ resultSequence,
+ memoryAllocatorFactory,
+ useNestedForUnknownTypes,
+ defaultRowSignature,
+ resultFormatMapper
+ );
+ }
+
+ private static class ScanResultValueFramesIterator implements Iterator<FrameSignaturePair>
+ {
+
+ /**
+ * Memory allocator factory to use for frame writers
+ */
+ final MemoryAllocatorFactory memoryAllocatorFactory;
+
+ /**
+ * Replace unknown types in the row signature with {@code COMPLEX<JSON>}
+ */
+ final boolean useNestedForUnknownTypes;
+
+ /**
+ * Default row signature to use if the scan result value doesn't contain any row signature. This will usually be
+ * the row signature of the scan query, containing only the column names (and no types)
+ */
+ final RowSignature defaultRowSignature;
+
+ /**
+ * Mapper to convert the scan result value to rows
+ */
+ final Function<RowSignature, Function<?, Object[]>> resultFormatMapper;
+
+ /**
+ * Accumulating the closers for all the resources created so far
+ */
+ final Closer closer = Closer.create();
+
+ /**
+ * Iterator from the scan result value's sequence, so that we can fetch the individual values. Closer registers the
+ * iterator so that we can clean up any resources held by the sequence's iterator, and prevent leaking
+ */
+ final ScanResultValueIterator resultSequenceIterator;
+
+ /**
+ * Either null, or points to the current non-empty cursor (and the cursor would point to the current row)
+ */
+ Cursor currentCursor = null;
+
+ /**
+ * Row signature of the current row
+ */
+ RowSignature currentRowSignature = null;
+
+
+ public ScanResultValueFramesIterator(
+ Sequence<ScanResultValue> resultSequence,
+ MemoryAllocatorFactory memoryAllocatorFactory,
+ boolean useNestedForUnknownTypes,
+ RowSignature defaultRowSignature,
+ Function<RowSignature, Function<?, Object[]>> resultFormatMapper
+ )
+ {
+ this.memoryAllocatorFactory = memoryAllocatorFactory;
+ this.useNestedForUnknownTypes = useNestedForUnknownTypes;
+ this.defaultRowSignature = defaultRowSignature;
+ this.resultFormatMapper = resultFormatMapper;
+ this.resultSequenceIterator = new ScanResultValueIterator(resultSequence);
+
+ closer.register(resultSequenceIterator);
+
+ // Makes sure that we run through all the empty scan result values at the beginning and are pointing to a valid
+ // row
+ populateCursor();
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return !done();
+ }
+
+ @Override
+ public FrameSignaturePair next()
+ {
+ if (!hasNext()) {
throw new NoSuchElementException("No more frames to produce. Call `hasNext()` before calling `next()`");
+ }
+
// This ensures that the cursor and the currentRowSignature are populated properly before we
// start all the processing
+ populateCursor();
+ boolean firstRowWritten = false;
// While calling populateCursor() repeatedly, currentRowSignature might change. Therefore we store the signature
// with which we have written the frames
+ final RowSignature writtenSignature = currentRowSignature;
FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory(
+ FrameType.COLUMNAR,
+ memoryAllocatorFactory,
+ currentRowSignature,
+ Collections.emptyList()
+ );
+ Frame frame;
+ try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(new SettableCursorColumnSelectorFactory(
+ () -> currentCursor,
+ currentRowSignature
+ ))) {
+ while (populateCursor()) { // Do till we don't have any more rows, or the next row isn't compatible with the current row
Review Comment:
This comment should also mention that we need to create a new frame if the current frame is out of space, no?
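For reference, a self-contained toy sketch of the batching loop with both cut conditions called out (the `Row` type, the capacity-2 limit, and the names are all illustrative, not the real Druid types):

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of the batching loop under discussion: keep adding rows to the
// current "frame" until either the signature changes or the frame runs out of
// space, then start a new frame. Names and the capacity limit are illustrative.
class FrameBatchingDemo
{
  record Row(String signature, int value) {}

  static List<List<Row>> batch(List<Row> rows, int frameCapacity)
  {
    List<List<Row>> frames = new ArrayList<>();
    List<Row> current = new ArrayList<>();
    String currentSignature = null;
    for (Row row : rows) {
      boolean signatureChanged = currentSignature != null && !currentSignature.equals(row.signature());
      boolean outOfSpace = current.size() >= frameCapacity;
      // Cut a new frame on EITHER condition - this is the case the comment on
      // the while loop should also mention.
      if (!current.isEmpty() && (signatureChanged || outOfSpace)) {
        frames.add(current);
        current = new ArrayList<>();
      }
      current.add(row);
      currentSignature = row.signature();
    }
    if (!current.isEmpty()) {
      frames.add(current);
    }
    return frames;
  }

  public static void main(String[] args)
  {
    List<Row> rows = List.of(
        new Row("A", 1), new Row("A", 2), new Row("A", 3),  // cut on space after 2 rows
        new Row("B", 4)                                     // cut on signature change
    );
    List<List<Row>> frames = batch(rows, 2);
    if (frames.size() != 3) {
      throw new AssertionError("expected 3 frames, got " + frames.size());
    }
  }
}
```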
##########
processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java:
##########
@@ -79,60 +79,71 @@ public static Filter buildFilter(@Nullable Filter filter, Interval interval)
/**
* Writes a {@link Cursor} to a sequence of {@link Frame}. This method iterates over the rows of the cursor,
- * and writes the columns to the frames
- *
- * @param cursor Cursor to write to the frame
- * @param frameWriterFactory Frame writer factory to write to the frame.
- * Determines the signature of the rows that are written to the frames
+ * and writes the columns to the frames. The iterable is lazy, and it traverses the required portion of the cursor
+ * as required
*/
- public static Sequence<Frame> cursorToFrames(
- Cursor cursor,
- FrameWriterFactory frameWriterFactory
+ public static Iterable<Frame> cursorToFramesIterable(
+ final Cursor cursor,
+ final FrameWriterFactory frameWriterFactory
)
{
+ return () -> new Iterator<Frame>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return !cursor.isDone();
+ }
- return Sequences.simple(
- () -> new Iterator<Frame>()
- {
- @Override
- public boolean hasNext()
- {
- return !cursor.isDone();
- }
-
- @Override
- public Frame next()
- {
- // Makes sure that cursor contains some elements prior. This ensures if no row is written, then the row size
- // is larger than the MemoryAllocators returned by the provided factory
- if (!hasNext()) {
- throw new NoSuchElementException();
+ @Override
+ public Frame next()
+ {
+ // Makes sure that cursor contains some elements prior. This ensures if no row is written, then the row size
+ // is larger than the MemoryAllocators returned by the provided factory
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ boolean firstRowWritten = false;
+ Frame frame;
+ try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) {
+ while (!cursor.isDone()) {
+ if (!frameWriter.addSelection()) {
+ break;
+ }
- boolean firstRowWritten = false;
- Frame frame;
- try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) {
- while (!cursor.isDone()) {
- if (!frameWriter.addSelection()) {
- break;
- }
- firstRowWritten = true;
- cursor.advance();
- }
-
- if (!firstRowWritten) {
- throw DruidException
- .forPersona(DruidException.Persona.DEVELOPER)
- .ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
- .build("Subquery's row size exceeds the frame size and therefore cannot write the subquery's "
- + "row to the frame. This is a non-configurable static limit that can only be modified by the "
- + "developer.");
- }
+ firstRowWritten = true;
+ cursor.advance();
+ }
- frame = Frame.wrap(frameWriter.toByteArray());
- }
- return frame;
+ if (!firstRowWritten) {
+ throw DruidException
+ .forPersona(DruidException.Persona.DEVELOPER)
+ .ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
+ .build("Subquery's row size exceeds the frame size and therefore cannot write the subquery's "
Review Comment:
We should mention here that a single row's size exceeds the frame size limit of `xxx`. We should also mention the corrective action: if the user is hitting this limit, they should either change the input data to have smaller rows, or change the query in such a way that it does not read particularly fat columns.
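Something along these lines, perhaps. The frame-size value and the exact wording are only a suggestion (plain `String.format` is used here to keep the sketch self-contained; the real code would use `StringUtils.format` and the actual frame size):

```java
// Suggested shape for the error message: include the frame size and a
// corrective action. Values and wording are illustrative.
class RowTooLargeMessageDemo
{
  static String message(long frameSize)
  {
    return String.format(
        "Subquery's row size exceeds the frame size[%d bytes], therefore the row cannot be written to the frame. "
        + "This is a non-configurable static limit. Consider reducing the size of individual rows, or rewriting "
        + "the query so that it does not read particularly wide columns.",
        frameSize
    );
  }

  public static void main(String[] args)
  {
    String msg = message(8000000);
    if (!msg.contains("8000000")) {
      throw new AssertionError("expected frame size to appear in message: " + msg);
    }
  }
}
```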
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]