This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3a1b4370567 Improve the fallback strategy when the broker is unable to
materialize the subquery's results as frames for estimating the bytes (#16679)
3a1b4370567 is described below
commit 3a1b4370567f9655da3d24d427052d550092f8ac
Author: Laksh Singla <[email protected]>
AuthorDate: Fri Jul 12 21:49:12 2024 +0530
Improve the fallback strategy when the broker is unable to materialize the
subquery's results as frames for estimating the bytes (#16679)
Better fallback strategy when the broker is unable to materialize the
subquery's results as frames for estimating the bytes: the subquery's
result sequence is no longer touched until we know that the results can be
materialized as frames.
---
.../sql/DoublesSketchSqlAggregatorTest.java | 143 +++++++++++++++++++++
.../druid/frame/segment/FrameCursorUtils.java | 23 +++-
.../query/groupby/GroupByQueryQueryToolChest.java | 2 +
.../timeseries/TimeseriesQueryQueryToolChest.java | 2 +
.../druid/query/topn/TopNQueryQueryToolChest.java | 2 +
.../druid/server/ClientQuerySegmentWalker.java | 28 ++--
6 files changed, 190 insertions(+), 10 deletions(-)
diff --git
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
index 9122e1ecc7e..a85d508a879 100644
---
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
+++
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
@@ -24,8 +24,11 @@ import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.DruidInjectorBuilder;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
+import org.apache.druid.query.JoinDataSource;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -33,6 +36,7 @@ import
org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
+import
org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.SketchQueryContext;
import
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
import
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule;
@@ -43,6 +47,7 @@ import
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchTo
import
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToRankPostAggregator;
import
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToStringPostAggregator;
import
org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchSqlAggregatorTest.DoublesSketchComponentSupplier;
+import
org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@@ -53,6 +58,7 @@ import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -263,6 +269,143 @@ public class DoublesSketchSqlAggregatorTest extends
BaseCalciteQueryTest
);
}
+ @Test
+ public void testSubqueryWithNestedGroupBy()
+ {
+ final List<Object[]> expectedResults = ImmutableList.of(
+ new Object[]{946684800000L, "", 1L, "val1"},
+ new Object[]{946684800000L, "1", 1L, "val1"},
+ new Object[]{946684800000L, "10.1", 1L, "val1"},
+ new Object[]{946684800000L, "2", 1L, "val1"},
+ new Object[]{946684800000L, "abc", 1L, "val1"},
+ new Object[]{946684800000L, "def", 1L, "val1"}
+ );
+ // The outer query cross-joins a one-row quantiles subquery (built over a nested GROUP BY) with a per-user GROUP BY subquery
+ testQuery(
+ "SELECT\n"
+ + " MILLIS_TO_TIMESTAMP(946684800000) AS __time,\n"
+ + " alias.\"user\",\n"
+ + " alias.days,\n"
+ + " (CASE WHEN alias.days < quantiles.first_quartile THEN 'val2' \n"
+ + " WHEN alias.days >= quantiles.first_quartile AND alias.days <
quantiles.third_quartile THEN 'val3' \n"
+ + " WHEN alias.days >= quantiles.third_quartile THEN 'val1' END) AS
val4\n"
+ + "FROM (\n"
+ + " SELECT\n"
+ + " APPROX_QUANTILE_DS(alias.days, 0.25) AS first_quartile,\n"
+ + " APPROX_QUANTILE_DS(alias.days, 0.75) AS third_quartile\n"
+ + " FROM (\n"
+ + " SELECT\n"
+ + " dim1 \"user\",\n"
+ + " COUNT(DISTINCT __time) AS days\n"
+ + " FROM \"foo\"\n"
+ + " GROUP BY 1\n"
+ + " ) AS alias\n"
+ + ") AS quantiles, (\n"
+ + " SELECT\n"
+ + " dim1 \"user\",\n"
+ + " COUNT(DISTINCT __time) AS days\n"
+ + " FROM \"foo\"\n"
+ + " GROUP BY 1\n"
+ + ") AS alias\n",
+ ImmutableMap.<String, Object>builder()
+ .putAll(QUERY_CONTEXT_DEFAULT)
+ .put(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000")
+ // Disallows the fallback to row based limiting (presumably the subqueries' combined row count exceeds this limit — NOTE(review): confirm)
+ .put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "10")
+ .build(),
+ ImmutableList.of(
+ newScanQueryBuilder()
+ .dataSource(
+ JoinDataSource.create(
+ new QueryDataSource(
+ GroupByQuery.builder()
+ .setDataSource(
+ new QueryDataSource(
+ GroupByQuery.builder()
+
.setDataSource("foo")
+
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
+
.setGranularity(Granularities.ALL)
+ .addDimension(new
DefaultDimensionSpec(
+ "dim1",
+ "d0",
+
ColumnType.STRING
+ ))
+ .addAggregator(new
CardinalityAggregatorFactory(
+ "a0:a",
+ null,
+
Collections.singletonList(new DefaultDimensionSpec(
+ "__time",
+ "__time",
+
ColumnType.LONG
+ )),
+ false,
+ true
+ ))
+
.setPostAggregatorSpecs(new HyperUniqueFinalizingPostAggregator(
+ "a0",
+ "a0:a"
+ ))
+ .build()
+ )
+ )
+
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
+ .setGranularity(Granularities.ALL)
+ .addAggregator(new
DoublesSketchAggregatorFactory("_a0:agg", "a0", 128))
+ .setPostAggregatorSpecs(
+ new
DoublesSketchToQuantilePostAggregator(
+ "_a0",
+ new
FieldAccessPostAggregator("_a0:agg", "_a0:agg"),
+ 0.25
+ ),
+ new
DoublesSketchToQuantilePostAggregator(
+ "_a1",
+ new
FieldAccessPostAggregator("_a0:agg", "_a0:agg"),
+ 0.75
+ )
+ )
+ .build()
+
+ ),
+ new QueryDataSource(
+ GroupByQuery.builder()
+ .setDataSource("foo")
+
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
+ .setGranularity(Granularities.ALL)
+ .addDimension(new
DefaultDimensionSpec("dim1", "d0", ColumnType.STRING))
+ .addAggregator(new
CardinalityAggregatorFactory(
+ "a0",
+ null,
+ Collections.singletonList(new
DefaultDimensionSpec(
+ "__time",
+ "__time",
+ ColumnType.LONG
+ )),
+ false,
+ true
+ ))
+ .build()
+ ),
+ "j0.",
+ "1",
+ JoinType.INNER,
+ null,
+ TestExprMacroTable.INSTANCE,
+ null
+ )
+ )
+ .intervals(querySegmentSpec(Intervals.ETERNITY))
+ .virtualColumns(
+ new ExpressionVirtualColumn("v0", "946684800000",
ColumnType.LONG, TestExprMacroTable.INSTANCE),
+ new ExpressionVirtualColumn("v1",
"case_searched((\"j0.a0\" < \"_a0\"),'val2',((\"j0.a0\" >= \"_a0\") &&
(\"j0.a0\" < \"_a1\")),'val3',(\"j0.a0\" >= \"_a1\"),'val1',null)",
ColumnType.STRING, TestExprMacroTable.INSTANCE)
+ )
+ .columns("j0.a0", "j0.d0", "v0", "v1")
+ .build()
+ ),
+ expectedResults
+ );
+ }
+
+
@Test
public void testQuantileOnCastedString()
{
diff --git
a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java
b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java
index 3cb5c686e9d..de970363bb4 100644
---
a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java
+++
b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java
@@ -23,6 +23,7 @@ import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.frame.write.UnsupportedColumnTypeException;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@@ -32,6 +33,7 @@ import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.filter.BoundFilter;
import org.apache.druid.segment.filter.Filters;
import org.joda.time.Interval;
@@ -100,13 +102,18 @@ public class FrameCursorUtils
/**
* Writes a {@link Cursor} to a sequence of {@link Frame}. This method
iterates over the rows of the cursor,
* and writes the columns to the frames. The iterable is lazy, and it
traverses the required portion of the cursor
- * as required
+ * as required.
+ * <p>
+ * If the type is missing from the signature, the method throws an exception
without advancing/modifying/closing the
+ * cursor
*/
public static Iterable<Frame> cursorToFramesIterable(
final Cursor cursor,
final FrameWriterFactory frameWriterFactory
)
{
+ throwIfColumnsHaveUnknownType(frameWriterFactory.signature());
+
return () -> new Iterator<Frame>()
{
@Override
@@ -158,7 +165,19 @@ public class FrameCursorUtils
final FrameWriterFactory frameWriterFactory
)
{
-
return Sequences.simple(cursorToFramesIterable(cursor,
frameWriterFactory));
}
+
+ /**
+ * Throws {@link UnsupportedColumnTypeException} for the first column of the row signature whose type is unknown.
+ * This is used to pre-determine if the rows can be materialized as frames, without touching the resource producing the rows.
+ */
+ public static void throwIfColumnsHaveUnknownType(final RowSignature rowSignature)
+ {
+ for (int i = 0; i < rowSignature.size(); ++i) {
+ if (!rowSignature.getColumnType(i).isPresent()) {
+ throw new UnsupportedColumnTypeException(rowSignature.getColumnName(i), null);
+ }
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
index 7588848cf5b..b19b479c26d 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -816,6 +816,8 @@ public class GroupByQueryQueryToolChest extends
QueryToolChest<ResultRow, GroupB
?
FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
: rowSignature;
+ FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);
+
FrameWriterFactory frameWriterFactory =
FrameWriters.makeColumnBasedFrameWriterFactory(
memoryAllocatorFactory,
modifiedRowSignature,
diff --git
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 8527d551cf5..17a2f8be956 100644
---
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -485,6 +485,8 @@ public class TimeseriesQueryQueryToolChest extends
QueryToolChest<Result<Timeser
RowSignature modifiedRowSignature = useNestedForUnknownTypes
?
FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
: rowSignature;
+ FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);
+
FrameWriterFactory frameWriterFactory =
FrameWriters.makeColumnBasedFrameWriterFactory(
memoryAllocatorFactory,
modifiedRowSignature,
diff --git
a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
index 02d07e25570..25a4284aa42 100644
---
a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
@@ -569,6 +569,9 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
RowSignature modifiedRowSignature = useNestedForUnknownTypes
? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
: rowSignature;
+ // Check the same signature the frame writer is built with, so an unknown column type is reported before any frame is written
+ FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);
+
FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory(
memoryAllocatorFactory,
- rowSignature,
+ modifiedRowSignature,
diff --git
a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
index 1d3b38b2fdb..d49ce3909f7 100644
--- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
@@ -746,6 +746,7 @@ public class ClientQuerySegmentWalker implements
QuerySegmentWalker
{
Optional<Sequence<FrameSignaturePair>> framesOptional;
+ boolean startedAccumulating = false;
try {
framesOptional = toolChest.resultsAsFrames(
query,
@@ -760,6 +761,9 @@ public class ClientQuerySegmentWalker implements
QuerySegmentWalker
Sequence<FrameSignaturePair> frames = framesOptional.get();
List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();
+
+ startedAccumulating = true;
+
frames.forEach(
frame -> {
limitAccumulator.addAndGet(frame.getFrame().numRows());
@@ -772,21 +776,33 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
}
);
return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query)));
-
}
catch (ResourceLimitExceededException e) {
throw e;
}
- catch (UnsupportedColumnTypeException e) {
- subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnsufficientTypeInfo();
- log.debug(e, "Type info in signature insufficient to materialize rows as frames.");
- return Optional.empty();
- }
catch (Exception e) {
- subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnknownReason();
- log.debug(e, "Unable to materialize the results as frames due to an unhandleable exception "
- + "while conversion. Defaulting to materializing the results as rows");
- return Optional.empty();
+ if (startedAccumulating) {
+ // The result sequence has already been opened, so we can't fall back safely: it might hold resources that are
+ // released on exception. This applies to every exception type, including UnsupportedColumnTypeException, so
+ // fail the query and ask the user to disable the 'maxSubqueryBytes' configuration instead.
+ throw DruidException.defensive()
+ .build(
+ e,
+ "Unable to materialize the results as frames for estimating the byte footprint. "
+ + "Please disable the 'maxSubqueryBytes' by setting it to 'disabled' in the query context or removing it altogether "
+ + "from the query context and/or the server config."
+ );
+ }
+ // The result sequence hasn't been iterated here yet, so it is safe to fall back to materializing the results as rows
+ if (e instanceof UnsupportedColumnTypeException) {
+ subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnsufficientTypeInfo();
+ log.debug(e, "Type info in signature insufficient to materialize rows as frames.");
+ } else {
+ subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnknownReason();
+ log.debug(e, "Unable to materialize the results as frames due to an unhandleable exception "
+ + "while conversion. Defaulting to materializing the results as rows");
+ }
+ return Optional.empty();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]