This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4507a4f fix merging of groupBy subtotal spec results (#8109)
4507a4f is described below
commit 4507a4f8f18b265bab7be10270e0a9ee0c369483
Author: Himanshu <[email protected]>
AuthorDate: Tue Aug 6 07:06:28 2019 -0700
fix merging of groupBy subtotal spec results (#8109)
* fix merging of groupBy subtotal spec results
* add post agg to subtotals spec ut
* add comment
* remove unnecessary agg transformation
* fix build
* fix test
* ignore unknown columns in ordering spec
* change variable names based on comment for easy read
* formatting
* don't ignore unknown columns in DefaultLimitSpec to not change existing
behavior
* handle limit spec columns correctly
* uncomment inadvertently commented lines
* GroupByStrategyV2 changes
* test changes wip
* more fixes to handle merge buffer closing and limit spec
* uncomment line commented accidentally
---
.../druid/java/util/common/collect/Utils.java | 11 +
.../apache/druid/query/groupby/GroupByQuery.java | 16 +-
.../query/groupby/orderby/DefaultLimitSpec.java | 10 +
.../druid/query/groupby/orderby/LimitSpec.java | 10 +
.../druid/query/groupby/orderby/NoopLimitSpec.java | 8 +
.../query/groupby/strategy/GroupByStrategyV2.java | 205 ++++--
.../query/groupby/GroupByQueryRunnerTest.java | 795 +++++++--------------
7 files changed, 462 insertions(+), 593 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java
b/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java
index bd5444a..de7a239 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/collect/Utils.java
@@ -70,4 +70,15 @@ public class Utils
}
return list;
}
+
+ public static <T> boolean isPrefix(List<T> small, List<T> big)
+ {
+ for (int i = 0; i < small.size(); i++) {
+ if (!small.get(i).equals(big.get(i))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
index 10f043c..69ab185 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
@@ -229,28 +229,20 @@ public class GroupByQuery extends BaseQuery<ResultRow>
List<DimensionSpec> dimensions
)
{
- // if subtotalsSpec exists then validate that all are subsets of
dimensions spec and are in same order.
- // For example if we had {D1, D2, D3} in dimensions spec then
- // {D2}, {D1, D2}, {D1, D3}, {D2, D3} etc are valid in subtotalsSpec while
- // {D2, D1} is not as it is not in same order.
- // {D4} is not as its not a subset.
- // This restriction as enforced because implementation does sort merge on
the results of top-level query
- // results and expects that ordering of events does not change when
dimension columns are removed from
- // results of top level query.
+ // if subtotalsSpec exists then validate that all are subsets of
dimensions spec.
if (subtotalsSpec != null) {
for (List<String> subtotalSpec : subtotalsSpec) {
- int i = 0;
for (String s : subtotalSpec) {
boolean found = false;
- for (; i < dimensions.size(); i++) {
- if (s.equals(dimensions.get(i).getOutputName())) {
+ for (DimensionSpec ds : dimensions) {
+ if (s.equals(ds.getOutputName())) {
found = true;
break;
}
}
if (!found) {
throw new IAE(
- "Subtotal spec %s is either not a subset or items are in
different order than in dimensions.",
+            "Subtotal spec %s is not a subset of top level
dimensions.",
subtotalSpec
);
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
index 49c49ed..13bdd27 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
@@ -53,6 +53,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
/**
*
@@ -211,6 +212,15 @@ public class DefaultLimitSpec implements LimitSpec
throw new ISE("Unknown column in order clause[%s]", columnSpec);
}
+ @Override
+ public LimitSpec filterColumns(Set<String> names)
+ {
+ return new DefaultLimitSpec(
+ columns.stream().filter(c ->
names.contains(c.getDimension())).collect(Collectors.toList()),
+ limit
+ );
+ }
+
private Ordering<ResultRow> makeComparator(
Object2IntMap<String> rowOrderLookup,
boolean hasTimestamp,
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/orderby/LimitSpec.java
b/processing/src/main/java/org/apache/druid/query/groupby/orderby/LimitSpec.java
index 8f0c9b5..fbdd0f1 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/orderby/LimitSpec.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/orderby/LimitSpec.java
@@ -28,6 +28,7 @@ import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
import javax.annotation.Nullable;
+import java.util.Set;
/**
*
@@ -53,4 +54,13 @@ public interface LimitSpec extends Cacheable
Function<Sequence<ResultRow>, Sequence<ResultRow>> build(GroupByQuery query);
LimitSpec merge(LimitSpec other);
+
+ /**
+ * Discard sorting columns not contained in given set. This is used when
generating new queries, e.g. to process
+ * subtotal spec in GroupBy query.
+ *
+   * @param names column names to keep
+   * @return new LimitSpec that works with filtered set of columns
+ */
+ LimitSpec filterColumns(Set<String> names);
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/orderby/NoopLimitSpec.java
b/processing/src/main/java/org/apache/druid/query/groupby/orderby/NoopLimitSpec.java
index 9bec0cf..84f4cfa 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/orderby/NoopLimitSpec.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/orderby/NoopLimitSpec.java
@@ -26,6 +26,8 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
+import java.util.Set;
+
/**
*
*/
@@ -58,6 +60,12 @@ public final class NoopLimitSpec implements LimitSpec
}
@Override
+ public LimitSpec filterColumns(Set<String> names)
+ {
+ return this;
+ }
+
+ @Override
public String toString()
{
return "NoopLimitSpec";
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
index 4c417b0..34efb62 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
@@ -21,9 +21,9 @@ package org.apache.druid.query.groupby.strategy;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
import org.apache.druid.collections.BlockingPool;
@@ -33,7 +33,9 @@ import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
+import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DataSource;
@@ -61,6 +63,8 @@ import
org.apache.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
import
org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
import org.apache.druid.query.groupby.epinephelinae.GroupByRowProcessor;
+import org.apache.druid.query.groupby.orderby.LimitSpec;
+import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
import org.apache.druid.query.groupby.resource.GroupByQueryResource;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.StorageAdapter;
@@ -68,9 +72,12 @@ import org.apache.druid.segment.StorageAdapter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.BinaryOperator;
+import java.util.stream.Collectors;
public class GroupByStrategyV2 implements GroupByStrategy
{
@@ -109,7 +116,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
public GroupByQueryResource prepareResource(GroupByQuery query)
{
final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 1) +
- (query.getSubtotalsSpec() != null ? 1 :
0);
+
numMergeBuffersNeededForSubtotalsSpec(query);
if (requiredMergeBufferNum > mergeBufferPool.maxSize()) {
throw new ResourceLimitExceededException(
@@ -346,35 +353,41 @@ public class GroupByStrategyV2 implements GroupByStrategy
Sequence<ResultRow> queryResult
)
{
- // Note: the approach used here is not always correct; see
https://github.com/apache/incubator-druid/issues/8091.
+    // How it works:
+ // First we accumulate the result of top level base query aka queryResult
arg inside a resultSupplierOne object.
+ // Next for each subtotalSpec
+ // If subtotalSpec is a prefix of top level dims then we iterate on rows
in resultSupplierOne object which are still
+ // sorted by subtotalSpec, stream merge them and return.
+ //
+ // If subtotalSpec is not a prefix of top level dims then we create a
resultSupplierTwo object filled with rows from
+ // resultSupplierOne object with only dims from subtotalSpec. Then we
iterate on rows in resultSupplierTwo object which are
+ // of course sorted by subtotalSpec, stream merge them and return.
// Keep a reference to resultSupplier outside the "try" so we can close it
if something goes wrong
// while creating the sequence.
- GroupByRowProcessor.ResultSupplier resultSupplier = null;
+ GroupByRowProcessor.ResultSupplier resultSupplierOne = null;
try {
- GroupByQuery queryWithoutSubtotalsSpec =
query.withSubtotalsSpec(null).withDimFilter(null);
- List<List<String>> subtotals = query.getSubtotalsSpec();
-
- resultSupplier = GroupByRowProcessor.process(
- queryWithoutSubtotalsSpec
- .withAggregatorSpecs(
- Lists.transform(
- queryWithoutSubtotalsSpec.getAggregatorSpecs(),
- AggregatorFactory::getCombiningFactory
- )
- )
- .withDimensionSpecs(
- Lists.transform(
- queryWithoutSubtotalsSpec.getDimensions(),
- dimSpec ->
- new DefaultDimensionSpec(
- dimSpec.getOutputName(),
- dimSpec.getOutputName(),
- dimSpec.getOutputType()
- )
- )
- ),
+ GroupByQuery queryWithoutSubtotalsSpec = query
+ .withDimensionSpecs(query.getDimensions().stream().map(
+ dimSpec -> new DefaultDimensionSpec(
+ dimSpec.getOutputName(),
+ dimSpec.getOutputName(),
+ dimSpec.getOutputType()
+ )).collect(Collectors.toList())
+ )
+ .withAggregatorSpecs(
+ query.getAggregatorSpecs()
+ .stream()
+ .map(AggregatorFactory::getCombiningFactory)
+ .collect(Collectors.toList())
+ )
+ .withSubtotalsSpec(null)
+ .withDimFilter(null);
+
+
+ resultSupplierOne = GroupByRowProcessor.process(
+ queryWithoutSubtotalsSpec,
queryWithoutSubtotalsSpec,
queryResult,
configSupplier.get(),
@@ -383,8 +396,20 @@ public class GroupByStrategyV2 implements GroupByStrategy
processingConfig.getTmpDir(),
processingConfig.intermediateComputeSizeBytes()
);
+
+ List<String> queryDimNames =
queryWithoutSubtotalsSpec.getDimensions().stream().map(DimensionSpec::getOutputName)
+
.collect(Collectors.toList());
+
+ // Only needed to make LimitSpec.filterColumns(..) call later in case
base query has a non default LimitSpec.
+ Set<String> aggsAndPostAggs = null;
+ if (queryWithoutSubtotalsSpec.getLimitSpec() != null &&
!(queryWithoutSubtotalsSpec.getLimitSpec() instanceof NoopLimitSpec)) {
+ aggsAndPostAggs =
getAggregatorAndPostAggregatorNames(queryWithoutSubtotalsSpec);
+ }
+
+ List<List<String>> subtotals = query.getSubtotalsSpec();
List<Sequence<ResultRow>> subtotalsResults = new
ArrayList<>(subtotals.size());
+ // Iterate through each subtotalSpec, build results for it and add to
subtotalsResults
for (List<String> subtotalSpec : subtotals) {
final ImmutableSet<String> dimsInSubtotalSpec =
ImmutableSet.copyOf(subtotalSpec);
final List<DimensionSpec> dimensions = query.getDimensions();
@@ -411,32 +436,130 @@ public class GroupByStrategyV2 implements GroupByStrategy
}
}
- GroupByQuery subtotalQuery =
queryWithoutSubtotalsSpec.withDimensionSpecs(newDimensions);
-
- final GroupByRowProcessor.ResultSupplier finalResultSupplier =
resultSupplier;
- subtotalsResults.add(
- applyPostProcessing(
- mergeResults(
- (queryPlus, responseContext) ->
finalResultSupplier.results(subtotalSpec),
- subtotalQuery,
- null
- ),
- subtotalQuery
- )
- );
+ // Create appropriate LimitSpec for subtotal query
+ LimitSpec subtotalQueryLimitSpec = NoopLimitSpec.instance();
+ if (queryWithoutSubtotalsSpec.getLimitSpec() != null &&
!(queryWithoutSubtotalsSpec.getLimitSpec() instanceof NoopLimitSpec)) {
+ Set<String> columns = new HashSet(aggsAndPostAggs);
+ columns.addAll(subtotalSpec);
+
+ subtotalQueryLimitSpec =
queryWithoutSubtotalsSpec.getLimitSpec().filterColumns(columns);
+ }
+
+ GroupByQuery subtotalQuery = queryWithoutSubtotalsSpec
+ .withLimitSpec(subtotalQueryLimitSpec)
+ .withDimensionSpecs(newDimensions);
+
+ final GroupByRowProcessor.ResultSupplier resultSupplierOneFinal =
resultSupplierOne;
+ if (Utils.isPrefix(subtotalSpec, queryDimNames)) {
+          // Since subtotalSpec is a prefix of base query dimensions,
results from base query are also sorted
+ // by subtotalSpec as needed by stream merging.
+ subtotalsResults.add(
+ processSubtotalsResultAndOptionallyClose(() ->
resultSupplierOneFinal, subtotalSpec, subtotalQuery, false)
+ );
+ } else {
+          // Since subtotalSpec is not a prefix of base query dimensions,
results from base query are not sorted
+ // by subtotalSpec. So we first add the result of base query into
another resultSupplier which are sorted
+ // by subtotalSpec and then stream merge them.
+
+          // Also note, we can't create the ResultSupplier eagerly here as
we don't want to eagerly allocate
+ // merge buffers for processing subtotal.
+ Supplier<GroupByRowProcessor.ResultSupplier> resultSupplierTwo = ()
-> GroupByRowProcessor.process(
+ queryWithoutSubtotalsSpec,
+ subtotalQuery,
+ resultSupplierOneFinal.results(subtotalSpec),
+ configSupplier.get(),
+ resource,
+ spillMapper,
+ processingConfig.getTmpDir(),
+ processingConfig.intermediateComputeSizeBytes()
+ );
+
+ subtotalsResults.add(
+ processSubtotalsResultAndOptionallyClose(resultSupplierTwo,
subtotalSpec, subtotalQuery, true)
+ );
+ }
}
return Sequences.withBaggage(
Sequences.concat(subtotalsResults),
- resultSupplier
+ resultSupplierOne //this will close resources allocated by
resultSupplierOne after sequence read
);
}
catch (Exception ex) {
- CloseQuietly.close(resultSupplier);
+ CloseQuietly.close(resultSupplierOne);
+ throw ex;
+ }
+ }
+
+ private Sequence<ResultRow> processSubtotalsResultAndOptionallyClose(
+ Supplier<GroupByRowProcessor.ResultSupplier> baseResultsSupplier,
+ List<String> dimsToInclude,
+ GroupByQuery subtotalQuery,
+ boolean closeOnSequenceRead
+ )
+ {
+ // This closes the ResultSupplier in case of any exception here or
arranges for it to be closed
+ // on sequence read if closeOnSequenceRead is true.
+ try {
+ Supplier<GroupByRowProcessor.ResultSupplier> memoizedSupplier =
Suppliers.memoize(baseResultsSupplier);
+ return applyPostProcessing(
+ mergeResults(
+ (queryPlus, responseContext) ->
+ new LazySequence<>(
+ () -> Sequences.withBaggage(
+ memoizedSupplier.get().results(dimsToInclude),
+ closeOnSequenceRead ? () ->
CloseQuietly.close(memoizedSupplier.get()) : () -> {}
+ )
+ ),
+ subtotalQuery,
+ null
+ ),
+ subtotalQuery
+ );
+
+ }
+ catch (Exception ex) {
+ CloseQuietly.close(baseResultsSupplier.get());
throw ex;
}
}
+ private Set<String> getAggregatorAndPostAggregatorNames(GroupByQuery query)
+ {
+ Set<String> aggsAndPostAggs = new HashSet();
+ if (query.getAggregatorSpecs() != null) {
+ for (AggregatorFactory af : query.getAggregatorSpecs()) {
+ aggsAndPostAggs.add(af.getName());
+ }
+ }
+
+ if (query.getPostAggregatorSpecs() != null) {
+ for (PostAggregator pa : query.getPostAggregatorSpecs()) {
+ aggsAndPostAggs.add(pa.getName());
+ }
+ }
+
+ return aggsAndPostAggs;
+ }
+
+ private int numMergeBuffersNeededForSubtotalsSpec(GroupByQuery query)
+ {
+ List<List<String>> subtotalSpecs = query.getSubtotalsSpec();
+ if (subtotalSpecs == null || subtotalSpecs.size() == 0) {
+ return 0;
+ }
+
+ List<String> queryDimOutputNames =
query.getDimensions().stream().map(DimensionSpec::getOutputName).collect(
+ Collectors.toList());
+ for (List<String> subtotalSpec : subtotalSpecs) {
+ if (!Utils.isPrefix(subtotalSpec, queryDimOutputNames)) {
+ return 2;
+ }
+ }
+
+ return 1;
+ }
+
@Override
public QueryRunner<ResultRow> mergeRunners(
final ListeningExecutorService exec,
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index af4a207..b233fd8 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -6378,7 +6378,7 @@ public class GroupByQueryRunnerTest
}
@Test
- public void testGroupByWithSubtotalsSpec()
+ public void testGroupByWithSubtotalsSpecOfDimensionsPrefixes()
{
// Cannot vectorize due to usage of expressions.
cannotVectorize();
@@ -6390,29 +6390,143 @@ public class GroupByQueryRunnerTest
GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
- .setVirtualColumns(new ExpressionVirtualColumn(
- "alias",
- "quality",
- ValueType.STRING,
- TestExprMacroTable.INSTANCE
+ .setVirtualColumns(new ExpressionVirtualColumn("alias", "quality",
ValueType.STRING, TestExprMacroTable.INSTANCE))
+ .setDimensions(Lists.newArrayList(
+ new DefaultDimensionSpec("market", "market2"),
+ new DefaultDimensionSpec("alias", "alias2")
+ ))
+ .setAggregatorSpecs(
+ Arrays.asList(
+ QueryRunnerTestHelper.rowsCount,
+ new LongSumAggregatorFactory("idx", "index")
+ )
+ )
+ .setGranularity(QueryRunnerTestHelper.dayGran)
+ .setSubtotalsSpec(ImmutableList.of(
+ ImmutableList.of("market2"),
+ ImmutableList.of()
))
+ .build();
+
+ List<ResultRow> expectedResults = Arrays.asList(
+ makeRow(
+ query,
+ "2011-04-01T00:00:00.000Z",
+ "market2",
+ "spot",
+ "rows",
+ 9L,
+ "idx",
+ 1102L
+ ),
+ makeRow(
+ query,
+ "2011-04-01T00:00:00.000Z",
+ "market2",
+ "total_market",
+ "rows",
+ 2L,
+ "idx",
+ 2836L
+ ),
+ makeRow(
+ query,
+ "2011-04-01T00:00:00.000Z",
+ "market2",
+ "upfront",
+ "rows",
+ 2L,
+ "idx",
+ 2681L
+ ),
+
+ makeRow(
+ query,
+ "2011-04-02T00:00:00.000Z",
+ "market2",
+ "spot",
+ "rows",
+ 9L,
+ "idx",
+ 1120L
+ ),
+ makeRow(
+ query,
+ "2011-04-02T00:00:00.000Z",
+ "market2",
+ "total_market",
+ "rows",
+ 2L,
+ "idx",
+ 2514L
+ ),
+ makeRow(
+ query,
+ "2011-04-02T00:00:00.000Z",
+ "market2",
+ "upfront",
+ "rows",
+ 2L,
+ "idx",
+ 2193L
+ ),
+
+ makeRow(
+ query,
+ "2011-04-01T00:00:00.000Z",
+ "rows",
+ 13L,
+ "idx",
+ 6619L
+ ),
+ makeRow(
+ query,
+ "2011-04-02T00:00:00.000Z",
+ "rows",
+ 13L,
+ "idx",
+ 5827L
+ )
+ );
+
+ Iterable<ResultRow> results =
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+ TestHelper.assertExpectedObjects(expectedResults, results, "subtotal");
+ }
+
+ @Test
+ public void testGroupByWithSubtotalsSpecGeneral()
+ {
+ // Cannot vectorize due to usage of expressions.
+ cannotVectorize();
+
+ if
(!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
+ return;
+ }
+
+ GroupByQuery query = makeQueryBuilder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+ .setVirtualColumns(new ExpressionVirtualColumn("alias", "quality",
ValueType.STRING, TestExprMacroTable.INSTANCE))
.setDimensions(Lists.newArrayList(
- new DefaultDimensionSpec("quality", "quality"),
- new DefaultDimensionSpec("market", "market"),
- new DefaultDimensionSpec("alias", "alias")
+ new DefaultDimensionSpec("quality", "quality2"),
+ new DefaultDimensionSpec("market", "market2"),
+ new DefaultDimensionSpec("alias", "alias2")
))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
- new LongSumAggregatorFactory("idx", "index"),
- new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
- new DoubleSumAggregatorFactory("idxDouble", "index")
+ new LongSumAggregatorFactory("idx", "index")
+ )
+ )
+ .setPostAggregatorSpecs(
+ Collections.singletonList(
+ new FieldAccessPostAggregator("idxPostAgg", "idx")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.setSubtotalsSpec(ImmutableList.of(
- ImmutableList.of("alias"),
- ImmutableList.of("market"),
+ ImmutableList.of("alias2"),
+ ImmutableList.of("market2"),
ImmutableList.of()
))
.build();
@@ -6421,477 +6535,313 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-01",
- "alias",
+ "alias2",
"automotive",
"rows",
1L,
"idx",
135L,
- "idxFloat",
- 135.88510131835938f,
- "idxDouble",
- 135.88510131835938d
+ "idxPostAgg",
+ 135L
),
makeRow(
query,
"2011-04-01",
- "alias",
+ "alias2",
"business",
"rows",
1L,
"idx",
118L,
- "idxFloat",
- 118.57034,
- "idxDouble",
- 118.57034
+ "idxPostAgg",
+ 118L
),
makeRow(
query,
"2011-04-01",
- "alias",
+ "alias2",
"entertainment",
"rows",
1L,
"idx",
158L,
- "idxFloat",
- 158.747224,
- "idxDouble",
- 158.747224
+ "idxPostAgg",
+ 158L
),
makeRow(
query,
"2011-04-01",
- "alias",
+ "alias2",
"health",
"rows",
1L,
"idx",
120L,
- "idxFloat",
- 120.134704,
- "idxDouble",
- 120.134704
+ "idxPostAgg",
+ 120L
),
makeRow(
query,
"2011-04-01",
- "alias",
+ "alias2",
"mezzanine",
"rows",
3L,
"idx",
2870L,
- "idxFloat",
- 2871.8866900000003f,
- "idxDouble",
- 2871.8866900000003d
+ "idxPostAgg",
+ 2870L
),
makeRow(
query,
"2011-04-01",
- "alias",
+ "alias2",
"news",
"rows",
1L,
"idx",
121L,
- "idxFloat",
- 121.58358f,
- "idxDouble",
- 121.58358d
+ "idxPostAgg",
+ 121L
),
makeRow(
query,
"2011-04-01",
- "alias",
+ "alias2",
"premium",
"rows",
3L,
"idx",
2900L,
- "idxFloat",
- 2900.798647f,
- "idxDouble",
- 2900.798647d
+ "idxPostAgg",
+ 2900L
),
makeRow(
query,
"2011-04-01",
- "alias",
+ "alias2",
"technology",
"rows",
1L,
"idx",
78L,
- "idxFloat",
- 78.622547f,
- "idxDouble",
- 78.622547d
+ "idxPostAgg",
+ 78L
),
makeRow(
query,
"2011-04-01",
- "alias",
+ "alias2",
"travel",
"rows",
1L,
"idx",
119L,
- "idxFloat",
- 119.922742f,
- "idxDouble",
- 119.922742d
+ "idxPostAgg",
+ 119L
),
makeRow(
query,
"2011-04-02",
- "alias",
+ "alias2",
"automotive",
"rows",
1L,
"idx",
147L,
- "idxFloat",
- 147.42593f,
- "idxDouble",
- 147.42593d
+ "idxPostAgg",
+ 147L
),
makeRow(
query,
"2011-04-02",
- "alias",
+ "alias2",
"business",
"rows",
1L,
"idx",
112L,
- "idxFloat",
- 112.987027f,
- "idxDouble",
- 112.987027d
+ "idxPostAgg",
+ 112L
),
makeRow(
query,
"2011-04-02",
- "alias",
+ "alias2",
"entertainment",
"rows",
1L,
"idx",
166L,
- "idxFloat",
- 166.016049f,
- "idxDouble",
- 166.016049d
+ "idxPostAgg",
+ 166L
),
makeRow(
query,
"2011-04-02",
- "alias",
+ "alias2",
"health",
"rows",
1L,
"idx",
113L,
- "idxFloat",
- 113.446008f,
- "idxDouble",
- 113.446008d
+ "idxPostAgg",
+ 113L
),
makeRow(
query,
"2011-04-02",
- "alias",
+ "alias2",
"mezzanine",
"rows",
3L,
"idx",
2447L,
- "idxFloat",
- 2448.830613f,
- "idxDouble",
- 2448.830613d
+ "idxPostAgg",
+ 2447L
),
makeRow(
query,
"2011-04-02",
- "alias",
+ "alias2",
"news",
"rows",
1L,
"idx",
114L,
- "idxFloat",
- 114.290141f,
- "idxDouble",
- 114.290141d
+ "idxPostAgg",
+ 114L
),
makeRow(
query,
"2011-04-02",
- "alias",
+ "alias2",
"premium",
"rows",
3L,
"idx",
2505L,
- "idxFloat",
- 2506.415148f,
- "idxDouble",
- 2506.415148d
- ),
- makeRow(
- query,
- "2011-04-02",
- "alias",
- "technology",
- "rows",
- 1L,
- "idx",
- 97L,
- "idxFloat",
- 97.387433f,
- "idxDouble",
- 97.387433d
- ),
- makeRow(
- query,
- "2011-04-02",
- "alias",
- "travel",
- "rows",
- 1L,
- "idx",
- 126L,
- "idxFloat",
- 126.411364f,
- "idxDouble",
- 126.411364d
- ),
-
- makeRow(
- query,
- "2011-04-01T00:00:00.000Z",
- "market",
- "spot",
- "idxDouble",
- 643.043177,
- "idxFloat",
- 643.043212890625,
- "rows",
- 5L,
- "idx",
- 640L
- ),
- makeRow(
- query,
- "2011-04-01T00:00:00.000Z",
- "market",
- "total_market",
- "idxDouble",
- 1314.839715,
- "idxFloat",
- 1314.8397,
- "rows",
- 1L,
- "idx",
- 1314L
- ),
- makeRow(
- query,
- "2011-04-01T00:00:00.000Z",
- "market",
- "upfront",
- "idxDouble",
- 1447.34116,
- "idxFloat",
- 1447.3412,
- "rows",
- 1L,
- "idx",
- 1447L
- ),
- makeRow(
- query,
- "2011-04-01T00:00:00.000Z",
- "market",
- "spot",
- "idxDouble",
- 266.090949,
- "idxFloat",
- 266.0909423828125,
- "rows",
- 2L,
- "idx",
- 265L
- ),
- makeRow(
- query,
- "2011-04-01T00:00:00.000Z",
- "market",
- "total_market",
- "idxDouble",
- 1522.043733,
- "idxFloat",
- 1522.0437,
- "rows",
- 1L,
- "idx",
- 1522L
+ "idxPostAgg",
+ 2505L
),
makeRow(
query,
- "2011-04-01T00:00:00.000Z",
- "market",
- "upfront",
- "idxDouble",
- 1234.247546,
- "idxFloat",
- 1234.2476,
+ "2011-04-02",
+ "alias2",
+ "technology",
"rows",
1L,
"idx",
- 1234L
+ 97L,
+ "idxPostAgg",
+ 97L
),
makeRow(
query,
- "2011-04-01T00:00:00.000Z",
- "market",
- "spot",
- "idxDouble",
- 198.545289,
- "idxFloat",
- 198.5452880859375,
+ "2011-04-02",
+ "alias2",
+ "travel",
"rows",
- 2L,
+ 1L,
"idx",
- 197L
+ 126L,
+ "idxPostAgg",
+ 126L
),
makeRow(
query,
- "2011-04-02T00:00:00.000Z",
- "market",
+ "2011-04-01T00:00:00.000Z",
+ "market2",
"spot",
- "idxDouble",
- 650.806953,
- "idxFloat",
- 650.8069458007812,
"rows",
- 5L,
+ 9L,
"idx",
- 648L
+ 1102L,
+ "idxPostAgg",
+ 1102L
),
makeRow(
query,
- "2011-04-02T00:00:00.000Z",
- "market",
+ "2011-04-01T00:00:00.000Z",
+ "market2",
"total_market",
- "idxDouble",
- 1193.556278,
- "idxFloat",
- 1193.5563,
"rows",
- 1L,
+ 2L,
"idx",
- 1193L
+ 2836L,
+ "idxPostAgg",
+ 2836L
),
makeRow(
query,
- "2011-04-02T00:00:00.000Z",
- "market",
+ "2011-04-01T00:00:00.000Z",
+ "market2",
"upfront",
- "idxDouble",
- 1144.342401,
- "idxFloat",
- 1144.3424,
"rows",
- 1L,
+ 2L,
"idx",
- 1144L
+ 2681L,
+ "idxPostAgg",
+ 2681L
),
+
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "market",
+ "market2",
"spot",
- "idxDouble",
- 249.591647,
- "idxFloat",
- 249.59164428710938,
"rows",
- 2L,
+ 9L,
"idx",
- 249L
+ 1120L,
+ "idxPostAgg",
+ 1120L
),
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "market",
+ "market2",
"total_market",
- "idxDouble",
- 1321.375057,
- "idxFloat",
- 1321.375,
"rows",
- 1L,
+ 2L,
"idx",
- 1321L
+ 2514L,
+ "idxPostAgg",
+ 2514L
),
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "market",
+ "market2",
"upfront",
- "idxDouble",
- 1049.738585,
- "idxFloat",
- 1049.7385,
- "rows",
- 1L,
- "idx",
- 1049L
- ),
- makeRow(
- query,
- "2011-04-02T00:00:00.000Z",
- "market",
- "spot",
- "idxDouble",
- 223.798797,
- "idxFloat",
- 223.79879760742188,
"rows",
2L,
"idx",
- 223L
+ 2193L,
+ "idxPostAgg",
+ 2193L
),
makeRow(
query,
"2011-04-01T00:00:00.000Z",
- "idxDouble",
- 6626.151575318359,
- "idxFloat",
- 6626.152f,
"rows",
13L,
"idx",
+ 6619L,
+ "idxPostAgg",
6619L
),
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "idxDouble",
- 5833.209713,
- "idxFloat",
- 5833.209f,
"rows",
13L,
"idx",
+ 5827L,
+ "idxPostAgg",
5827L
)
);
@@ -6904,22 +6854,15 @@ public class GroupByQueryRunnerTest
@Test
public void testGroupByWithSubtotalsSpecWithRenamedDimensionAndFilter()
{
- // Cannot vectorize due to expression virtual columns.
- cannotVectorize();
-
if
(!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
return;
}
- GroupByQuery query = makeQueryBuilder()
+ GroupByQuery query = GroupByQuery
+ .builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
- .setVirtualColumns(new ExpressionVirtualColumn(
- "alias",
- "quality",
- ValueType.STRING,
- TestExprMacroTable.INSTANCE
- ))
+ .setVirtualColumns(new ExpressionVirtualColumn("alias", "quality",
ValueType.STRING, TestExprMacroTable.INSTANCE))
.setDimensions(Lists.newArrayList(
new DefaultDimensionSpec("quality", "quality"),
new DefaultDimensionSpec("market", "market"),
@@ -7013,20 +6956,18 @@ public class GroupByQueryRunnerTest
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.newArrayList(
new DefaultDimensionSpec("qualityLong", "ql", ValueType.LONG),
- new DefaultDimensionSpec("market", "market")
+ new DefaultDimensionSpec("market", "market2")
))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
- new LongSumAggregatorFactory("idx", "index"),
- new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
- new DoubleSumAggregatorFactory("idxDouble", "index")
+ new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.setSubtotalsSpec(ImmutableList.of(
ImmutableList.of("ql"),
- ImmutableList.of("market"),
+ ImmutableList.of("market2"),
ImmutableList.of()
))
.build();
@@ -7035,10 +6976,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-01T00:00:00.000Z",
- "idxDouble",
- 135.885094,
- "idxFloat",
- 135.8851,
"ql",
1000L,
"rows",
@@ -7049,10 +6986,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-01T00:00:00.000Z",
- "idxDouble",
- 118.57034,
- "idxFloat",
- 118.57034,
"ql",
1100L,
"rows",
@@ -7063,10 +6996,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-01T00:00:00.000Z",
- "idxDouble",
- 158.747224,
- "idxFloat",
- 158.74722,
"ql",
1200L,
"rows",
@@ -7077,10 +7006,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-01T00:00:00.000Z",
- "idxDouble",
- 120.134704,
- "idxFloat",
- 120.134705,
"ql",
1300L,
"rows",
@@ -7091,10 +7016,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-01T00:00:00.000Z",
- "idxDouble",
- 2871.8866900000003,
- "idxFloat",
- 2871.88671875,
"ql",
1400L,
"rows",
@@ -7105,10 +7026,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-01T00:00:00.000Z",
- "idxDouble",
- 121.583581,
- "idxFloat",
- 121.58358,
"ql",
1500L,
"rows",
@@ -7119,10 +7036,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-01T00:00:00.000Z",
- "idxDouble",
- 2900.798647,
- "idxFloat",
- 2900.798583984375,
"ql",
1600L,
"rows",
@@ -7133,10 +7046,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-01T00:00:00.000Z",
- "idxDouble",
- 78.622547,
- "idxFloat",
- 78.62254,
"ql",
1700L,
"rows",
@@ -7147,10 +7056,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-01T00:00:00.000Z",
- "idxDouble",
- 119.922742,
- "idxFloat",
- 119.922745,
"ql",
1800L,
"rows",
@@ -7161,10 +7066,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "idxDouble",
- 147.425935,
- "idxFloat",
- 147.42593,
"ql",
1000L,
"rows",
@@ -7175,10 +7076,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "idxDouble",
- 112.987027,
- "idxFloat",
- 112.98703,
"ql",
1100L,
"rows",
@@ -7189,10 +7086,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "idxDouble",
- 166.016049,
- "idxFloat",
- 166.01605,
"ql",
1200L,
"rows",
@@ -7203,10 +7096,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "idxDouble",
- 113.446008,
- "idxFloat",
- 113.44601,
"ql",
1300L,
"rows",
@@ -7217,10 +7106,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "idxDouble",
- 2448.830613,
- "idxFloat",
- 2448.83056640625,
"ql",
1400L,
"rows",
@@ -7231,10 +7116,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "idxDouble",
- 114.290141,
- "idxFloat",
- 114.29014,
"ql",
1500L,
"rows",
@@ -7245,10 +7126,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "idxDouble",
- 2506.415148,
- "idxFloat",
- 2506.4150390625,
"ql",
1600L,
"rows",
@@ -7259,10 +7136,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "idxDouble",
- 97.387433,
- "idxFloat",
- 97.387436,
"ql",
1700L,
"rows",
@@ -7273,10 +7146,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "idxDouble",
- 126.411364,
- "idxFloat",
- 126.41136,
"ql",
1800L,
"rows",
@@ -7288,207 +7157,69 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-01T00:00:00.000Z",
- "market",
+ "market2",
"spot",
- "idxDouble",
- 643.043177,
- "idxFloat",
- 643.043212890625,
"rows",
- 5L,
+ 9L,
"idx",
- 640L
+ 1102L
),
makeRow(
query,
"2011-04-01T00:00:00.000Z",
- "market",
+ "market2",
"total_market",
- "idxDouble",
- 1314.839715,
- "idxFloat",
- 1314.8397,
- "rows",
- 1L,
- "idx",
- 1314L
- ),
- makeRow(
- query,
- "2011-04-01T00:00:00.000Z",
- "market",
- "upfront",
- "idxDouble",
- 1447.34116,
- "idxFloat",
- 1447.3412,
- "rows",
- 1L,
- "idx",
- 1447L
- ),
- makeRow(
- query,
- "2011-04-01T00:00:00.000Z",
- "market",
- "spot",
- "idxDouble",
- 266.090949,
- "idxFloat",
- 266.0909423828125,
"rows",
2L,
"idx",
- 265L
- ),
- makeRow(
- query,
- "2011-04-01T00:00:00.000Z",
- "market",
- "total_market",
- "idxDouble",
- 1522.043733,
- "idxFloat",
- 1522.0437,
- "rows",
- 1L,
- "idx",
- 1522L
+ 2836L
),
makeRow(
query,
"2011-04-01T00:00:00.000Z",
- "market",
+ "market2",
"upfront",
- "idxDouble",
- 1234.247546,
- "idxFloat",
- 1234.2476,
- "rows",
- 1L,
- "idx",
- 1234L
- ),
- makeRow(
- query,
- "2011-04-01T00:00:00.000Z",
- "market",
- "spot",
- "idxDouble",
- 198.545289,
- "idxFloat",
- 198.5452880859375,
"rows",
2L,
"idx",
- 197L
+ 2681L
),
+
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "market",
+ "market2",
"spot",
- "idxDouble",
- 650.806953,
- "idxFloat",
- 650.8069458007812,
"rows",
- 5L,
+ 9L,
"idx",
- 648L
+ 1120L
),
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "market",
+ "market2",
"total_market",
- "idxDouble",
- 1193.556278,
- "idxFloat",
- 1193.5563,
- "rows",
- 1L,
- "idx",
- 1193L
- ),
- makeRow(
- query,
- "2011-04-02T00:00:00.000Z",
- "market",
- "upfront",
- "idxDouble",
- 1144.342401,
- "idxFloat",
- 1144.3424,
- "rows",
- 1L,
- "idx",
- 1144L
- ),
- makeRow(
- query,
- "2011-04-02T00:00:00.000Z",
- "market",
- "spot",
- "idxDouble",
- 249.591647,
- "idxFloat",
- 249.59164428710938,
"rows",
2L,
"idx",
- 249L
- ),
- makeRow(
- query,
- "2011-04-02T00:00:00.000Z",
- "market",
- "total_market",
- "idxDouble",
- 1321.375057,
- "idxFloat",
- 1321.375,
- "rows",
- 1L,
- "idx",
- 1321L
+ 2514L
),
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "market",
+ "market2",
"upfront",
- "idxDouble",
- 1049.738585,
- "idxFloat",
- 1049.7385,
- "rows",
- 1L,
- "idx",
- 1049L
- ),
- makeRow(
- query,
- "2011-04-02T00:00:00.000Z",
- "market",
- "spot",
- "idxDouble",
- 223.798797,
- "idxFloat",
- 223.79879760742188,
"rows",
2L,
"idx",
- 223L
+ 2193L
),
+
makeRow(
query,
"2011-04-01T00:00:00.000Z",
- "idxDouble",
- 6626.151569,
- "idxFloat",
- 6626.1513671875,
"rows",
13L,
"idx",
@@ -7497,10 +7228,6 @@ public class GroupByQueryRunnerTest
makeRow(
query,
"2011-04-02T00:00:00.000Z",
- "idxDouble",
- 5833.209717999999,
- "idxFloat",
- 5833.20849609375,
"rows",
13L,
"idx",
@@ -7529,9 +7256,7 @@ public class GroupByQueryRunnerTest
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
- new LongSumAggregatorFactory("idx", "index"),
- new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
- new DoubleSumAggregatorFactory("idxDouble", "index")
+ new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
@@ -7540,7 +7265,9 @@ public class GroupByQueryRunnerTest
ImmutableList.of("market"),
ImmutableList.of()
))
- .addOrderByColumn("idxDouble")
+ .addOrderByColumn("idx")
+ .addOrderByColumn("alias")
+ .addOrderByColumn("market")
.setLimit(1)
.build();
@@ -7553,33 +7280,21 @@ public class GroupByQueryRunnerTest
"rows",
1L,
"idx",
- 78L,
- "idxFloat",
- 78.622547f,
- "idxDouble",
- 78.622547d
+ 78L
),
makeRow(
query,
"2011-04-01T00:00:00.000Z",
"market",
"spot",
- "idxDouble",
- 198.545289,
- "idxFloat",
- 198.5452880859375,
"rows",
- 2L,
+ 9L,
"idx",
- 197L
+ 1102L
),
makeRow(
query,
"2011-04-01T00:00:00.000Z",
- "idxDouble",
- 6626.151575318359,
- "idxFloat",
- 6626.152f,
"rows",
13L,
"idx",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]