cryptoe commented on code in PR #15987:
URL: https://github.com/apache/druid/pull/15987#discussion_r1512655681


##########
processing/src/main/java/org/apache/druid/frame/allocation/HeapMemoryAllocator.java:
##########
@@ -53,7 +53,7 @@ public static HeapMemoryAllocator unlimited()
   @Override
   public Optional<ResourceHolder<WritableMemory>> allocate(final long size)
   {
-    if (bytesAllocated < capacity - size) {
+    if (size <= capacity - bytesAllocated) {

Review Comment:
   Hmm, can we have some test cases that check this?
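
A minimal sketch of the boundary cases such a test could cover, assuming `capacity`, `bytesAllocated`, and `size` are non-negative longs as in the hunk above. The class and method names are hypothetical and this is not the PR's test code; it only puts the two predicates from the diff side by side.

```java
public class AllocationCheckSketch
{
  // Old predicate from the diff: rejects a request that exactly fills the remaining capacity.
  static boolean oldCheck(long capacity, long bytesAllocated, long size)
  {
    return bytesAllocated < capacity - size;
  }

  // New predicate from the diff: accepts any request up to and including the remaining capacity.
  static boolean newCheck(long capacity, long bytesAllocated, long size)
  {
    return size <= capacity - bytesAllocated;
  }

  public static void main(String[] args)
  {
    // Exact fit: 60 bytes requested, 60 bytes remaining.
    System.out.println("exact fit, old: " + oldCheck(100, 40, 60));
    System.out.println("exact fit, new: " + newCheck(100, 40, 60));
    // One byte too many: both predicates should reject.
    System.out.println("over by one, old: " + oldCheck(100, 40, 61));
    System.out.println("over by one, new: " + newCheck(100, 40, 61));
  }
}
```

The interesting case is the exact fit: the old predicate evaluates `40 < 40` and refuses, while the new one evaluates `60 <= 60` and allows the allocation.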



##########
processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java:
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.allocation.MemoryAllocatorFactory;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.frame.write.FrameWriterUtils;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.FrameSignaturePair;
+import org.apache.druid.query.IterableRowsCursorHelper;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Returns a thread-unsafe iterable, that converts a sequence of {@link ScanResultValue} to an iterable of {@link FrameSignaturePair}.
+ * ScanResultValues can have heterogenous row signatures, and the returned sequence would have batched
+ * them into frames appropriately.
+ * <p>
+ * The batching process greedily merges the values from the scan result values that have the same signature, while
+ * still maintaining the manageable frame sizes that is determined by the memory allocator by splitting the rows
+ * whenever necessary.
+ * <p>
+ * It is necessary that we don't batch and store the ScanResultValues somewhere (like a List) while we do this processing
+ * to prevent the heap from exhausting, without limit. It has to be done online - as the scan result values get materialized,
+ * we produce frames. A few ScanResultValues might be stored however (if the frame got cut off in the middle)
+ * <p>
+ * Assuming that we have a sequence of scan result values like:
+ * <p>
+ * ScanResultValue1 - RowSignatureA - 3 rows
+ * ScanResultValue2 - RowSignatureB - 2 rows
+ * ScanResultValue3 - RowSignatureA - 1 rows
+ * ScanResultValue4 - RowSignatureA - 4 rows
+ * ScanResultValue5 - RowSignatureB - 3 rows
+ * <p>
+ * Also, assume that each individual frame can hold two rows (in practice, it is determined by the row size and
+ * the memory block allocated by the memory allocator factory)
+ * <p>
+ * The output would be a sequence like:
+ * Frame1 - RowSignatureA - rows 1-2 from ScanResultValue1
+ * Frame2 - RowSignatureA - row 3 from ScanResultValue1
+ * Frame3 - RowSignatureB - rows 1-2 from ScanResultValue2
+ * Frame4 - RowSignatureA - row 1 from ScanResultValue3, row 1 from ScanResultValue4
+ * Frame5 - RowSignatureA - row 2-3 from ScanResultValue4
+ * Frame6 - RowSignatureA - row 4 from ScanResultValue4
+ * Frame7 - RowSignatureB - row 1-2 from ScanResultValue5
+ * Frame8 - RowSignatureB - row 3 from ScanResultValue6
+ * <p>
+ * TODO(laksh): What if single scan result value, and empty

Review Comment:
   Hey, found a TODO here. In the case of a single scan result value that is null, we should return a nil frame, no?



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -831,6 +826,31 @@ private static <T, QueryType extends Query<T>> DataSource materializeResultsAsAr
     return InlineDataSource.fromIterable(resultList, signature);
   }
 
+  private static String byteLimitExceededMessage(final long memoryLimit)
+  {
+    return org.apache.druid.java.util.common.StringUtils.format(
+    "Cannot issue the query, subqueries generated results beyond maximum[%d] bytes. Increase the "
+    + "JVM's memory or set the '%s' in the query context carefully to increase the space allocated for subqueries to "

Review Comment:
   Same comment applies here: please clarify what "carefully" means.



##########
processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java:
##########
@@ -136,7 +136,7 @@ public boolean reserveAdditional(final int bytes)
       // Allocation needed.
       // Math.max(allocationSize, bytes) in case "bytes" is greater than SOFT_MAXIMUM_ALLOCATION_SIZE.
       final Optional<ResourceHolder<WritableMemory>> newMemory =
-          allocator.allocate(Math.max(nextAllocationSize, bytes));
+          allocator.allocate(Math.min(allocator.available(), Math.max(nextAllocationSize, bytes)));

Review Comment:
   
   Could you please explain this statement?
   If you look at line 129, we are 100% sure that `bytes <= allocator.available()`, so why would we allocate a new chunk that is exactly the same size as `allocator.available()`?
   We should never allocate more than the bytes required, right?
   `nextAllocationSize` is more of a minimum check.
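
To make the question concrete, here is a small sketch of the arithmetic in the two versions of the call, assuming (as stated above) that `bytes <= available` already holds at that point. The class and method names are hypothetical; only the `Math.min` / `Math.max` expressions are taken from the diff.

```java
public class AllocationSizeSketch
{
  // Request size before the change: the preferred chunk size, or "bytes" if that is larger.
  static long oldRequest(long nextAllocationSize, long bytes)
  {
    return Math.max(nextAllocationSize, bytes);
  }

  // Request size after the change: the same value, capped at what the allocator has left.
  static long newRequest(long available, long nextAllocationSize, long bytes)
  {
    return Math.min(available, Math.max(nextAllocationSize, bytes));
  }

  public static void main(String[] args)
  {
    // Plenty of room: the cap does not apply and both versions ask for the preferred size.
    System.out.println(oldRequest(64, 10) + " vs " + newRequest(1000, 64, 10));
    // Only 50 bytes left: the old version asks for 64, more than is available,
    // while the new version asks for exactly the 50 bytes that remain.
    System.out.println(oldRequest(64, 10) + " vs " + newRequest(50, 64, 10));
  }
}
```

Under that assumption the capped request is never smaller than `bytes`, and it equals `allocator.available()` only when the preferred chunk size is larger than what is left.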
   
   
   



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -670,8 +671,14 @@ private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
         if (limitAccumulator.get() >= rowLimitToUse) {
           subqueryStatsProvider.incrementQueriesExceedingRowLimit();
           throw ResourceLimitExceededException.withMessage(
-              "Cannot issue the query, subqueries generated results beyond maximum[%d] rows",
-              rowLimitToUse
+              "Cannot issue the query, subqueries generated results beyond maximum[%d] rows. Try setting the"
+              + "%s in the query context to '%s' for enabling byte based limit, which chooses an optimal limit based on "
+              + "memory size and result's heap usage or manually configure the values of either %s or %s in the query context",

Review Comment:
   We should also mention that this limit should be set carefully, since it can affect other queries on the broker and can potentially lead to OOM.



##########
processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java:
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.allocation.MemoryAllocatorFactory;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.frame.write.FrameWriterUtils;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.FrameSignaturePair;
+import org.apache.druid.query.IterableRowsCursorHelper;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Returns a thread-unsafe iterable, that converts a sequence of {@link ScanResultValue} to an iterable of {@link FrameSignaturePair}.
+ * ScanResultValues can have heterogenous row signatures, and the returned sequence would have batched
+ * them into frames appropriately.
+ * <p>
+ * The batching process greedily merges the values from the scan result values that have the same signature, while
+ * still maintaining the manageable frame sizes that is determined by the memory allocator by splitting the rows
+ * whenever necessary.
+ * <p>
+ * It is necessary that we don't batch and store the ScanResultValues somewhere (like a List) while we do this processing
+ * to prevent the heap from exhausting, without limit. It has to be done online - as the scan result values get materialized,
+ * we produce frames. A few ScanResultValues might be stored however (if the frame got cut off in the middle)
+ * <p>
+ * Assuming that we have a sequence of scan result values like:
+ * <p>
+ * ScanResultValue1 - RowSignatureA - 3 rows
+ * ScanResultValue2 - RowSignatureB - 2 rows
+ * ScanResultValue3 - RowSignatureA - 1 rows
+ * ScanResultValue4 - RowSignatureA - 4 rows
+ * ScanResultValue5 - RowSignatureB - 3 rows
+ * <p>
+ * Also, assume that each individual frame can hold two rows (in practice, it is determined by the row size and
+ * the memory block allocated by the memory allocator factory)
+ * <p>
+ * The output would be a sequence like:
+ * Frame1 - RowSignatureA - rows 1-2 from ScanResultValue1
+ * Frame2 - RowSignatureA - row 3 from ScanResultValue1
+ * Frame3 - RowSignatureB - rows 1-2 from ScanResultValue2
+ * Frame4 - RowSignatureA - row 1 from ScanResultValue3, row 1 from ScanResultValue4
+ * Frame5 - RowSignatureA - row 2-3 from ScanResultValue4
+ * Frame6 - RowSignatureA - row 4 from ScanResultValue4
+ * Frame7 - RowSignatureB - row 1-2 from ScanResultValue5
+ * Frame8 - RowSignatureB - row 3 from ScanResultValue6
+ * <p>
+ * TODO(laksh): What if single scan result value, and empty
+ */
+
+public class ScanResultValueFramesIterable implements Iterable<FrameSignaturePair>
+{
+
+  final Sequence<ScanResultValue> resultSequence;
+  final MemoryAllocatorFactory memoryAllocatorFactory;
+  final boolean useNestedForUnknownTypes;
+  final RowSignature defaultRowSignature;
+  final Function<RowSignature, Function<?, Object[]>> resultFormatMapper;
+
+  public ScanResultValueFramesIterable(
+      Sequence<ScanResultValue> resultSequence,
+      MemoryAllocatorFactory memoryAllocatorFactory,
+      boolean useNestedForUnknownTypes,
+      RowSignature defaultRowSignature,
+      Function<RowSignature, Function<?, Object[]>> resultFormatMapper
+  )
+  {
+    this.resultSequence = resultSequence;
+    this.memoryAllocatorFactory = memoryAllocatorFactory;
+    this.useNestedForUnknownTypes = useNestedForUnknownTypes;
+    this.defaultRowSignature = defaultRowSignature;
+    this.resultFormatMapper = resultFormatMapper;
+  }
+
+  @Override
+  public Iterator<FrameSignaturePair> iterator()
+  {
+    return new ScanResultValueFramesIterator(
+        resultSequence,
+        memoryAllocatorFactory,
+        useNestedForUnknownTypes,
+        defaultRowSignature,
+        resultFormatMapper
+    );
+  }
+
+  private static class ScanResultValueFramesIterator implements Iterator<FrameSignaturePair>
+  {
+
+    /**
+     * Memory allocator factory to use for frame writers
+     */
+    final MemoryAllocatorFactory memoryAllocatorFactory;
+
+    /**
+     * Replace unknown types in the row signature with {@code COMPLEX<JSON>}
+     */
+    final boolean useNestedForUnknownTypes;
+
+    /**
+     * Default row signature to use if the scan result value doesn't cantain any row signature. This will usually be
+     * the row signature of the scan query containing only the column names (and no types)
+     */
+    final RowSignature defaultRowSignature;
+
+    /**
+     * Mapper to convert the scan result value to rows
+     */
+    final Function<RowSignature, Function<?, Object[]>> resultFormatMapper;
+
+    /**
+     * Accumulating the closers for all the resources created so far
+     */
+    final Closer closer = Closer.create();
+
+    /**
+     * Iterator from the scan result value's sequence, so that we can fetch the individual values. Closer registers the
+     * iterator so that we can clean up any resources held by the sequence's iterator, and prevent leaking
+     */
+    final ScanResultValueIterator resultSequenceIterator;
+
+    /**
+     * Either null, or points to the current non-empty cursor (and the cursor would point to the current row)
+     */
+    Cursor currentCursor = null;
+
+    /**
+     * Row signature of the current row
+     */
+    RowSignature currentRowSignature = null;
+
+
+    public ScanResultValueFramesIterator(
+        Sequence<ScanResultValue> resultSequence,
+        MemoryAllocatorFactory memoryAllocatorFactory,
+        boolean useNestedForUnknownTypes,
+        RowSignature defaultRowSignature,
+        Function<RowSignature, Function<?, Object[]>> resultFormatMapper
+    )
+    {
+      this.memoryAllocatorFactory = memoryAllocatorFactory;
+      this.useNestedForUnknownTypes = useNestedForUnknownTypes;
+      this.defaultRowSignature = defaultRowSignature;
+      this.resultFormatMapper = resultFormatMapper;
+      this.resultSequenceIterator = new ScanResultValueIterator(resultSequence);
+
+      closer.register(resultSequenceIterator);
+
+      // Makes sure that we run through all the empty scan result values at the beginning and are pointing to a valid
+      // row
+      populateCursor();
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return !done();
+    }
+
+    @Override
+    public FrameSignaturePair next()
+    {
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more frames to produce. Call `hasNext()` before calling `next()`");
+      }
+
+      // It would ensure that the cursor and the currentRowSignature is populated properly before we
+      // start all the processing
+      populateCursor();
+      boolean firstRowWritten = false;
+      // While calling populateCursor() repeatedly, currentRowSignature might change. Therefore we store the signature
+      // with which we have written the frames
+      final RowSignature writtenSignature = currentRowSignature;
+      FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory(
+          FrameType.COLUMNAR,
+          memoryAllocatorFactory,
+          currentRowSignature,
+          Collections.emptyList()
+      );
+      Frame frame;
+      try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(new SettableCursorColumnSelectorFactory(
+          () -> currentCursor,
+          currentRowSignature
+      ))) {
+        while (populateCursor()) { // Do till we don't have any more rows, or the next row isn't compatible with the current row

Review Comment:
   This should also mention that we need to create a new frame if the current frame is out of space, no?
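
As an illustration of both cut-off conditions (signature change and frame full), here is a simplified simulation of the batching described in the class Javadoc. It assumes a fixed number of rows per frame, whereas the real limit depends on row size and the allocator; the class and method names are hypothetical and nothing here uses Druid's frame APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class FrameBatchingSketch
{
  // Returns one "signature:rowCount" entry per produced frame.
  static List<String> batch(String[] signatures, int[] rowCounts, int rowsPerFrame)
  {
    List<String> frames = new ArrayList<>();
    String frameSignature = null;
    int rowsInFrame = 0;
    for (int i = 0; i < signatures.length; i++) {
      for (int row = 0; row < rowCounts[i]; row++) {
        boolean signatureChanged = frameSignature != null && !frameSignature.equals(signatures[i]);
        boolean frameFull = rowsInFrame == rowsPerFrame;
        // Close the current frame when the next row is incompatible OR the frame is out of space.
        if (rowsInFrame > 0 && (signatureChanged || frameFull)) {
          frames.add(frameSignature + ":" + rowsInFrame);
          rowsInFrame = 0;
        }
        frameSignature = signatures[i];
        rowsInFrame++;
      }
    }
    // Flush the last, possibly partial, frame.
    if (rowsInFrame > 0) {
      frames.add(frameSignature + ":" + rowsInFrame);
    }
    return frames;
  }

  public static void main(String[] args)
  {
    // The five scan result values from the Javadoc example, with two rows per frame.
    String[] signatures = {"A", "B", "A", "A", "B"};
    int[] rowCounts = {3, 2, 1, 4, 3};
    System.out.println(batch(signatures, rowCounts, 2));
  }
}
```

With the Javadoc's input this yields eight frames, matching the frame count and per-frame row counts listed there.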



##########
processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java:
##########
@@ -79,60 +79,71 @@ public static Filter buildFilter(@Nullable Filter filter, Interval interval)
 
   /**
    * Writes a {@link Cursor} to a sequence of {@link Frame}. This method iterates over the rows of the cursor,
-   * and writes the columns to the frames
-   *
-   * @param cursor                 Cursor to write to the frame
-   * @param frameWriterFactory     Frame writer factory to write to the frame.
-   *                               Determines the signature of the rows that are written to the frames
+   * and writes the columns to the frames. The iterable is lazy, and it traverses the required portion of the cursor
+   * as required
    */
-  public static Sequence<Frame> cursorToFrames(
-      Cursor cursor,
-      FrameWriterFactory frameWriterFactory
+  public static Iterable<Frame> cursorToFramesIterable(
+      final Cursor cursor,
+      final FrameWriterFactory frameWriterFactory
   )
   {
+    return () -> new Iterator<Frame>()
+    {
+      @Override
+      public boolean hasNext()
+      {
+        return !cursor.isDone();
+      }
 
-    return Sequences.simple(
-        () -> new Iterator<Frame>()
-        {
-          @Override
-          public boolean hasNext()
-          {
-            return !cursor.isDone();
-          }
-
-          @Override
-          public Frame next()
-          {
-            // Makes sure that cursor contains some elements prior. This ensures if no row is written, then the row size
-            // is larger than the MemoryAllocators returned by the provided factory
-            if (!hasNext()) {
-              throw new NoSuchElementException();
+      @Override
+      public Frame next()
+      {
+        // Makes sure that cursor contains some elements prior. This ensures if no row is written, then the row size
+        // is larger than the MemoryAllocators returned by the provided factory
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        boolean firstRowWritten = false;
+        Frame frame;
+        try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) {
+          while (!cursor.isDone()) {
+            if (!frameWriter.addSelection()) {
+              break;
             }
-            boolean firstRowWritten = false;
-            Frame frame;
-            try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) {
-              while (!cursor.isDone()) {
-                if (!frameWriter.addSelection()) {
-                  break;
-                }
-                firstRowWritten = true;
-                cursor.advance();
-              }
-
-              if (!firstRowWritten) {
-                throw DruidException
-                    .forPersona(DruidException.Persona.DEVELOPER)
-                    .ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
-                    .build("Subquery's row size exceeds the frame size and therefore cannot write the subquery's "
-                           + "row to the frame. This is a non-configurable static limit that can only be modified by the "
-                           + "developer.");
-              }
+            firstRowWritten = true;
+            cursor.advance();
+          }
 
-              frame = Frame.wrap(frameWriter.toByteArray());
-            }
-            return frame;
+          if (!firstRowWritten) {
+            throw DruidException
+                .forPersona(DruidException.Persona.DEVELOPER)
+                .ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
+                .build("Subquery's row size exceeds the frame size and therefore cannot write the subquery's "

Review Comment:
   We should mention here that a single row's size exceeds the frame size limit of `xxx`. We should also mention the corrective action.
   If users hit this limit, they should either change the input data to have smaller rows or change the query so that it does not read particularly fat columns.
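
A rough sketch of what such a message could look like, assuming the frame size in bytes is available at the point where the exception is built. The helper name, the `frameSizeBytes` parameter, and the exact wording are hypothetical and are not taken from the PR.

```java
import java.util.Locale;

public class RowTooLargeMessageSketch
{
  // Builds a message that states the limit and the corrective action.
  // "frameSizeBytes" is an assumed input; the PR may expose this value differently.
  static String rowTooLargeMessage(long frameSizeBytes)
  {
    return String.format(
        Locale.ROOT,
        "A single row of the subquery's results exceeds the frame size limit of [%d] bytes and cannot be "
        + "written to a frame. Reduce the row size in the input data, or change the query so that it does "
        + "not read particularly wide columns.",
        frameSizeBytes
    );
  }

  public static void main(String[] args)
  {
    System.out.println(rowTooLargeMessage(8_388_608L));
  }
}
```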


