abhishekagarwal87 commented on code in PR #13952:
URL: https://github.com/apache/druid/pull/13952#discussion_r1239700678


##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -613,96 +638,149 @@ private static <T, QueryType extends Query<T>> 
DataSource toInlineDataSource(
       final QueryToolChest<T, QueryType> toolChest,
       final AtomicInteger limitAccumulator,
       final AtomicLong memoryLimitAccumulator,
+      final AtomicBoolean cannotMaterializeToFrames,
       final int limit,
-      long memoryLimit
+      long memoryLimit,
+      boolean useNestedForUnknownTypeInSubquery
   )
   {
-    final int limitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
-    boolean memoryLimitSet = memoryLimit >= 0;
-
-    if (limitAccumulator.get() >= limitToUse) {
-      throw ResourceLimitExceededException.withMessage(
-          "Cannot issue subquery, maximum[%d] reached",
-          limitToUse
-      );
-    }
-
-    if (memoryLimitSet && memoryLimitAccumulator.get() >= memoryLimit) {
-      throw ResourceLimitExceededException.withMessage(
-          "Cannot issue subquery, maximum subquery result bytes[%d] reached",
-          memoryLimit
-      );
-    }
+    final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
 
     DataSource dataSource;
-    // Try to serialize the results into a frame only if the memory limit is 
set on the server or the query
-    if (memoryLimitSet) {
-      try {
-        Optional<Sequence<FrameSignaturePair>> framesOptional = 
toolChest.resultsAsFrames(
+
+    switch (ClientQuerySegmentWalkerUtils.getLimitType(memoryLimit, 
cannotMaterializeToFrames.get())) {
+      case ROW_LIMIT:
+        if (limitAccumulator.get() >= rowLimitToUse) {
+          throw ResourceLimitExceededException.withMessage(
+              "Cannot issue the query, subqueries generated results beyond 
maximum[%d] rows",
+              rowLimitToUse
+          );
+        }
+        dataSource = materializeResultsAsArray(
             query,
             results,
-            memoryLimit - memoryLimitAccumulator.get()
+            toolChest,
+            limitAccumulator,
+            limit
         );
-
-        if (!framesOptional.isPresent()) {
-          throw new ISE("The memory of the subqueries cannot be estimated 
correctly.");
+        break;
+      case MEMORY_LIMIT:
+        if (memoryLimitAccumulator.get() >= memoryLimit) {
+          throw ResourceLimitExceededException.withMessage(
+              "Cannot issue the query, subqueries generated results beyond 
maximum[%d] bytes",
+              memoryLimit
+          );
         }
-
-        Sequence<FrameSignaturePair> frames = framesOptional.get();
-        List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();
-        frames.forEach(
-            frame -> {
-              if 
(memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) {
-                throw ResourceLimitExceededException.withMessage(
-                    "Subquery generated results beyond maximum[%d] bytes",
-                    memoryLimit
-                );
-
-              }
-              if (limitAccumulator.addAndGet(frame.getFrame().numRows()) >= 
limitToUse) {
-                throw ResourceLimitExceededException.withMessage(
-                    "Subquery generated results beyond maximum[%d] rows",
-                    limitToUse
-                );
-              }
-              frameSignaturePairs.add(frame);
-            }
-        );
-        dataSource = new FramesBackedInlineDataSource(frameSignaturePairs, 
toolChest.resultArraySignature(query));
-      }
-      catch (ResourceLimitExceededException rlee) {
-        throw rlee;
-      }
-      catch (Exception e) {
-        log.info(
-            "Unable to write the subquery results to a frame. Results won't be 
accounted for in the memory "
-            + "calculation"
+        Optional<DataSource> maybeDataSource = materializeResultsAsFrames(
+            query,
+            results,
+            toolChest,
+            limitAccumulator,
+            memoryLimitAccumulator,
+            memoryLimit,
+            useNestedForUnknownTypeInSubquery
         );
-        throw e;
-      }
-    } else {
-      final RowSignature signature = toolChest.resultArraySignature(query);
-
-      final ArrayList<Object[]> resultList = new ArrayList<>();
-
-      toolChest.resultsAsArrays(query, results).accumulate(
-          resultList,
-          (acc, in) -> {
-            if (limitAccumulator.getAndIncrement() >= limitToUse) {
-              throw ResourceLimitExceededException.withMessage(
-                  "Subquery generated results beyond maximum[%d] rows",
-                  limitToUse
-              );
-            }
-            acc.add(in);
-            return acc;
+        if (!maybeDataSource.isPresent()) {
+          cannotMaterializeToFrames.set(true);
+          // Check if the previous row limit accumulator has exceeded the 
memory results
+          if (memoryLimitAccumulator.get() >= memoryLimit) {
+            throw ResourceLimitExceededException.withMessage(
+                "Cannot issue the query, subqueries generated results beyond 
maximum[%d] bytes",
+                memoryLimit
+            );
           }
-      );
-      dataSource = InlineDataSource.fromIterable(resultList, signature);
+          dataSource = materializeResultsAsArray(
+              query,
+              results,
+              toolChest,
+              limitAccumulator,
+              limit
+          );
+        } else {
+          dataSource = maybeDataSource.get();
+        }
+        break;
+      default:
+        throw new IAE("Only row based and memory based limiting is supported");
     }
     return dataSource;
   }
 
+  private static <T, QueryType extends Query<T>> Optional<DataSource> 
materializeResultsAsFrames(
+      final QueryType query,
+      final Sequence<T> results,
+      final QueryToolChest<T, QueryType> toolChest,
+      final AtomicInteger limitAccumulator,
+      final AtomicLong memoryLimitAccumulator,
+      long memoryLimit,
+      boolean useNestedForUnknownTypeInSubquery
+  )
+  {
+    Optional<Sequence<FrameSignaturePair>> framesOptional;
+
+    try {
+      framesOptional = toolChest.resultsAsFrames(
+          query,
+          results,
+          new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+          useNestedForUnknownTypeInSubquery
+      );
+    }
+    catch (Exception e) {
+      return Optional.empty();

Review Comment:
   Why is this a debug log, though? It should be logged at WARN level.



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -613,96 +638,149 @@ private static <T, QueryType extends Query<T>> 
DataSource toInlineDataSource(
       final QueryToolChest<T, QueryType> toolChest,
       final AtomicInteger limitAccumulator,
       final AtomicLong memoryLimitAccumulator,
+      final AtomicBoolean cannotMaterializeToFrames,
       final int limit,
-      long memoryLimit
+      long memoryLimit,
+      boolean useNestedForUnknownTypeInSubquery
   )
   {
-    final int limitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
-    boolean memoryLimitSet = memoryLimit >= 0;
-
-    if (limitAccumulator.get() >= limitToUse) {
-      throw ResourceLimitExceededException.withMessage(
-          "Cannot issue subquery, maximum[%d] reached",
-          limitToUse
-      );
-    }
-
-    if (memoryLimitSet && memoryLimitAccumulator.get() >= memoryLimit) {
-      throw ResourceLimitExceededException.withMessage(
-          "Cannot issue subquery, maximum subquery result bytes[%d] reached",
-          memoryLimit
-      );
-    }
+    final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
 
     DataSource dataSource;
-    // Try to serialize the results into a frame only if the memory limit is 
set on the server or the query
-    if (memoryLimitSet) {
-      try {
-        Optional<Sequence<FrameSignaturePair>> framesOptional = 
toolChest.resultsAsFrames(
+
+    switch (ClientQuerySegmentWalkerUtils.getLimitType(memoryLimit, 
cannotMaterializeToFrames.get())) {
+      case ROW_LIMIT:
+        if (limitAccumulator.get() >= rowLimitToUse) {
+          throw ResourceLimitExceededException.withMessage(
+              "Cannot issue the query, subqueries generated results beyond 
maximum[%d] rows",
+              rowLimitToUse
+          );
+        }
+        dataSource = materializeResultsAsArray(
             query,
             results,
-            memoryLimit - memoryLimitAccumulator.get()
+            toolChest,
+            limitAccumulator,
+            limit
         );
-
-        if (!framesOptional.isPresent()) {
-          throw new ISE("The memory of the subqueries cannot be estimated 
correctly.");
+        break;
+      case MEMORY_LIMIT:
+        if (memoryLimitAccumulator.get() >= memoryLimit) {
+          throw ResourceLimitExceededException.withMessage(
+              "Cannot issue the query, subqueries generated results beyond 
maximum[%d] bytes",
+              memoryLimit
+          );
         }
-
-        Sequence<FrameSignaturePair> frames = framesOptional.get();
-        List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();
-        frames.forEach(
-            frame -> {
-              if 
(memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) {
-                throw ResourceLimitExceededException.withMessage(
-                    "Subquery generated results beyond maximum[%d] bytes",
-                    memoryLimit
-                );
-
-              }
-              if (limitAccumulator.addAndGet(frame.getFrame().numRows()) >= 
limitToUse) {
-                throw ResourceLimitExceededException.withMessage(
-                    "Subquery generated results beyond maximum[%d] rows",
-                    limitToUse
-                );
-              }
-              frameSignaturePairs.add(frame);
-            }
-        );
-        dataSource = new FramesBackedInlineDataSource(frameSignaturePairs, 
toolChest.resultArraySignature(query));
-      }
-      catch (ResourceLimitExceededException rlee) {
-        throw rlee;
-      }
-      catch (Exception e) {
-        log.info(
-            "Unable to write the subquery results to a frame. Results won't be 
accounted for in the memory "
-            + "calculation"
+        Optional<DataSource> maybeDataSource = materializeResultsAsFrames(
+            query,
+            results,
+            toolChest,
+            limitAccumulator,
+            memoryLimitAccumulator,
+            memoryLimit,
+            useNestedForUnknownTypeInSubquery
         );
-        throw e;
-      }
-    } else {
-      final RowSignature signature = toolChest.resultArraySignature(query);
-
-      final ArrayList<Object[]> resultList = new ArrayList<>();
-
-      toolChest.resultsAsArrays(query, results).accumulate(
-          resultList,
-          (acc, in) -> {
-            if (limitAccumulator.getAndIncrement() >= limitToUse) {
-              throw ResourceLimitExceededException.withMessage(
-                  "Subquery generated results beyond maximum[%d] rows",
-                  limitToUse
-              );
-            }
-            acc.add(in);
-            return acc;
+        if (!maybeDataSource.isPresent()) {
+          cannotMaterializeToFrames.set(true);
+          // Check if the previous row limit accumulator has exceeded the 
memory results
+          if (memoryLimitAccumulator.get() >= memoryLimit) {
+            throw ResourceLimitExceededException.withMessage(
+                "Cannot issue the query, subqueries generated results beyond 
maximum[%d] bytes",
+                memoryLimit
+            );
           }
-      );
-      dataSource = InlineDataSource.fromIterable(resultList, signature);
+          dataSource = materializeResultsAsArray(
+              query,
+              results,
+              toolChest,
+              limitAccumulator,
+              limit
+          );
+        } else {
+          dataSource = maybeDataSource.get();
+        }
+        break;
+      default:
+        throw new IAE("Only row based and memory based limiting is supported");
     }
     return dataSource;
   }
 
+  private static <T, QueryType extends Query<T>> Optional<DataSource> 
materializeResultsAsFrames(
+      final QueryType query,
+      final Sequence<T> results,
+      final QueryToolChest<T, QueryType> toolChest,
+      final AtomicInteger limitAccumulator,
+      final AtomicLong memoryLimitAccumulator,
+      long memoryLimit,
+      boolean useNestedForUnknownTypeInSubquery
+  )
+  {
+    Optional<Sequence<FrameSignaturePair>> framesOptional;
+
+    try {
+      framesOptional = toolChest.resultsAsFrames(
+          query,
+          results,
+          new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+          useNestedForUnknownTypeInSubquery
+      );
+    }
+    catch (Exception e) {
+      return Optional.empty();
+    }
+
+    if (!framesOptional.isPresent()) {
+      return Optional.empty();
+    }
+
+    Sequence<FrameSignaturePair> frames = framesOptional.get();
+    List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();
+    frames.forEach(
+        frame -> {
+          limitAccumulator.addAndGet(frame.getFrame().numRows());
+          if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= 
memoryLimit) {
+            throw ResourceLimitExceededException.withMessage(
+                "Subquery generated results beyond maximum[%d] bytes",

Review Comment:
   What's the expected action from the user?



##########
processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java:
##########
@@ -67,4 +76,58 @@ public static Filter buildFilter(@Nullable Filter filter, 
Interval interval)
       );
     }
   }
+
+  /**
+   * Writes a {@link Cursor} to a sequence of {@link Frame}. This method 
iterates over the rows of the cursor,
+   * and writes the columns to the frames
+   *
+   * @param cursor                 Cursor to write to the frame
+   * @param frameWriterFactory     Frame writer factory to write to the frame.
+   *                               Determines the signature of the rows that 
are written to the frames
+   */
+  public static Sequence<Frame> cursorToFrames(
+      Cursor cursor,
+      FrameWriterFactory frameWriterFactory
+  )
+  {
+
+    return Sequences.simple(
+        () -> new Iterator<Frame>()
+        {
+          @Override
+          public boolean hasNext()
+          {
+            return !cursor.isDone();
+          }
+
+          @Override
+          public Frame next()
+          {
+            // Makes sure that cursor contains some elements prior. This 
ensures if no row is written, then the row size
+            // is larger than the MemoryAllocators returned by the provided 
factory
+            if (!hasNext()) {
+              throw new NoSuchElementException();
+            }
+            boolean firstRowWritten = false;
+            Frame frame;
+            try (final FrameWriter frameWriter = 
frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) {
+              while (!cursor.isDone()) {
+                if (!frameWriter.addSelection()) {
+                  break;
+                }
+                firstRowWritten = true;
+                cursor.advance();
+              }
+
+              if (!firstRowWritten) {
+                throw new ISE("Row size is greater than the frame size.");

Review Comment:
   Will a user ever see this error message? Please use the `DruidException` 
class instead and add an error message that's more actionable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to