This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 11bb40981e0 Deduce type from the aggregators when materializing subquery results (#16703)
11bb40981e0 is described below

commit 11bb40981e020914569e55349455d2e7403eb2b5
Author: Laksh Singla <[email protected]>
AuthorDate: Tue Jul 23 11:52:39 2024 +0530

    Deduce type from the aggregators when materializing subquery results (#16703)
    
    For aggregators like StringFirst/Last, whose intermediate type differs from
    their final type, using them in GroupBy, TopN, or Timeseries subqueries
    causes a fallback when maxSubqueryBytes is set. This happens because the
    finalization is assumed to be unknown, so the row signature cannot tell
    whether to use the intermediate or the final type and leaves the type as
    null. This PR determines the finalization from the query context and uses
    the intermediate or the final type appropriately. [...]
---
 .../query/groupby/GroupByQueryQueryToolChest.java  |   6 +-
 .../druid/query/timeseries/TimeseriesQuery.java    |  16 ++
 .../timeseries/TimeseriesQueryQueryToolChest.java  |  15 +-
 .../org/apache/druid/query/topn/TopNQuery.java     |  11 ++
 .../druid/query/topn/TopNQueryQueryToolChest.java  |  13 +-
 .../druid/sql/calcite/CalciteSubqueryTest.java     | 216 +++++++++++++++++++++
 6 files changed, 258 insertions(+), 19 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
index d69e09c9ff0..3f2faf3fc56 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -828,7 +828,11 @@ public class GroupByQueryQueryToolChest extends 
QueryToolChest<ResultRow, GroupB
       boolean useNestedForUnknownTypes
   )
   {
-    RowSignature rowSignature = resultArraySignature(query);
+    RowSignature rowSignature = query.getResultRowSignature(
+        query.context().isFinalize(true)
+        ? RowSignature.Finalization.YES
+        : RowSignature.Finalization.NO
+    );
     RowSignature modifiedRowSignature = useNestedForUnknownTypes
                                         ? 
FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
                                         : rowSignature;
diff --git 
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
 
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
index 52fddccdf7b..d255a4e4cf4 100644
--- 
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
+++ 
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.DataSource;
@@ -39,6 +40,8 @@ import org.apache.druid.query.filter.DimFilter;
 import 
org.apache.druid.query.groupby.orderby.DefaultLimitSpec.LimitJsonIncludeFilter;
 import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -167,6 +170,19 @@ public class TimeseriesQuery extends 
BaseQuery<Result<TimeseriesResultValue>>
     return context().getBoolean(SKIP_EMPTY_BUCKETS, false);
   }
 
+  public RowSignature getResultSignature(final RowSignature.Finalization 
finalization)
+  {
+    final RowSignature.Builder builder = RowSignature.builder();
+    builder.addTimeColumn();
+    String timestampResultField = getTimestampResultField();
+    if (StringUtils.isNotEmpty(timestampResultField)) {
+      builder.add(timestampResultField, ColumnType.LONG);
+    }
+    builder.addAggregators(aggregatorSpecs, finalization);
+    builder.addPostAggregators(postAggregatorSpecs);
+    return builder.build();
+  }
+
   @Nullable
   @Override
   public Set<String> getRequiredColumns()
diff --git 
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
 
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 67c36fe7603..3ad58270f8b 100644
--- 
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ 
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -62,7 +62,6 @@ import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.RowAdapters;
 import org.apache.druid.segment.RowBasedColumnSelectorFactory;
-import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.joda.time.DateTime;
 
@@ -439,14 +438,7 @@ public class TimeseriesQueryQueryToolChest extends 
QueryToolChest<Result<Timeser
   @Override
   public RowSignature resultArraySignature(TimeseriesQuery query)
   {
-    RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
-    rowSignatureBuilder.addTimeColumn();
-    if (StringUtils.isNotEmpty(query.getTimestampResultField())) {
-      rowSignatureBuilder.add(query.getTimestampResultField(), 
ColumnType.LONG);
-    }
-    rowSignatureBuilder.addAggregators(query.getAggregatorSpecs(), 
RowSignature.Finalization.UNKNOWN);
-    rowSignatureBuilder.addPostAggregators(query.getPostAggregatorSpecs());
-    return rowSignatureBuilder.build();
+    return query.getResultSignature(RowSignature.Finalization.UNKNOWN);
   }
 
   @Override
@@ -486,7 +478,10 @@ public class TimeseriesQueryQueryToolChest extends 
QueryToolChest<Result<Timeser
       boolean useNestedForUnknownTypes
   )
   {
-    final RowSignature rowSignature = resultArraySignature(query);
+    final RowSignature rowSignature =
+        query.getResultSignature(
+            query.context().isFinalize(true) ? RowSignature.Finalization.YES : 
RowSignature.Finalization.NO
+        );
     final Pair<Cursor, Closeable> cursorAndCloseable = 
IterableRowsCursorHelper.getCursorFromSequence(
         resultsAsArrays(query, resultSequence),
         rowSignature
diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java 
b/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java
index 349e5a02d16..21729022d61 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java
@@ -37,6 +37,7 @@ import org.apache.druid.query.dimension.DimensionSpec;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -185,6 +186,16 @@ public class TopNQuery extends 
BaseQuery<Result<TopNResultValue>>
     topNMetricSpec.initTopNAlgorithmSelector(selector);
   }
 
+  public RowSignature getResultSignature(final RowSignature.Finalization 
finalization)
+  {
+    return RowSignature.builder()
+                       .addTimeColumn()
+                       
.addDimensions(Collections.singletonList(getDimensionSpec()))
+                       .addAggregators(getAggregatorSpecs(), finalization)
+                       .addPostAggregators(getPostAggregatorSpecs())
+                       .build();
+  }
+
   @Override
   public TopNQuery withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
   {
diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
 
b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
index 21bc336438a..c5f195615f3 100644
--- 
a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
+++ 
b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
@@ -68,7 +68,6 @@ import org.joda.time.DateTime;
 import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -518,12 +517,7 @@ public class TopNQueryQueryToolChest extends 
QueryToolChest<Result<TopNResultVal
   @Override
   public RowSignature resultArraySignature(TopNQuery query)
   {
-    return RowSignature.builder()
-                       .addTimeColumn()
-                       
.addDimensions(Collections.singletonList(query.getDimensionSpec()))
-                       .addAggregators(query.getAggregatorSpecs(), 
RowSignature.Finalization.UNKNOWN)
-                       .addPostAggregators(query.getPostAggregatorSpecs())
-                       .build();
+    return query.getResultSignature(RowSignature.Finalization.UNKNOWN);
   }
 
   @Override
@@ -569,7 +563,10 @@ public class TopNQueryQueryToolChest extends 
QueryToolChest<Result<TopNResultVal
       boolean useNestedForUnknownTypes
   )
   {
-    final RowSignature rowSignature = resultArraySignature(query);
+    final RowSignature rowSignature = query.getResultSignature(
+        query.context().isFinalize(true) ? RowSignature.Finalization.YES : 
RowSignature.Finalization.NO
+    );
+
     final Pair<Cursor, Closeable> cursorAndCloseable = 
IterableRowsCursorHelper.getCursorFromSequence(
         resultsAsArrays(query, resultSequence),
         rowSignature
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java
index 61223084220..c1236162fa0 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java
@@ -51,10 +51,12 @@ import 
org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.aggregation.SingleValueAggregatorFactory;
+import 
org.apache.druid.query.aggregation.firstlast.first.StringFirstAggregatorFactory;
 import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
 import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.dimension.ExtractionDimensionSpec;
+import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.extraction.SubstringDimExtractionFn;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
@@ -1385,6 +1387,220 @@ public class CalciteSubqueryTest extends 
BaseCalciteQueryTest
     );
   }
 
+  @MethodSource("constructorFeeder")
+  @ParameterizedTest(name = "{0}")
+  public void testGroupBySubqueryWithEarliestAggregator(String testName, 
Map<String, Object> queryContext)
+  {
+    cannotVectorize();
+
+    // Note: EARLIEST aggregator is used because the intermediate type 
"serializablePair" is different from the finalized type
+    final List<Object[]> expectedResults;
+    if (NullHandling.replaceWithDefault()) {
+      expectedResults = ImmutableList.of(
+          new Object[]{"1", "", "a", "1"},
+          new Object[]{"10.1", "b", "", "10.1"},
+          new Object[]{"10.1", "c", "", "10.1"},
+          new Object[]{"2", "d", "", "2"},
+          new Object[]{"abc", "", "", "abc"},
+          new Object[]{"def", "", "abc", "def"}
+      );
+    } else {
+      expectedResults = ImmutableList.of(
+          new Object[]{"", "a", "a", ""},
+          new Object[]{"", "b", "a", ""},
+          new Object[]{"1", "", "a", "1"},
+          new Object[]{"10.1", "b", null, "10.1"},
+          new Object[]{"10.1", "c", null, "10.1"},
+          new Object[]{"2", "d", "", "2"},
+          new Object[]{"abc", null, null, "abc"},
+          new Object[]{"def", null, "abc", "def"}
+      );
+    }
+
+    testQuery(
+        "SELECT a.dim1, a.dim3, a.e_dim2, b.dim1 "
+        + "FROM ("
+        + " SELECT dim1, dim3, EARLIEST(dim2) AS e_dim2 "
+        + " FROM foo GROUP BY 1, 2 LIMIT 100"
+        + ") a "
+        + "INNER JOIN foo b ON a.dim1 = b.dim1",
+        queryContext,
+        ImmutableList.of(
+            newScanQueryBuilder()
+                .dataSource(
+                    JoinDataSource.create(
+                        new QueryDataSource(
+                            GroupByQuery.builder()
+                                        .setDataSource("foo")
+                                        
.setInterval(querySegmentSpec(Intervals.ETERNITY))
+                                        .setGranularity(Granularities.ALL)
+                                        .setDimensions(
+                                            new DefaultDimensionSpec("dim1", 
"d0", ColumnType.STRING),
+                                            new DefaultDimensionSpec("dim3", 
"d1", ColumnType.STRING)
+                                        )
+                                        .addAggregator(new 
StringFirstAggregatorFactory("a0", "dim2", "__time", 1024))
+                                        .setLimitSpec(new 
DefaultLimitSpec(Collections.emptyList(), 100))
+                                        .build()
+                        ),
+                        new QueryDataSource(
+                            newScanQueryBuilder()
+                                .dataSource("foo")
+                                
.intervals(querySegmentSpec(Intervals.ETERNITY))
+                                .columns("dim1")
+                                .build()
+                        ),
+                        "j0.",
+                        "(\"d0\" == \"j0.dim1\")",
+                        JoinType.INNER,
+                        null,
+                        TestExprMacroTable.INSTANCE,
+                        null
+                    )
+                )
+                .intervals(querySegmentSpec(Intervals.ETERNITY))
+                .columns("a0", "d0", "d1", "j0.dim1")
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        expectedResults
+    );
+  }
+
+  @MethodSource("constructorFeeder")
+  @ParameterizedTest(name = "{0}")
+  public void testTopNSubqueryWithEarliestAggregator(String testName, 
Map<String, Object> queryContext)
+  {
+    final List<Object[]> expectedResults;
+    if (NullHandling.replaceWithDefault()) {
+      expectedResults = ImmutableList.of(
+          new Object[]{"1", "a", "1"},
+          new Object[]{"10.1", "", "10.1"},
+          new Object[]{"2", "", "2"},
+          new Object[]{"abc", "", "abc"},
+          new Object[]{"def", "abc", "def"}
+      );
+    } else {
+      expectedResults = ImmutableList.of(
+          new Object[]{"", "a", ""},
+          new Object[]{"1", "a", "1"},
+          new Object[]{"10.1", null, "10.1"},
+          new Object[]{"2", "", "2"},
+          new Object[]{"abc", null, "abc"},
+          new Object[]{"def", "abc", "def"}
+      );
+    }
+
+    testQuery(
+        "SELECT a.dim1, a.e_dim2, b.dim1 "
+        + "FROM ("
+        + " SELECT dim1, EARLIEST(dim2) AS e_dim2 "
+        + " FROM foo "
+        + " GROUP BY 1 "
+        + " LIMIT 100"
+        + ") a "
+        + "INNER JOIN foo b ON a.dim1 = b.dim1",
+        queryContext,
+        ImmutableList.of(
+            newScanQueryBuilder()
+                .dataSource(
+                    JoinDataSource.create(
+                        new QueryDataSource(
+                            new TopNQueryBuilder()
+                                .dataSource("foo")
+                                .dimension(new DefaultDimensionSpec("dim1", 
"d0", ColumnType.STRING))
+                                .metric(new DimensionTopNMetricSpec(null, 
StringComparators.LEXICOGRAPHIC))
+                                .threshold(100)
+                                
.intervals(querySegmentSpec(Intervals.ETERNITY))
+                                .granularity(Granularities.ALL)
+                                .aggregators(
+                                    new StringFirstAggregatorFactory("a0", 
"dim2", "__time", 1024)
+                                )
+                                .build()
+                        ),
+                        new QueryDataSource(
+                            newScanQueryBuilder()
+                                .dataSource("foo")
+                                
.intervals(querySegmentSpec(Intervals.ETERNITY))
+                                .columns("dim1")
+                                .build()
+                        ),
+                        "j0.",
+                        "(\"d0\" == \"j0.dim1\")",
+                        JoinType.INNER,
+                        null,
+                        TestExprMacroTable.INSTANCE,
+                        null
+                    )
+                )
+                .intervals(querySegmentSpec(Intervals.ETERNITY))
+                .columns("a0", "d0", "j0.dim1")
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        expectedResults
+    );
+  }
+
+  @MethodSource("constructorFeeder")
+  @ParameterizedTest(name = "{0}")
+  public void testTimeseriesSubqueryWithEarliestAggregator(String testName, 
Map<String, Object> queryContext)
+  {
+    testQuery(
+        "SELECT a.__time, a.e_dim2, b.__time "
+        + "FROM ("
+        + " SELECT TIME_FLOOR(\"__time\", 'PT24H') as __time, EARLIEST(dim2) 
AS e_dim2 "
+        + " FROM foo "
+        + " GROUP BY 1 "
+        + ") a "
+        + "INNER JOIN foo b ON a.__time = b.__time",
+        queryContext,
+        ImmutableList.of(
+            newScanQueryBuilder()
+                .dataSource(
+                    JoinDataSource.create(
+                        new QueryDataSource(
+                            Druids.newTimeseriesQueryBuilder()
+                                  .dataSource("foo")
+                                  
.intervals(querySegmentSpec(Intervals.ETERNITY))
+                                  .granularity(new PeriodGranularity(
+                                      new Period("PT24H"),
+                                      null,
+                                      DateTimeZone.UTC
+                                  ))
+                                  .aggregators(new 
StringFirstAggregatorFactory("a0", "dim2", "__time", 1024))
+                                  .build()
+                        ),
+                        new QueryDataSource(
+                            newScanQueryBuilder()
+                                .dataSource("foo")
+                                
.intervals(querySegmentSpec(Intervals.ETERNITY))
+                                .columns("__time")
+                                .build()
+                        ),
+                        "j0.",
+                        "(\"d0\" == \"j0.__time\")",
+                        JoinType.INNER,
+                        null,
+                        TestExprMacroTable.INSTANCE,
+                        null
+                    )
+                )
+                .intervals(querySegmentSpec(Intervals.ETERNITY))
+                .columns("a0", "d0", "j0.__time")
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of(
+            new Object[]{946684800000L, "a", 946684800000L},
+            new Object[]{946771200000L, NullHandling.defaultStringValue(), 
946771200000L},
+            new Object[]{946857600000L, "", 946857600000L},
+            new Object[]{978307200000L, "a", 978307200000L},
+            new Object[]{978393600000L, "abc", 978393600000L},
+            new Object[]{978480000000L, NullHandling.defaultStringValue(), 
978480000000L}
+        )
+    );
+  }
+
   public static class SubqueryComponentSupplier extends 
SqlTestFramework.StandardComponentSupplier
   {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to