This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a1b4370567 Improve the fallback strategy when the broker is unable to 
materialize the subquery's results as frames for estimating the bytes (#16679)
3a1b4370567 is described below

commit 3a1b4370567f9655da3d24d427052d550092f8ac
Author: Laksh Singla <[email protected]>
AuthorDate: Fri Jul 12 21:49:12 2024 +0530

    Improve the fallback strategy when the broker is unable to materialize the 
subquery's results as frames for estimating the bytes (#16679)
    
    Better fallback strategy when the broker is unable to materialize the 
subquery's results as frames for estimating the bytes:
    a. We don't touch the subquery sequence until we know that we can 
materialize the result as frames
---
 .../sql/DoublesSketchSqlAggregatorTest.java        | 143 +++++++++++++++++++++
 .../druid/frame/segment/FrameCursorUtils.java      |  23 +++-
 .../query/groupby/GroupByQueryQueryToolChest.java  |   2 +
 .../timeseries/TimeseriesQueryQueryToolChest.java  |   2 +
 .../druid/query/topn/TopNQueryQueryToolChest.java  |   2 +
 .../druid/server/ClientQuerySegmentWalker.java     |  28 ++--
 6 files changed, 190 insertions(+), 10 deletions(-)

diff --git 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
index 9122e1ecc7e..a85d508a879 100644
--- 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
+++ 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
@@ -24,8 +24,11 @@ import com.google.common.collect.ImmutableMap;
 import com.google.inject.Injector;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.guice.DruidInjectorBuilder;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.Druids;
+import org.apache.druid.query.JoinDataSource;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -33,6 +36,7 @@ import 
org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.aggregation.PostAggregator;
+import 
org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
 import org.apache.druid.query.aggregation.datasketches.SketchQueryContext;
 import 
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
 import 
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule;
@@ -43,6 +47,7 @@ import 
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchTo
 import 
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToRankPostAggregator;
 import 
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToStringPostAggregator;
 import 
org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchSqlAggregatorTest.DoublesSketchComponentSupplier;
+import 
org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
 import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
@@ -53,6 +58,7 @@ import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.join.JoinType;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -263,6 +269,143 @@ public class DoublesSketchSqlAggregatorTest extends 
BaseCalciteQueryTest
     );
   }
 
+  @Test
+  public void testSubqueryWithNestedGroupBy()
+  {
+    final List<Object[]> expectedResults = ImmutableList.of(
+        new Object[]{946684800000L, "", 1L, "val1"},
+        new Object[]{946684800000L, "1", 1L, "val1"},
+        new Object[]{946684800000L, "10.1", 1L, "val1"},
+        new Object[]{946684800000L, "2", 1L, "val1"},
+        new Object[]{946684800000L, "abc", 1L, "val1"},
+        new Object[]{946684800000L, "def", 1L, "val1"}
+    );
+
+    testQuery(
+        "SELECT\n"
+        + "  MILLIS_TO_TIMESTAMP(946684800000) AS __time,\n"
+        + "  alias.\"user\",\n"
+        + "  alias.days,\n"
+        + "  (CASE WHEN alias.days < quantiles.first_quartile THEN 'val2' \n"
+        + "  WHEN alias.days >= quantiles.first_quartile AND alias.days < 
quantiles.third_quartile THEN 'val3' \n"
+        + "  WHEN alias.days >= quantiles.third_quartile THEN 'val1' END) AS 
val4\n"
+        + "FROM (\n"
+        + "  SELECT\n"
+        + "    APPROX_QUANTILE_DS(alias.days, 0.25) AS first_quartile,\n"
+        + "    APPROX_QUANTILE_DS(alias.days, 0.75) AS third_quartile\n"
+        + "  FROM (\n"
+        + "    SELECT\n"
+        + "      dim1 \"user\",\n"
+        + "      COUNT(DISTINCT __time) AS days\n"
+        + "    FROM \"foo\"\n"
+        + "    GROUP BY 1\n"
+        + "  ) AS alias\n"
+        + ") AS quantiles, (\n"
+        + "  SELECT\n"
+        + "    dim1 \"user\",\n"
+        + "    COUNT(DISTINCT __time) AS days\n"
+        + "  FROM \"foo\"\n"
+        + "  GROUP BY 1\n"
+        + ") AS alias\n",
+        ImmutableMap.<String, Object>builder()
+                    .putAll(QUERY_CONTEXT_DEFAULT)
+                    .put(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000")
+                    // Disallows the fallback to row based limiting
+                    .put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "10")
+                    .build(),
+        ImmutableList.of(
+            newScanQueryBuilder()
+                .dataSource(
+                    JoinDataSource.create(
+                        new QueryDataSource(
+                            GroupByQuery.builder()
+                                        .setDataSource(
+                                            new QueryDataSource(
+                                                GroupByQuery.builder()
+                                                            
.setDataSource("foo")
+                                                            
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
+                                                            
.setGranularity(Granularities.ALL)
+                                                            .addDimension(new 
DefaultDimensionSpec(
+                                                                "dim1",
+                                                                "d0",
+                                                                
ColumnType.STRING
+                                                            ))
+                                                            .addAggregator(new 
CardinalityAggregatorFactory(
+                                                                "a0:a",
+                                                                null,
+                                                                
Collections.singletonList(new DefaultDimensionSpec(
+                                                                    "__time",
+                                                                    "__time",
+                                                                    
ColumnType.LONG
+                                                                )),
+                                                                false,
+                                                                true
+                                                            ))
+                                                            
.setPostAggregatorSpecs(new HyperUniqueFinalizingPostAggregator(
+                                                                "a0",
+                                                                "a0:a"
+                                                            ))
+                                                            .build()
+                                            )
+                                        )
+                                        
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
+                                        .setGranularity(Granularities.ALL)
+                                        .addAggregator(new 
DoublesSketchAggregatorFactory("_a0:agg", "a0", 128))
+                                        .setPostAggregatorSpecs(
+                                            new 
DoublesSketchToQuantilePostAggregator(
+                                                "_a0",
+                                                new 
FieldAccessPostAggregator("_a0:agg", "_a0:agg"),
+                                                0.25
+                                            ),
+                                            new 
DoublesSketchToQuantilePostAggregator(
+                                                "_a1",
+                                                new 
FieldAccessPostAggregator("_a0:agg", "_a0:agg"),
+                                                0.75
+                                            )
+                                        )
+                                        .build()
+
+                        ),
+                        new QueryDataSource(
+                            GroupByQuery.builder()
+                                        .setDataSource("foo")
+                                        
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
+                                        .setGranularity(Granularities.ALL)
+                                        .addDimension(new 
DefaultDimensionSpec("dim1", "d0", ColumnType.STRING))
+                                        .addAggregator(new 
CardinalityAggregatorFactory(
+                                            "a0",
+                                            null,
+                                            Collections.singletonList(new 
DefaultDimensionSpec(
+                                                "__time",
+                                                "__time",
+                                                ColumnType.LONG
+                                            )),
+                                            false,
+                                            true
+                                        ))
+                                        .build()
+                        ),
+                        "j0.",
+                        "1",
+                        JoinType.INNER,
+                        null,
+                        TestExprMacroTable.INSTANCE,
+                        null
+                    )
+                )
+                .intervals(querySegmentSpec(Intervals.ETERNITY))
+                .virtualColumns(
+                    new ExpressionVirtualColumn("v0", "946684800000", 
ColumnType.LONG, TestExprMacroTable.INSTANCE),
+                    new ExpressionVirtualColumn("v1", 
"case_searched((\"j0.a0\" < \"_a0\"),'val2',((\"j0.a0\" >= \"_a0\") && 
(\"j0.a0\" < \"_a1\")),'val3',(\"j0.a0\" >= \"_a1\"),'val1',null)", 
ColumnType.STRING, TestExprMacroTable.INSTANCE)
+                )
+                .columns("j0.a0", "j0.d0", "v0", "v1")
+                .build()
+        ),
+        expectedResults
+    );
+  }
+
+
   @Test
   public void testQuantileOnCastedString()
   {
diff --git 
a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java 
b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java
index 3cb5c686e9d..de970363bb4 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java
@@ -23,6 +23,7 @@ import org.apache.druid.error.DruidException;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.write.FrameWriter;
 import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.frame.write.UnsupportedColumnTypeException;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -32,6 +33,7 @@ import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.filter.BoundFilter;
 import org.apache.druid.segment.filter.Filters;
 import org.joda.time.Interval;
@@ -100,13 +102,18 @@ public class FrameCursorUtils
   /**
    * Writes a {@link Cursor} to a sequence of {@link Frame}. This method 
iterates over the rows of the cursor,
    * and writes the columns to the frames. The iterable is lazy, and it 
traverses the required portion of the cursor
-   * as required
+   * as required.
+   * <p>
+   * If the type is missing from the signature, the method throws an exception 
without advancing/modifying/closing the
+   * cursor
    */
   public static Iterable<Frame> cursorToFramesIterable(
       final Cursor cursor,
       final FrameWriterFactory frameWriterFactory
   )
   {
+    throwIfColumnsHaveUnknownType(frameWriterFactory.signature());
+
     return () -> new Iterator<Frame>()
     {
       @Override
@@ -158,7 +165,19 @@ public class FrameCursorUtils
       final FrameWriterFactory frameWriterFactory
   )
   {
-
     return Sequences.simple(cursorToFramesIterable(cursor, 
frameWriterFactory));
   }
+
+  /**
+   * Throws {@link UnsupportedColumnTypeException} if the row signature has 
columns with unknown types. This is used to
+   * pre-determine if the frames can be materialized as rows, without touching 
the resource generating the frames.
+   */
+  public static void throwIfColumnsHaveUnknownType(final RowSignature 
rowSignature)
+  {
+    for (int i = 0; i < rowSignature.size(); ++i) {
+      if (!rowSignature.getColumnType(i).isPresent()) {
+        throw new 
UnsupportedColumnTypeException(rowSignature.getColumnName(i), null);
+      }
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
index 7588848cf5b..b19b479c26d 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -816,6 +816,8 @@ public class GroupByQueryQueryToolChest extends 
QueryToolChest<ResultRow, GroupB
                                         ? 
FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
                                         : rowSignature;
 
+    FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);
+
     FrameWriterFactory frameWriterFactory = 
FrameWriters.makeColumnBasedFrameWriterFactory(
         memoryAllocatorFactory,
         modifiedRowSignature,
diff --git 
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
 
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 8527d551cf5..17a2f8be956 100644
--- 
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ 
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -485,6 +485,8 @@ public class TimeseriesQueryQueryToolChest extends 
QueryToolChest<Result<Timeser
     RowSignature modifiedRowSignature = useNestedForUnknownTypes
                                         ? 
FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
                                         : rowSignature;
+    FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);
+
     FrameWriterFactory frameWriterFactory = 
FrameWriters.makeColumnBasedFrameWriterFactory(
         memoryAllocatorFactory,
         modifiedRowSignature,
diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
 
b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
index 02d07e25570..25a4284aa42 100644
--- 
a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
+++ 
b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
@@ -569,6 +569,8 @@ public class TopNQueryQueryToolChest extends 
QueryToolChest<Result<TopNResultVal
     RowSignature modifiedRowSignature = useNestedForUnknownTypes
                                         ? 
FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
                                         : rowSignature;
+    FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);
+
     FrameWriterFactory frameWriterFactory = 
FrameWriters.makeColumnBasedFrameWriterFactory(
         memoryAllocatorFactory,
         rowSignature,
diff --git 
a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java 
b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
index 1d3b38b2fdb..d49ce3909f7 100644
--- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
@@ -746,6 +746,7 @@ public class ClientQuerySegmentWalker implements 
QuerySegmentWalker
   {
     Optional<Sequence<FrameSignaturePair>> framesOptional;
 
+    boolean startedAccumulating = false;
     try {
       framesOptional = toolChest.resultsAsFrames(
           query,
@@ -760,6 +761,9 @@ public class ClientQuerySegmentWalker implements 
QuerySegmentWalker
 
       Sequence<FrameSignaturePair> frames = framesOptional.get();
       List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();
+
+      startedAccumulating = true;
+
       frames.forEach(
           frame -> {
             limitAccumulator.addAndGet(frame.getFrame().numRows());
@@ -772,21 +776,29 @@ public class ClientQuerySegmentWalker implements 
QuerySegmentWalker
           }
       );
       return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, 
toolChest.resultArraySignature(query)));
-
-    }
-    catch (ResourceLimitExceededException e) {
-      throw e;
     }
     catch (UnsupportedColumnTypeException e) {
       
subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnsufficientTypeInfo();
       log.debug(e, "Type info in signature insufficient to materialize rows as 
frames.");
       return Optional.empty();
     }
+    catch (ResourceLimitExceededException e) {
+      throw e;
+    }
     catch (Exception e) {
-      subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnknownReason();
-      log.debug(e, "Unable to materialize the results as frames due to an 
unhandleable exception "
-                   + "while conversion. Defaulting to materializing the 
results as rows");
-      return Optional.empty();
+      if (startedAccumulating) {
+        // If we have opened the resultSequence, we can't fall back safely as 
the resultSequence might hold some resources
+        // that we release on exception, and we need to throw the exception to 
disable the 'maxSubqueryBytes' configuration
+        throw DruidException.defensive()
+                            .build(
+                                e,
+                                "Unable to materialize the results as frames 
for estimating the byte footprint. "
+                                + "Please disable the 'maxSubqueryBytes' by 
setting it to 'disabled' in the query context or removing it altogether "
+                                + "from the query context and/or the server 
config."
+                            );
+      } else {
+        return Optional.empty();
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to