gianm commented on code in PR #13952:
URL: https://github.com/apache/druid/pull/13952#discussion_r1182869188


##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -577,25 +627,80 @@ private static <T, QueryType extends Query<T>> 
InlineDataSource toInlineDataSour
       );
     }
 
-    final RowSignature signature = toolChest.resultArraySignature(query);
+    if (memoryLimitSet && memoryLimitAccumulator.get() >= memoryLimit) {
+      throw ResourceLimitExceededException.withMessage(
+          "Cannot issue subquery, maximum subquery result bytes[%d] reached",
+          memoryLimit
+      );
+    }
 
-    final ArrayList<Object[]> resultList = new ArrayList<>();
+    DataSource dataSource;
+    // Try to serialize the results into a frame only if the memory limit is 
set on the server or the query
+    if (memoryLimitSet) {
+      try {
+        Optional<Sequence<FrameSignaturePair>> framesOptional = 
toolChest.resultsAsFrames(
+            query,
+            results,
+            memoryLimit - memoryLimitAccumulator.get()
+        );
 
-    toolChest.resultsAsArrays(query, results).accumulate(
-        resultList,
-        (acc, in) -> {
-          if (limitAccumulator.getAndIncrement() >= limitToUse) {
-            throw ResourceLimitExceededException.withMessage(
-                "Subquery generated results beyond maximum[%d]",
-                limitToUse
-            );
-          }
-          acc.add(in);
-          return acc;
+        if (!framesOptional.isPresent()) {
+          throw new ISE("The memory of the subqueries cannot be estimated 
correctly.");
         }
-    );
 
-    return InlineDataSource.fromIterable(resultList, signature);
+        Sequence<FrameSignaturePair> frames = framesOptional.get();
+        List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();
+        frames.forEach(
+            frame -> {
+              if 
(memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) {
+                throw ResourceLimitExceededException.withMessage(
+                    "Subquery generated results beyond maximum[%d] bytes",
+                    memoryLimit
+                );
+
+              }
+              if (limitAccumulator.addAndGet(frame.getFrame().numRows()) >= 
limitToUse) {
+                throw ResourceLimitExceededException.withMessage(
+                    "Subquery generated results beyond maximum[%d] rows",
+                    limitToUse
+                );
+              }
+              frameSignaturePairs.add(frame);
+            }
+        );
+        dataSource = new FramesBackedInlineDataSource(frameSignaturePairs, 
toolChest.resultArraySignature(query));
+      }
+      catch (ResourceLimitExceededException rlee) {
+        throw rlee;
+      }
+      catch (Exception e) {
+        log.info(

Review Comment:
   In this `catch` case, the whole query fails, right? Are there known 
situations where it would happen?
   
   IMO we should be doing one of two things:
   
   1. If we believe this can never happen, we don't need a `catch` block at 
all. Just let the exception out and fail the query. Let's then also make sure 
we have enough testing to be confident it doesn't happen.
   2. If there's known situations where this would happen, we should detect 
them and avoid going down the frame path at all. If that's not feasible, then 
we should put in some logic here to recover the query rather than failing it.
   
   What're your thoughts?
   
   Either way, there's no reason to log anything, let's not clutter the logs. 
The standard exception handling is enough if the query is going to fail. And if 
it's not going to fail, we should recover quietly.



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -560,15 +607,18 @@ private DataSource insertSubqueryIds(
    *                         If zero, this method will throw an error 
immediately.
    * @throws ResourceLimitExceededException if the limit is exceeded
    */
-  private static <T, QueryType extends Query<T>> InlineDataSource 
toInlineDataSource(
+  private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
       final QueryType query,
       final Sequence<T> results,
       final QueryToolChest<T, QueryType> toolChest,
       final AtomicInteger limitAccumulator,
-      final int limit
+      final AtomicLong memoryLimitAccumulator,
+      final int limit,
+      long memoryLimit
   )
   {
     final int limitToUse = limit < 0 ? Integer.MAX_VALUE : limit;

Review Comment:
   Rename this `rowLimitToUse`, for clarity.



##########
processing/src/main/java/org/apache/druid/frame/write/FrameWriters.java:
##########
@@ -53,24 +54,38 @@ private FrameWriters()
    * @param signature        signature of the frames
    * @param sortColumns      sort columns for the frames. If nonempty, {@link 
FrameSort#sort} is used to sort the
    *                         resulting frames.
+   * @param allowNullColumnTypes to allow null ColumnType in the signature. 
This should only be enabled when the user
+   *                             knows that the column objects exist as native 
Java POJOs (LinkedList, Maps etc), which
+   *                             can be serded using the Druid's nested columns
    */
   public static FrameWriterFactory makeFrameWriterFactory(
       final FrameType frameType,
       final MemoryAllocatorFactory allocatorFactory,
-      final RowSignature signature,
-      final List<KeyColumn> sortColumns
+      @Nonnull final RowSignature signature,

Review Comment:
   Use `@Nullable` where appropriate, but avoid `@Nonnull` on parameters in 
general, since it creates an implication that unannotated things _can_ be null. 
But, they mostly can't, so it's misleading.
   
   If you're looking for benefit for static analyzers, better to use 
`@EverythingIsNonnullByDefault` at package level through `package-info.java`. 
We have that on various packages already and could add it to more as desired. 
That's like annotating every un-annotated parameter with `@Nonnull`, without 
the clutter.



##########
processing/src/main/java/org/apache/druid/query/QueryToolChest.java:
##########
@@ -330,4 +331,32 @@ public Sequence<Object[]> resultsAsArrays(QueryType query, 
Sequence<ResultType>
   {
     throw new UOE("Query type '%s' does not support returning results as 
arrays", query.getType());
   }
+
+  /**
+   * Converts a sequence of this query's ResultType into a sequence of {@link 
FrameSignaturePair}. The array signature
+   * is the one give by {@link #resultArraySignature(Query)}. If the toolchest 
doesn't support this method, then it can
+   * return an empty optional. It is the duty of the callees to throw an 
appropriate exception in that case or use an
+   * alternative fallback approach
+   *
+   * Check documentation of {@link #resultsAsArrays(Query, Sequence)} as the 
behaviour of the rows represented by the
+   * frame sequence is identical.
+   *
+   * Each Frame has a separate {@link RowSignature} because for some query 
types like the Scan query, every

Review Comment:
   This seems to contradict the statement earlier that "the array signature is 
the one given by `resultArraySignature`". Sounds like it could actually be 
different. In that case please update the earlier text accordingly.



##########
server/src/main/java/org/apache/druid/segment/FrameBasedInlineSegmentWrangler.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameSegment;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.FramesBackedInlineDataSource;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.Interval;
+
+import java.util.stream.Collectors;
+
+public class FrameBasedInlineSegmentWrangler implements SegmentWrangler
+{
+
+  private static final String SEGMENT_ID = "inline";
+
+  @Override
+  public Iterable<Segment> getSegmentsForIntervals(
+      DataSource dataSource,
+      Iterable<Interval> intervals
+  )
+  {
+    final FramesBackedInlineDataSource framesBackedInlineDataSource = 
(FramesBackedInlineDataSource) dataSource;
+
+    return framesBackedInlineDataSource.getFrames().stream().map(

Review Comment:
   Better to avoid materializing the list: return an `Iterable` based on 
`.iterator()` instead of `.collect(Collectors.toList())`.



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java:
##########
@@ -705,6 +718,39 @@ public Sequence<Object[]> resultsAsArrays(final 
GroupByQuery query, final Sequen
     return resultSequence.map(ResultRow::getArray);
   }
 
+  /**
+   * This returns a single frame containing the results of the group by query.
+   */
+  @Override
+  public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
+      GroupByQuery query,
+      Sequence<ResultRow> resultSequence,
+      @Nullable Long memoryLimitBytes
+  )
+  {
+    RowSignature rowSignature = resultArraySignature(query);
+
+    FrameWriterFactory frameWriterFactory = 
FrameWriters.makeFrameWriterFactory(
+        FrameType.ROW_BASED,
+        new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+        rowSignature,
+        new ArrayList<>(),
+        true
+    );
+
+
+    Cursor cursor = IterableRowsCursorHelper.getCursorFromSequence(
+        resultSequence.map(ResultRow::getArray),
+        rowSignature
+    );
+
+    Frame frame = FrameCursorUtils.cursorToFrame(cursor, frameWriterFactory, 
memoryLimitBytes);

Review Comment:
   With the changes to `resultsAsFrames`, this would start returning a sequence 
of multiple smaller frames, and wouldn't take `memoryLimitBytes`.



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -179,15 +190,19 @@ public <T> QueryRunner<T> 
getQueryRunnerForIntervals(Query<T> query, Iterable<In
     }
 
     // Now that we know the structure is workable, actually do the inlining 
(if necessary).
-    newQuery = newQuery.withDataSource(
-        inlineIfNecessary(
-            freeTradeDataSource,
-            toolChest,
-            new AtomicInteger(),
-            maxSubqueryRows,
-            false
-        )
+    AtomicLong memoryLimitAcc = new AtomicLong(0);
+    DataSource maybeInlinedDataSource = inlineIfNecessary(
+        freeTradeDataSource,
+        toolChest,
+        new AtomicInteger(),
+        memoryLimitAcc,
+        maxSubqueryRows,
+        maxSubqueryMemory,
+        false
     );
+    newQuery = newQuery.withDataSource(maybeInlinedDataSource);
+
+    log.info("Memory used by subqueries of query [%s] is [%d]", query, 
memoryLimitAcc.get());

Review Comment:
   We shouldn't have any `INFO` level logs that happen on a per-query basis. It 
generates too many log messages. Please reduce this to `DEBUG` level.



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -313,7 +328,9 @@ private DataSource inlineIfNecessary(
       final DataSource dataSource,
       @Nullable final QueryToolChest toolChestIfOutermost,
       final AtomicInteger subqueryRowLimitAccumulator,
+      final AtomicLong subqueryRowMemoryLimitAccumulator,

Review Comment:
   `subqueryMemoryLimitAccumulator` is a better name.



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -577,25 +627,80 @@ private static <T, QueryType extends Query<T>> 
InlineDataSource toInlineDataSour
       );
     }
 
-    final RowSignature signature = toolChest.resultArraySignature(query);
+    if (memoryLimitSet && memoryLimitAccumulator.get() >= memoryLimit) {
+      throw ResourceLimitExceededException.withMessage(

Review Comment:
   Update the other message to refer to the row limit, for clarity.



##########
server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java:
##########
@@ -348,7 +350,7 @@ public void testTimeseriesOnGroupByOnTable()
 
     testQuery(
         query,
-        ImmutableList.of(
+        new ArrayList<>(ImmutableList.of(

Review Comment:
   Is there a reason this needs to be a mutable array?



##########
processing/src/main/java/org/apache/druid/query/QueryToolChest.java:
##########
@@ -330,4 +331,32 @@ public Sequence<Object[]> resultsAsArrays(QueryType query, 
Sequence<ResultType>
   {
     throw new UOE("Query type '%s' does not support returning results as 
arrays", query.getType());
   }
+
+  /**
+   * Converts a sequence of this query's ResultType into a sequence of {@link 
FrameSignaturePair}. The array signature
+   * is the one give by {@link #resultArraySignature(Query)}. If the toolchest 
doesn't support this method, then it can
+   * return an empty optional. It is the duty of the callees to throw an 
appropriate exception in that case or use an
+   * alternative fallback approach
+   *
+   * Check documentation of {@link #resultsAsArrays(Query, Sequence)} as the 
behaviour of the rows represented by the
+   * frame sequence is identical.
+   *
+   * Each Frame has a separate {@link RowSignature} because for some query 
types like the Scan query, every
+   * column in the final result might not be present in the individual 
ResultType (and subsequently Frame). Therefore,
+   * this is done to preserve the space by not populating the column in that 
particular Frame and omitting it from its
+   * signature
+   * 
+   * @param query Query being executed by the toolchest. Used to determine the 
rowSignature of the Frames
+   * @param resultSequence results of the form returned by {@link 
#mergeResults(QueryRunner)}
+   * @param memoryLimitBytes Limit the memory results. Throws {@link 
ResourceLimitExceededException} if the result exceed
+   *                    the memoryLimitBytes
+   */
+  public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(

Review Comment:
   Rather than accepting `memoryLimitBytes` and returning a jumbo frame, it'd 
be better to return a sequence of moderately-sized frames and have the caller 
decide when they've had too many frames. This makes the method simpler, easier 
to test/understand, and more flexible for potential future use cases. In 
particular I suggest the following changes:
   
   - Remove the `memoryLimitBytes` parameter.
   - Add a `MemoryAllocatorFactory` parameter so the caller can control the 
size of each frame.



##########
server/src/main/java/org/apache/druid/segment/FrameBasedInlineSegmentWrangler.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameSegment;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.FramesBackedInlineDataSource;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.Interval;
+
+import java.util.stream.Collectors;
+
+public class FrameBasedInlineSegmentWrangler implements SegmentWrangler

Review Comment:
   Some of the classes are `FrameBased` and some are `FramesBacked`. Let's keep 
it consistent and use `FrameBased` for all of them.



##########
processing/src/main/java/org/apache/druid/query/QueryContexts.java:
##########
@@ -54,6 +54,7 @@
   public static final String VECTORIZE_VIRTUAL_COLUMNS_KEY = 
"vectorizeVirtualColumns";
   public static final String VECTOR_SIZE_KEY = "vectorSize";
   public static final String MAX_SUBQUERY_ROWS_KEY = "maxSubqueryRows";
+  public static final String MAX_SUBQUERY_MEMORY_BYTES_KEY = 
"maxSubqueryMemoryBytes";

Review Comment:
   `maxSubqueryBytes` is better, IMO, since it's just as clear and a bit more 
succinct.



##########
processing/src/main/java/org/apache/druid/frame/write/FrameWriters.java:
##########
@@ -53,24 +54,38 @@ private FrameWriters()
    * @param signature        signature of the frames
    * @param sortColumns      sort columns for the frames. If nonempty, {@link 
FrameSort#sort} is used to sort the
    *                         resulting frames.
+   * @param allowNullColumnTypes to allow null ColumnType in the signature. 
This should only be enabled when the user
+   *                             knows that the column objects exist as native 
Java POJOs (LinkedList, Maps etc), which
+   *                             can be serded using the Druid's nested columns
    */
   public static FrameWriterFactory makeFrameWriterFactory(

Review Comment:
   IMO adding `allowNullColumnTypes` as a parameter here isn't the right 
approach. Better for the caller to replace all the unknown types (`null`) in 
`signature` with `COMPLEX<json>`, if that's what's desired. It keeps the frame 
writer code simpler, easier to test/understand, etc.



##########
processing/src/main/java/org/apache/druid/segment/join/FramesBackedInlineJoinableFactory.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.join;
+
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.FramesBackedInlineDataSource;
+import org.apache.druid.query.InlineDataSource;
+
+import java.util.Optional;
+
+/**
+ * Creates a joinable from the {@link FramesBackedInlineDataSource}. This 
materializes the datasource to an
+ * {@link InlineDataSource}, before creating the joinable on it, which carries 
the overhead of this conversion.
+ */
+public class FramesBackedInlineJoinableFactory implements JoinableFactory
+{
+  private final InlineJoinableFactory INLINE_JOINABLE_FACTORY = new 
InlineJoinableFactory();
+
+
+  @Override
+  public boolean isDirectlyJoinable(DataSource dataSource)
+  {
+    return dataSource instanceof FramesBackedInlineDataSource;
+  }
+
+  @Override
+  public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis 
condition)
+  {
+    FramesBackedInlineDataSource framesBackedInlineDataSource = 
(FramesBackedInlineDataSource) dataSource;
+    InlineDataSource inlineDataSource = 
framesBackedInlineDataSource.toInlineDataSource();

Review Comment:
   Can we create a Joinable directly on the frames? Materialization to inline 
is going to be expensive and likely 2–3x memory usage as well. I don't think 
that is going to be viable.



##########
processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java:
##########
@@ -52,6 +54,8 @@ public DruidDefaultSerializersModule()
 
     JodaStuff.register(this);
 
+    addSerializer(FramesBackedInlineDataSource.class, new 
FramesBackedInlineDataSourceSerializer());

Review Comment:
   I would think we don't need to serialize this, as it should exist in-memory 
only. So I'm also wondering where this was needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to