cryptoe commented on code in PR #15987: URL: https://github.com/apache/druid/pull/15987#discussion_r1512655681
########## processing/src/main/java/org/apache/druid/frame/allocation/HeapMemoryAllocator.java: ########## @@ -53,7 +53,7 @@ public static HeapMemoryAllocator unlimited() @Override public Optional<ResourceHolder<WritableMemory>> allocate(final long size) { - if (bytesAllocated < capacity - size) { + if (size <= capacity - bytesAllocated) { Review Comment: Hmm, can we have some test cases that check this? ########## processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java: ########## @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.scan; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.allocation.MemoryAllocatorFactory; +import org.apache.druid.frame.write.FrameWriter; +import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.frame.write.FrameWriterUtils; +import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.FrameSignaturePair; +import org.apache.druid.query.IterableRowsCursorHelper; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.column.RowSignature; + +import java.io.Closeable; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * Returns a thread-unsafe iterable, that converts a sequence of {@link ScanResultValue} to an iterable of {@link FrameSignaturePair}. + * ScanResultValues can have heterogenous row signatures, and the returned sequence would have batched + * them into frames appropriately. + * <p> + * The batching process greedily merges the values from the scan result values that have the same signature, while + * still maintaining the manageable frame sizes that is determined by the memory allocator by splitting the rows + * whenever necessary. + * <p> + * It is necessary that we don't batch and store the ScanResultValues somewhere (like a List) while we do this processing + * to prevent the heap from exhausting, without limit. It has to be done online - as the scan result values get materialized, + * we produce frames. 
A few ScanResultValues might be stored however (if the frame got cut off in the middle) + * <p> + * Assuming that we have a sequence of scan result values like: + * <p> + * ScanResultValue1 - RowSignatureA - 3 rows + * ScanResultValue2 - RowSignatureB - 2 rows + * ScanResultValue3 - RowSignatureA - 1 rows + * ScanResultValue4 - RowSignatureA - 4 rows + * ScanResultValue5 - RowSignatureB - 3 rows + * <p> + * Also, assume that each individual frame can hold two rows (in practice, it is determined by the row size and + * the memory block allocated by the memory allocator factory) + * <p> + * The output would be a sequence like: + * Frame1 - RowSignatureA - rows 1-2 from ScanResultValue1 + * Frame2 - RowSignatureA - row 3 from ScanResultValue1 + * Frame3 - RowSignatureB - rows 1-2 from ScanResultValue2 + * Frame4 - RowSignatureA - row 1 from ScanResultValue3, row 1 from ScanResultValue4 + * Frame5 - RowSignatureA - row 2-3 from ScanResultValue4 + * Frame6 - RowSignatureA - row 4 from ScanResultValue4 + * Frame7 - RowSignatureB - row 1-2 from ScanResultValue5 + * Frame8 - RowSignatureB - row 3 from ScanResultValue6 + * <p> + * TODO(laksh): What if single scan result value, and empty Review Comment: Hey, I found a TODO here. In the case of a single null scan result value, we should return a nil frame, no? ########## server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java: ########## @@ -831,6 +826,31 @@ private static <T, QueryType extends Query<T>> DataSource materializeResultsAsAr return InlineDataSource.fromIterable(resultList, signature); } + private static String byteLimitExceededMessage(final long memoryLimit) + { + return org.apache.druid.java.util.common.StringUtils.format( + "Cannot issue the query, subqueries generated results beyond maximum[%d] bytes. Increase the " + + "JVM's memory or set the '%s' in the query context carefully to increase the space allocated for subqueries to " Review Comment: Same comment here: please clarify what setting it "carefully" means. 
########## processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java: ########## @@ -136,7 +136,7 @@ public boolean reserveAdditional(final int bytes) // Allocation needed. // Math.max(allocationSize, bytes) in case "bytes" is greater than SOFT_MAXIMUM_ALLOCATION_SIZE. final Optional<ResourceHolder<WritableMemory>> newMemory = - allocator.allocate(Math.max(nextAllocationSize, bytes)); + allocator.allocate(Math.min(allocator.available(), Math.max(nextAllocationSize, bytes))); Review Comment: * Could you please explain this statement? If you look at line 129, we are 100% sure that (bytes <= allocator.available()), so why would we allocate a new chunk which is exactly the same as allocator.available()? We should never go more than the bytes required, right? nextAllocationSize is more of a minimum check. ########## server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java: ########## @@ -670,8 +671,14 @@ private static <T, QueryType extends Query<T>> DataSource toInlineDataSource( if (limitAccumulator.get() >= rowLimitToUse) { subqueryStatsProvider.incrementQueriesExceedingRowLimit(); throw ResourceLimitExceededException.withMessage( - "Cannot issue the query, subqueries generated results beyond maximum[%d] rows", - rowLimitToUse + "Cannot issue the query, subqueries generated results beyond maximum[%d] rows. Try setting the" + + "%s in the query context to '%s' for enabling byte based limit, which chooses an optimal limit based on " + + "memory size and result's heap usage or manually configure the values of either %s or %s in the query context", Review Comment: We should also mention that this limit should be set carefully, since it can affect other queries on the broker and can potentially lead to OOM. ########## processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java: ########## @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.scan; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.allocation.MemoryAllocatorFactory; +import org.apache.druid.frame.write.FrameWriter; +import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.frame.write.FrameWriterUtils; +import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.FrameSignaturePair; +import org.apache.druid.query.IterableRowsCursorHelper; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.column.RowSignature; + +import java.io.Closeable; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * Returns a thread-unsafe iterable, that converts a sequence of {@link ScanResultValue} to an iterable of {@link FrameSignaturePair}. 
+ * ScanResultValues can have heterogenous row signatures, and the returned sequence would have batched + * them into frames appropriately. + * <p> + * The batching process greedily merges the values from the scan result values that have the same signature, while + * still maintaining the manageable frame sizes that is determined by the memory allocator by splitting the rows + * whenever necessary. + * <p> + * It is necessary that we don't batch and store the ScanResultValues somewhere (like a List) while we do this processing + * to prevent the heap from exhausting, without limit. It has to be done online - as the scan result values get materialized, + * we produce frames. A few ScanResultValues might be stored however (if the frame got cut off in the middle) + * <p> + * Assuming that we have a sequence of scan result values like: + * <p> + * ScanResultValue1 - RowSignatureA - 3 rows + * ScanResultValue2 - RowSignatureB - 2 rows + * ScanResultValue3 - RowSignatureA - 1 rows + * ScanResultValue4 - RowSignatureA - 4 rows + * ScanResultValue5 - RowSignatureB - 3 rows + * <p> + * Also, assume that each individual frame can hold two rows (in practice, it is determined by the row size and + * the memory block allocated by the memory allocator factory) + * <p> + * The output would be a sequence like: + * Frame1 - RowSignatureA - rows 1-2 from ScanResultValue1 + * Frame2 - RowSignatureA - row 3 from ScanResultValue1 + * Frame3 - RowSignatureB - rows 1-2 from ScanResultValue2 + * Frame4 - RowSignatureA - row 1 from ScanResultValue3, row 1 from ScanResultValue4 + * Frame5 - RowSignatureA - row 2-3 from ScanResultValue4 + * Frame6 - RowSignatureA - row 4 from ScanResultValue4 + * Frame7 - RowSignatureB - row 1-2 from ScanResultValue5 + * Frame8 - RowSignatureB - row 3 from ScanResultValue6 + * <p> + * TODO(laksh): What if single scan result value, and empty + */ + +public class ScanResultValueFramesIterable implements Iterable<FrameSignaturePair> +{ + + final 
Sequence<ScanResultValue> resultSequence; + final MemoryAllocatorFactory memoryAllocatorFactory; + final boolean useNestedForUnknownTypes; + final RowSignature defaultRowSignature; + final Function<RowSignature, Function<?, Object[]>> resultFormatMapper; + + public ScanResultValueFramesIterable( + Sequence<ScanResultValue> resultSequence, + MemoryAllocatorFactory memoryAllocatorFactory, + boolean useNestedForUnknownTypes, + RowSignature defaultRowSignature, + Function<RowSignature, Function<?, Object[]>> resultFormatMapper + ) + { + this.resultSequence = resultSequence; + this.memoryAllocatorFactory = memoryAllocatorFactory; + this.useNestedForUnknownTypes = useNestedForUnknownTypes; + this.defaultRowSignature = defaultRowSignature; + this.resultFormatMapper = resultFormatMapper; + } + + @Override + public Iterator<FrameSignaturePair> iterator() + { + return new ScanResultValueFramesIterator( + resultSequence, + memoryAllocatorFactory, + useNestedForUnknownTypes, + defaultRowSignature, + resultFormatMapper + ); + } + + private static class ScanResultValueFramesIterator implements Iterator<FrameSignaturePair> + { + + /** + * Memory allocator factory to use for frame writers + */ + final MemoryAllocatorFactory memoryAllocatorFactory; + + /** + * Replace unknown types in the row signature with {@code COMPLEX<JSON>} + */ + final boolean useNestedForUnknownTypes; + + /** + * Default row signature to use if the scan result value doesn't cantain any row signature. 
This will usually be + * the row signature of the scan query containing only the column names (and no types) + */ + final RowSignature defaultRowSignature; + + /** + * Mapper to convert the scan result value to rows + */ + final Function<RowSignature, Function<?, Object[]>> resultFormatMapper; + + /** + * Accumulating the closers for all the resources created so far + */ + final Closer closer = Closer.create(); + + /** + * Iterator from the scan result value's sequence, so that we can fetch the individual values. Closer registers the + * iterator so that we can clean up any resources held by the sequence's iterator, and prevent leaking + */ + final ScanResultValueIterator resultSequenceIterator; + + /** + * Either null, or points to the current non-empty cursor (and the cursor would point to the current row) + */ + Cursor currentCursor = null; + + /** + * Row signature of the current row + */ + RowSignature currentRowSignature = null; + + + public ScanResultValueFramesIterator( + Sequence<ScanResultValue> resultSequence, + MemoryAllocatorFactory memoryAllocatorFactory, + boolean useNestedForUnknownTypes, + RowSignature defaultRowSignature, + Function<RowSignature, Function<?, Object[]>> resultFormatMapper + ) + { + this.memoryAllocatorFactory = memoryAllocatorFactory; + this.useNestedForUnknownTypes = useNestedForUnknownTypes; + this.defaultRowSignature = defaultRowSignature; + this.resultFormatMapper = resultFormatMapper; + this.resultSequenceIterator = new ScanResultValueIterator(resultSequence); + + closer.register(resultSequenceIterator); + + // Makes sure that we run through all the empty scan result values at the beginning and are pointing to a valid + // row + populateCursor(); + } + + @Override + public boolean hasNext() + { + return !done(); + } + + @Override + public FrameSignaturePair next() + { + if (!hasNext()) { + throw new NoSuchElementException("No more frames to produce. 
Call `hasNext()` before calling `next()`"); + } + + // It would ensure that the cursor and the currentRowSignature is populated properly before we + // start all the processing + populateCursor(); + boolean firstRowWritten = false; + // While calling populateCursor() repeatedly, currentRowSignature might change. Therefore we store the signature + // with which we have written the frames + final RowSignature writtenSignature = currentRowSignature; + FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory( + FrameType.COLUMNAR, + memoryAllocatorFactory, + currentRowSignature, + Collections.emptyList() + ); + Frame frame; + try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(new SettableCursorColumnSelectorFactory( + () -> currentCursor, + currentRowSignature + ))) { + while (populateCursor()) { // Do till we don't have any more rows, or the next row isn't compatible with the current row Review Comment: This should also mention that we need to create a new frame if the current frame is out of space no ? ########## processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java: ########## @@ -79,60 +79,71 @@ public static Filter buildFilter(@Nullable Filter filter, Interval interval) /** * Writes a {@link Cursor} to a sequence of {@link Frame}. This method iterates over the rows of the cursor, - * and writes the columns to the frames - * - * @param cursor Cursor to write to the frame - * @param frameWriterFactory Frame writer factory to write to the frame. - * Determines the signature of the rows that are written to the frames + * and writes the columns to the frames. 
The iterable is lazy, and it traverses the required portion of the cursor + * as required */ - public static Sequence<Frame> cursorToFrames( - Cursor cursor, - FrameWriterFactory frameWriterFactory + public static Iterable<Frame> cursorToFramesIterable( + final Cursor cursor, + final FrameWriterFactory frameWriterFactory ) { + return () -> new Iterator<Frame>() + { + @Override + public boolean hasNext() + { + return !cursor.isDone(); + } - return Sequences.simple( - () -> new Iterator<Frame>() - { - @Override - public boolean hasNext() - { - return !cursor.isDone(); - } - - @Override - public Frame next() - { - // Makes sure that cursor contains some elements prior. This ensures if no row is written, then the row size - // is larger than the MemoryAllocators returned by the provided factory - if (!hasNext()) { - throw new NoSuchElementException(); + @Override + public Frame next() + { + // Makes sure that cursor contains some elements prior. This ensures if no row is written, then the row size + // is larger than the MemoryAllocators returned by the provided factory + if (!hasNext()) { + throw new NoSuchElementException(); + } + boolean firstRowWritten = false; + Frame frame; + try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) { + while (!cursor.isDone()) { + if (!frameWriter.addSelection()) { + break; } - boolean firstRowWritten = false; - Frame frame; - try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) { - while (!cursor.isDone()) { - if (!frameWriter.addSelection()) { - break; - } - firstRowWritten = true; - cursor.advance(); - } - - if (!firstRowWritten) { - throw DruidException - .forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.CAPACITY_EXCEEDED) - .build("Subquery's row size exceeds the frame size and therefore cannot write the subquery's " - + "row to the frame. 
This is a non-configurable static limit that can only be modified by the " - + "developer."); - } + firstRowWritten = true; + cursor.advance(); + } - frame = Frame.wrap(frameWriter.toByteArray()); - } - return frame; + if (!firstRowWritten) { + throw DruidException + .forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.CAPACITY_EXCEEDED) + .build("Subquery's row size exceeds the frame size and therefore cannot write the subquery's " Review Comment: We should mention here that a single row's size exceeds the frame size limit of `xxx`. We should also mention the corrective action: if the user is hitting this limit, they should either change the input data to have smaller rows or change the query so that it does not read particularly fat columns. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org