clintropolis commented on code in PR #13554: URL: https://github.com/apache/druid/pull/13554#discussion_r1063030909
########## processing/src/test/java/org/apache/druid/query/UnnestTopNQueryRunnerTest.java: ########## @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; Review Comment: nit: if this were in `org.apache.druid.query.topn` with all the other topn query tests, then we wouldn't need to change method visibility in `PooledTopNAlgorithm` ########## processing/src/test/java/org/apache/druid/query/UnnestScanQueryRunnerTest.java: ########## @@ -0,0 +1,530 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; Review Comment: nit: this should live in `org.apache.druid.query.scan` ########## processing/src/test/java/org/apache/druid/query/UnnestScanQueryRunnerTest.java: ########## @@ -0,0 +1,530 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.hash.Hashing; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.hll.HyperLogLogCollector; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.scan.ScanQueryConfig; +import org.apache.druid.query.scan.ScanQueryEngine; +import org.apache.druid.query.scan.ScanQueryQueryToolChest; +import org.apache.druid.query.scan.ScanQueryRunnerFactory; +import org.apache.druid.query.scan.ScanQueryRunnerTest; +import org.apache.druid.query.scan.ScanResultValue; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.joda.time.DateTime; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; + + +@RunWith(Parameterized.class) +public class UnnestScanQueryRunnerTest extends InitializedNullHandlingTest +{ + public static final QuerySegmentSpec I_0112_0114 = ScanQueryRunnerTest.I_0112_0114; + private static final VirtualColumn EXPR_COLUMN = + new ExpressionVirtualColumn("expr", "index * 2", ColumnType.LONG, TestExprMacroTable.INSTANCE); + private static final ScanQueryQueryToolChest 
TOOL_CHEST = new ScanQueryQueryToolChest( + new ScanQueryConfig(), + DefaultGenericQueryMetricsFactory.instance() + ); + private static final ScanQueryRunnerFactory FACTORY = new ScanQueryRunnerFactory( + TOOL_CHEST, + new ScanQueryEngine(), + new ScanQueryConfig() + ); + private final QueryRunner runner; + private final boolean legacy; + + public UnnestScanQueryRunnerTest(final QueryRunner runner, final boolean legacy) + { + this.runner = runner; + this.legacy = legacy; + } + + @Parameterized.Parameters(name = "{0}, legacy = {1}") + public static Iterable<Object[]> constructorFeeder() + { + + return QueryRunnerTestHelper.cartesian( + QueryRunnerTestHelper.makeUnnestQueryRunners( + FACTORY, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, + null + ), + ImmutableList.of(false, true) + ); + } + + private Druids.ScanQueryBuilder newTestUnnestQuery() + { + return Druids.newScanQueryBuilder() + .dataSource(QueryRunnerTestHelper.UNNEST_DATA_SOURCE) + .columns(Collections.emptyList()) + .eternityInterval() + .limit(3) + .legacy(legacy); + } + + private Druids.ScanQueryBuilder newTestUnnestQueryWithAllowSet() + { + List<String> allowList = Arrays.asList("a", "b", "c"); + LinkedHashSet allowSet = new LinkedHashSet(allowList); + return Druids.newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE), + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, + allowSet + )) + .columns(Collections.emptyList()) + .eternityInterval() + .limit(3) + .legacy(legacy); + } + + @Test + public void testUnnestRunner() Review Comment: nit: `testScan`? ########## processing/src/test/java/org/apache/druid/query/UnnestGroupByQueryRunnerTest.java: ########## @@ -0,0 +1,705 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; Review Comment: nit: imo this should live in `org.apache.druid.query.groupby` ########## processing/src/test/java/org/apache/druid/query/UnnestScanQueryRunnerTest.java: ########## @@ -0,0 +1,530 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.hash.Hashing; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.hll.HyperLogLogCollector; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.scan.ScanQueryConfig; +import org.apache.druid.query.scan.ScanQueryEngine; +import org.apache.druid.query.scan.ScanQueryQueryToolChest; +import org.apache.druid.query.scan.ScanQueryRunnerFactory; +import org.apache.druid.query.scan.ScanQueryRunnerTest; +import org.apache.druid.query.scan.ScanResultValue; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.joda.time.DateTime; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; + + +@RunWith(Parameterized.class) +public class UnnestScanQueryRunnerTest extends InitializedNullHandlingTest +{ + public static final QuerySegmentSpec I_0112_0114 = ScanQueryRunnerTest.I_0112_0114; + private static final VirtualColumn EXPR_COLUMN = + new ExpressionVirtualColumn("expr", "index * 2", ColumnType.LONG, TestExprMacroTable.INSTANCE); + private static final ScanQueryQueryToolChest 
TOOL_CHEST = new ScanQueryQueryToolChest( + new ScanQueryConfig(), + DefaultGenericQueryMetricsFactory.instance() + ); + private static final ScanQueryRunnerFactory FACTORY = new ScanQueryRunnerFactory( + TOOL_CHEST, + new ScanQueryEngine(), + new ScanQueryConfig() + ); + private final QueryRunner runner; + private final boolean legacy; + + public UnnestScanQueryRunnerTest(final QueryRunner runner, final boolean legacy) + { + this.runner = runner; + this.legacy = legacy; + } + + @Parameterized.Parameters(name = "{0}, legacy = {1}") + public static Iterable<Object[]> constructorFeeder() + { + + return QueryRunnerTestHelper.cartesian( + QueryRunnerTestHelper.makeUnnestQueryRunners( + FACTORY, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, + null + ), + ImmutableList.of(false, true) + ); + } + + private Druids.ScanQueryBuilder newTestUnnestQuery() + { + return Druids.newScanQueryBuilder() + .dataSource(QueryRunnerTestHelper.UNNEST_DATA_SOURCE) + .columns(Collections.emptyList()) + .eternityInterval() + .limit(3) + .legacy(legacy); + } + + private Druids.ScanQueryBuilder newTestUnnestQueryWithAllowSet() + { + List<String> allowList = Arrays.asList("a", "b", "c"); + LinkedHashSet allowSet = new LinkedHashSet(allowList); + return Druids.newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE), + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, + allowSet + )) + .columns(Collections.emptyList()) + .eternityInterval() + .limit(3) + .legacy(legacy); + } + + @Test + public void testUnnestRunner() + { + ScanQuery query = newTestUnnestQuery() + .intervals(I_0112_0114) + .columns(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST) + .limit(3) + .build(); + + Iterable<ScanResultValue> results = runner.run(QueryPlus.wrap(query)).toList(); + String[] columnNames; + if (legacy) { + columnNames = new String[]{ + 
getTimestampName() + ":TIME", + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST + }; + } else { + columnNames = new String[]{ + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST + }; + } + String[] values; + if (legacy) { + values = new String[]{ + "2011-01-12T00:00:00.000Z\ta", + "2011-01-12T00:00:00.000Z\tpreferred", + "2011-01-12T00:00:00.000Z\tb" + }; + } else { + values = new String[]{ + "a", + "preferred", + "b" + }; + } + + final List<List<Map<String, Object>>> events = toEvents(columnNames, values); + List<ScanResultValue> expectedResults = toExpected( + events, + legacy + ? Lists.newArrayList(getTimestampName(), QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST) + : Collections.singletonList(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST), + 0, + 3 + ); + ScanQueryRunnerTest.verify(expectedResults, results); + } + + @Test + public void testUnnestRunnerVirtualColumns() Review Comment: I think it would also be interesting to have a test on a composite array column, using the `array` function to create an array from multiple columns (in addition to the `mv_to_array` test) ########## processing/src/test/java/org/apache/druid/query/UnnestGroupByQueryRunnerTest.java: ########## @@ -0,0 +1,705 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.ExtractionDimensionSpec; +import org.apache.druid.query.extraction.StringFormatExtractionFn; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.GroupByQueryEngine; +import org.apache.druid.query.groupby.GroupByQueryQueryToolChest; +import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; +import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.TestGroupByBuffers; +import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; +import org.apache.druid.query.groupby.strategy.GroupByStrategyV1; +import org.apache.druid.query.groupby.strategy.GroupByStrategyV2; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.joda.time.DateTime; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import 
java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + + +@RunWith(Parameterized.class) +public class UnnestGroupByQueryRunnerTest extends InitializedNullHandlingTest +{ + public static final ObjectMapper DEFAULT_MAPPER = TestHelper.makeSmileMapper(); + public static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig() + { + @Override + public String getFormatString() + { + return null; + } + + @Override + public int intermediateComputeSizeBytes() + { + return 10 * 1024 * 1024; + } + + @Override + public int getNumMergeBuffers() + { + // Some tests need two buffers for testing nested groupBy (simulating two levels of merging). + // Some tests need more buffers for parallel combine (testMergedPostAggHavingSpec). + return 4; + } + + @Override + public int getNumThreads() + { + return 2; + } + }; + + private static TestGroupByBuffers BUFFER_POOLS = null; + + private final QueryRunner<ResultRow> runner; + private final QueryRunner<ResultRow> originalRunner; + private final GroupByQueryRunnerFactory factory; + private final GroupByQueryConfig config; + private final boolean vectorize; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + public UnnestGroupByQueryRunnerTest( + String testName, + GroupByQueryConfig config, + GroupByQueryRunnerFactory factory, + QueryRunner runner, + boolean vectorize + ) + { + this.config = config; + this.factory = factory; + this.runner = factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner)); + this.originalRunner = runner; + String runnerName = runner.toString(); + this.vectorize = vectorize; + } + + public static List<GroupByQueryConfig> testConfigs() + { + + final GroupByQueryConfig v2Config = new GroupByQueryConfig() + { + @Override + public String getDefaultStrategy() + { + return GroupByStrategySelector.STRATEGY_V2; + } + + @Override + public int 
getBufferGrouperInitialBuckets() + { + // Small initial table to force some growing. + return 4; + } + + @Override + public String toString() + { + return "v2"; + } + }; + final GroupByQueryConfig v2SmallBufferConfig = new GroupByQueryConfig() + { + @Override + public String getDefaultStrategy() + { + return GroupByStrategySelector.STRATEGY_V2; + } + + @Override + public int getBufferGrouperMaxSize() + { + return 2; + } + + @Override + public HumanReadableBytes getMaxOnDiskStorage() + { + return HumanReadableBytes.valueOf(10L * 1024 * 1024); + } + + @Override + public String toString() + { + return "v2SmallBuffer"; + } + }; + final GroupByQueryConfig v2SmallDictionaryConfig = new GroupByQueryConfig() + { + @Override + public String getDefaultStrategy() + { + return GroupByStrategySelector.STRATEGY_V2; + } + + @Override + public HumanReadableBytes getMaxOnDiskStorage() + { + return HumanReadableBytes.valueOf(10L * 1024 * 1024); + } + + @Override + public String toString() + { + return "v2SmallDictionary"; + } + }; + final GroupByQueryConfig v2ParallelCombineConfig = new GroupByQueryConfig() + { + @Override + public String getDefaultStrategy() + { + return GroupByStrategySelector.STRATEGY_V2; + } + + @Override + public int getNumParallelCombineThreads() + { + return DEFAULT_PROCESSING_CONFIG.getNumThreads(); + } + + @Override + public String toString() + { + return "v2ParallelCombine"; + } + }; + + + return ImmutableList.of( + v2Config, + v2SmallBufferConfig, + v2SmallDictionaryConfig, + v2ParallelCombineConfig + ); + } + + public static GroupByQueryRunnerFactory makeQueryRunnerFactory( + final GroupByQueryConfig config, + final TestGroupByBuffers bufferPools + ) + { + return makeQueryRunnerFactory(DEFAULT_MAPPER, config, bufferPools, DEFAULT_PROCESSING_CONFIG); + } + + public static GroupByQueryRunnerFactory makeQueryRunnerFactory( + final ObjectMapper mapper, + final GroupByQueryConfig config, + final TestGroupByBuffers bufferPools + ) + { + return 
makeQueryRunnerFactory(mapper, config, bufferPools, DEFAULT_PROCESSING_CONFIG); + } + + public static GroupByQueryRunnerFactory makeQueryRunnerFactory( + final ObjectMapper mapper, + final GroupByQueryConfig config, + final TestGroupByBuffers bufferPools, + final DruidProcessingConfig processingConfig + ) + { + if (bufferPools.getBufferSize() != processingConfig.intermediateComputeSizeBytes()) { + throw new ISE( + "Provided buffer size [%,d] does not match configured size [%,d]", + bufferPools.getBufferSize(), + processingConfig.intermediateComputeSizeBytes() + ); + } + if (bufferPools.getNumMergeBuffers() != processingConfig.getNumMergeBuffers()) { + throw new ISE( + "Provided merge buffer count [%,d] does not match configured count [%,d]", + bufferPools.getNumMergeBuffers(), + processingConfig.getNumMergeBuffers() + ); + } + final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config); + final GroupByStrategySelector strategySelector = new GroupByStrategySelector( + configSupplier, + new GroupByStrategyV1( + configSupplier, + new GroupByQueryEngine(configSupplier, bufferPools.getProcessingPool()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + new GroupByStrategyV2( + processingConfig, + configSupplier, + bufferPools.getProcessingPool(), + bufferPools.getMergePool(), + TestHelper.makeJsonMapper(), + mapper, + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) + ); + final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(strategySelector); + return new GroupByQueryRunnerFactory(strategySelector, toolChest); + } + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> constructorFeeder() + { + NullHandling.initializeForTests(); + setUpClass(); + + final List<Object[]> constructors = new ArrayList<>(); + for (GroupByQueryConfig config : testConfigs()) { + final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(config, BUFFER_POOLS); + for (QueryRunner<ResultRow> runner : 
QueryRunnerTestHelper.makeUnnestQueryRunners( + factory, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, + null + )) { + for (boolean vectorize : ImmutableList.of(false)) { + final String testName = StringUtils.format("config=%s, runner=%s, vectorize=%s", config, runner, vectorize); + + // Add vectorization tests for any indexes that support it. + if (!vectorize || + (QueryRunnerTestHelper.isTestRunnerVectorizable(runner) && + config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2))) { + constructors.add(new Object[]{testName, config, factory, runner, vectorize}); + } + } + } + } + + return constructors; + } + + @BeforeClass + public static void setUpClass() + { + if (BUFFER_POOLS == null) { + BUFFER_POOLS = TestGroupByBuffers.createDefault(); + } + } + + @AfterClass + public static void tearDownClass() + { + BUFFER_POOLS.close(); + BUFFER_POOLS = null; + } + + private static ResultRow makeRow(final GroupByQuery query, final String timestamp, final Object... vals) + { + return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals); + } + + private static ResultRow makeRow(final GroupByQuery query, final DateTime timestamp, final Object... vals) + { + return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals); + } + + private static List<ResultRow> makeRows( + final GroupByQuery query, + final String[] columnNames, + final Object[]... 
values + ) + { + return GroupByQueryRunnerTestHelper.createExpectedRows(query, columnNames, values); + } + + @Test + public void testGroupBy() + { + GroupByQuery query = makeQueryBuilder() + .setDataSource(UnnestDataSource.create( + new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE), + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, + null + )) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs( + QueryRunnerTestHelper.ROWS_COUNT, + new LongSumAggregatorFactory("idx", "index") + ) + .setGranularity(QueryRunnerTestHelper.DAY_GRAN) + .build(); + + List<ResultRow> expectedResults = Arrays.asList( + makeRow( + query, + "2011-04-01", + "alias", + "automotive", + "rows", + 2L, + "idx", + 270L + ), + makeRow( + query, + "2011-04-01", + "alias", + "business", + "rows", + 2L, + "idx", + 236L + ), + makeRow( + query, + "2011-04-01", + "alias", + "entertainment", + "rows", + 2L, + "idx", + 316L + ), + makeRow( + query, + "2011-04-01", + "alias", + "health", + "rows", + 2L, + "idx", + 240L + ), + makeRow( + query, + "2011-04-01", + "alias", + "mezzanine", + "rows", + 6L, + "idx", + 5740L + ), + makeRow( + query, + "2011-04-01", + "alias", + "news", + "rows", + 2L, + "idx", + 242L + ), + makeRow( + query, + "2011-04-01", + "alias", + "premium", + "rows", + 6L, + "idx", + 5800L + ), + makeRow( + query, + "2011-04-01", + "alias", + "technology", + "rows", + 2L, + "idx", + 156L + ), + makeRow( + query, + "2011-04-01", + "alias", + "travel", + "rows", + 2L, + "idx", + 238L + ), + + makeRow( + query, + "2011-04-02", + "alias", + "automotive", + "rows", + 2L, + "idx", + 294L + ), + makeRow( + query, + "2011-04-02", + "alias", + "business", + "rows", + 2L, + "idx", + 224L + ), + makeRow( + query, + "2011-04-02", + "alias", + "entertainment", + "rows", + 2L, + "idx", + 332L + ), + makeRow( + query, + "2011-04-02", + "alias", + "health", 
+ "rows", + 2L, + "idx", + 226L + ), + makeRow( + query, + "2011-04-02", + "alias", + "mezzanine", + "rows", + 6L, + "idx", + 4894L + ), + makeRow( + query, + "2011-04-02", + "alias", + "news", + "rows", + 2L, + "idx", + 228L + ), + makeRow( + query, + "2011-04-02", + "alias", + "premium", + "rows", + 6L, + "idx", + 5010L + ), + makeRow( + query, + "2011-04-02", + "alias", + "technology", + "rows", + 2L, + "idx", + 194L + ), + makeRow( + query, + "2011-04-02", + "alias", + "travel", + "rows", + 2L, + "idx", + 252L + ) + ); + + Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "groupBy"); + } + + @Test + public void testGroupByOnMissingColumn() + { + // Cannot vectorize due to extraction dimension spec. + cannotVectorize(); + + GroupByQuery query = makeQueryBuilder() + .setDataSource(UnnestDataSource.create( + new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE), + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, + null + )) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions( + new DefaultDimensionSpec("nonexistent0", "alias0"), + new ExtractionDimensionSpec("nonexistent1", "alias1", new StringFormatExtractionFn("foo")) + ).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) + .setGranularity(QueryRunnerTestHelper.ALL_GRAN) + .build(); + + List<ResultRow> expectedResults = Collections.singletonList( + makeRow( + query, + "2011-04-01", + "alias0", null, + "alias1", "foo", + "rows", 52L + ) + ); + + Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "missing-column"); + } + + @Test + public void testGroupByOnUnnestedColumn() + { + // Cannot vectorize due to extraction dimension spec. 
Review Comment: nit: this comment isn't true ########## processing/src/test/java/org/apache/druid/query/UnnestGroupByQueryRunnerTest.java: ########## @@ -0,0 +1,705 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.ExtractionDimensionSpec; +import org.apache.druid.query.extraction.StringFormatExtractionFn; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.GroupByQueryEngine; +import 
org.apache.druid.query.groupby.GroupByQueryQueryToolChest; +import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; +import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.TestGroupByBuffers; +import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; +import org.apache.druid.query.groupby.strategy.GroupByStrategyV1; +import org.apache.druid.query.groupby.strategy.GroupByStrategyV2; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.joda.time.DateTime; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + + +@RunWith(Parameterized.class) +public class UnnestGroupByQueryRunnerTest extends InitializedNullHandlingTest +{ + public static final ObjectMapper DEFAULT_MAPPER = TestHelper.makeSmileMapper(); + public static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig() Review Comment: since these and some other stuff are public in the standard `GroupByQueryRunnerTest` I wonder if we should just use those instead of redefining here ########## processing/src/test/java/org/apache/druid/query/UnnestGroupByQueryRunnerTest.java: ########## @@ -0,0 +1,705 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.ExtractionDimensionSpec; +import org.apache.druid.query.extraction.StringFormatExtractionFn; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.GroupByQueryEngine; +import org.apache.druid.query.groupby.GroupByQueryQueryToolChest; +import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; +import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.TestGroupByBuffers; +import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; +import 
org.apache.druid.query.groupby.strategy.GroupByStrategyV1; +import org.apache.druid.query.groupby.strategy.GroupByStrategyV2; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.joda.time.DateTime; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + + +@RunWith(Parameterized.class) +public class UnnestGroupByQueryRunnerTest extends InitializedNullHandlingTest +{ + public static final ObjectMapper DEFAULT_MAPPER = TestHelper.makeSmileMapper(); + public static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig() + { + @Override + public String getFormatString() + { + return null; + } + + @Override + public int intermediateComputeSizeBytes() + { + return 10 * 1024 * 1024; + } + + @Override + public int getNumMergeBuffers() + { + // Some tests need two buffers for testing nested groupBy (simulating two levels of merging). + // Some tests need more buffers for parallel combine (testMergedPostAggHavingSpec). 
+ return 4; + } + + @Override + public int getNumThreads() + { + return 2; + } + }; + + private static TestGroupByBuffers BUFFER_POOLS = null; + + private final QueryRunner<ResultRow> runner; + private final QueryRunner<ResultRow> originalRunner; + private final GroupByQueryRunnerFactory factory; + private final GroupByQueryConfig config; + private final boolean vectorize; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + public UnnestGroupByQueryRunnerTest( + String testName, + GroupByQueryConfig config, + GroupByQueryRunnerFactory factory, + QueryRunner runner, + boolean vectorize + ) + { + this.config = config; + this.factory = factory; + this.runner = factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner)); + this.originalRunner = runner; + String runnerName = runner.toString(); + this.vectorize = vectorize; + } + + public static List<GroupByQueryConfig> testConfigs() + { + + final GroupByQueryConfig v2Config = new GroupByQueryConfig() + { + @Override + public String getDefaultStrategy() + { + return GroupByStrategySelector.STRATEGY_V2; + } + + @Override + public int getBufferGrouperInitialBuckets() + { + // Small initial table to force some growing. 
+ return 4; + } + + @Override + public String toString() + { + return "v2"; + } + }; + final GroupByQueryConfig v2SmallBufferConfig = new GroupByQueryConfig() + { + @Override + public String getDefaultStrategy() + { + return GroupByStrategySelector.STRATEGY_V2; + } + + @Override + public int getBufferGrouperMaxSize() + { + return 2; + } + + @Override + public HumanReadableBytes getMaxOnDiskStorage() + { + return HumanReadableBytes.valueOf(10L * 1024 * 1024); + } + + @Override + public String toString() + { + return "v2SmallBuffer"; + } + }; + final GroupByQueryConfig v2SmallDictionaryConfig = new GroupByQueryConfig() + { + @Override + public String getDefaultStrategy() + { + return GroupByStrategySelector.STRATEGY_V2; + } + + @Override + public HumanReadableBytes getMaxOnDiskStorage() + { + return HumanReadableBytes.valueOf(10L * 1024 * 1024); + } + + @Override + public String toString() + { + return "v2SmallDictionary"; + } + }; + final GroupByQueryConfig v2ParallelCombineConfig = new GroupByQueryConfig() + { + @Override + public String getDefaultStrategy() + { + return GroupByStrategySelector.STRATEGY_V2; + } + + @Override + public int getNumParallelCombineThreads() + { + return DEFAULT_PROCESSING_CONFIG.getNumThreads(); + } + + @Override + public String toString() + { + return "v2ParallelCombine"; + } + }; + + + return ImmutableList.of( + v2Config, + v2SmallBufferConfig, + v2SmallDictionaryConfig, + v2ParallelCombineConfig + ); + } + + public static GroupByQueryRunnerFactory makeQueryRunnerFactory( + final GroupByQueryConfig config, + final TestGroupByBuffers bufferPools + ) + { + return makeQueryRunnerFactory(DEFAULT_MAPPER, config, bufferPools, DEFAULT_PROCESSING_CONFIG); + } + + public static GroupByQueryRunnerFactory makeQueryRunnerFactory( + final ObjectMapper mapper, + final GroupByQueryConfig config, + final TestGroupByBuffers bufferPools + ) + { + return makeQueryRunnerFactory(mapper, config, bufferPools, DEFAULT_PROCESSING_CONFIG); + } + + public static 
GroupByQueryRunnerFactory makeQueryRunnerFactory( + final ObjectMapper mapper, + final GroupByQueryConfig config, + final TestGroupByBuffers bufferPools, + final DruidProcessingConfig processingConfig + ) + { + if (bufferPools.getBufferSize() != processingConfig.intermediateComputeSizeBytes()) { + throw new ISE( + "Provided buffer size [%,d] does not match configured size [%,d]", + bufferPools.getBufferSize(), + processingConfig.intermediateComputeSizeBytes() + ); + } + if (bufferPools.getNumMergeBuffers() != processingConfig.getNumMergeBuffers()) { + throw new ISE( + "Provided merge buffer count [%,d] does not match configured count [%,d]", + bufferPools.getNumMergeBuffers(), + processingConfig.getNumMergeBuffers() + ); + } + final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config); + final GroupByStrategySelector strategySelector = new GroupByStrategySelector( + configSupplier, + new GroupByStrategyV1( + configSupplier, + new GroupByQueryEngine(configSupplier, bufferPools.getProcessingPool()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + new GroupByStrategyV2( + processingConfig, + configSupplier, + bufferPools.getProcessingPool(), + bufferPools.getMergePool(), + TestHelper.makeJsonMapper(), + mapper, + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) + ); + final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(strategySelector); + return new GroupByQueryRunnerFactory(strategySelector, toolChest); + } + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> constructorFeeder() + { + NullHandling.initializeForTests(); + setUpClass(); + + final List<Object[]> constructors = new ArrayList<>(); + for (GroupByQueryConfig config : testConfigs()) { + final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(config, BUFFER_POOLS); + for (QueryRunner<ResultRow> runner : QueryRunnerTestHelper.makeUnnestQueryRunners( + factory, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION, + 
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, + null + )) { + for (boolean vectorize : ImmutableList.of(false)) { + final String testName = StringUtils.format("config=%s, runner=%s, vectorize=%s", config, runner, vectorize); + + // Add vectorization tests for any indexes that support it. + if (!vectorize || + (QueryRunnerTestHelper.isTestRunnerVectorizable(runner) && + config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2))) { + constructors.add(new Object[]{testName, config, factory, runner, vectorize}); + } + } + } + } + + return constructors; + } + + @BeforeClass + public static void setUpClass() + { + if (BUFFER_POOLS == null) { + BUFFER_POOLS = TestGroupByBuffers.createDefault(); + } + } + + @AfterClass + public static void tearDownClass() + { + BUFFER_POOLS.close(); + BUFFER_POOLS = null; + } + + private static ResultRow makeRow(final GroupByQuery query, final String timestamp, final Object... vals) + { + return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals); + } + + private static ResultRow makeRow(final GroupByQuery query, final DateTime timestamp, final Object... vals) + { + return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals); + } + + private static List<ResultRow> makeRows( + final GroupByQuery query, + final String[] columnNames, + final Object[]... 
values + ) + { + return GroupByQueryRunnerTestHelper.createExpectedRows(query, columnNames, values); + } + + @Test + public void testGroupBy() + { + GroupByQuery query = makeQueryBuilder() + .setDataSource(UnnestDataSource.create( + new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE), + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, + null + )) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs( + QueryRunnerTestHelper.ROWS_COUNT, + new LongSumAggregatorFactory("idx", "index") + ) + .setGranularity(QueryRunnerTestHelper.DAY_GRAN) + .build(); + + List<ResultRow> expectedResults = Arrays.asList( + makeRow( + query, + "2011-04-01", + "alias", + "automotive", + "rows", + 2L, + "idx", + 270L + ), + makeRow( + query, + "2011-04-01", + "alias", + "business", + "rows", + 2L, + "idx", + 236L + ), + makeRow( + query, + "2011-04-01", + "alias", + "entertainment", + "rows", + 2L, + "idx", + 316L + ), + makeRow( + query, + "2011-04-01", + "alias", + "health", + "rows", + 2L, + "idx", + 240L + ), + makeRow( + query, + "2011-04-01", + "alias", + "mezzanine", + "rows", + 6L, + "idx", + 5740L + ), + makeRow( + query, + "2011-04-01", + "alias", + "news", + "rows", + 2L, + "idx", + 242L + ), + makeRow( + query, + "2011-04-01", + "alias", + "premium", + "rows", + 6L, + "idx", + 5800L + ), + makeRow( + query, + "2011-04-01", + "alias", + "technology", + "rows", + 2L, + "idx", + 156L + ), + makeRow( + query, + "2011-04-01", + "alias", + "travel", + "rows", + 2L, + "idx", + 238L + ), + + makeRow( + query, + "2011-04-02", + "alias", + "automotive", + "rows", + 2L, + "idx", + 294L + ), + makeRow( + query, + "2011-04-02", + "alias", + "business", + "rows", + 2L, + "idx", + 224L + ), + makeRow( + query, + "2011-04-02", + "alias", + "entertainment", + "rows", + 2L, + "idx", + 332L + ), + makeRow( + query, + "2011-04-02", + "alias", + "health", 
+ "rows", + 2L, + "idx", + 226L + ), + makeRow( + query, + "2011-04-02", + "alias", + "mezzanine", + "rows", + 6L, + "idx", + 4894L + ), + makeRow( + query, + "2011-04-02", + "alias", + "news", + "rows", + 2L, + "idx", + 228L + ), + makeRow( + query, + "2011-04-02", + "alias", + "premium", + "rows", + 6L, + "idx", + 5010L + ), + makeRow( + query, + "2011-04-02", + "alias", + "technology", + "rows", + 2L, + "idx", + 194L + ), + makeRow( + query, + "2011-04-02", + "alias", + "travel", + "rows", + 2L, + "idx", + 252L + ) + ); + + Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "groupBy"); + } + + @Test + public void testGroupByOnMissingColumn() + { + // Cannot vectorize due to extraction dimension spec. + cannotVectorize(); + + GroupByQuery query = makeQueryBuilder() + .setDataSource(UnnestDataSource.create( + new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE), + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, + null + )) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions( + new DefaultDimensionSpec("nonexistent0", "alias0"), + new ExtractionDimensionSpec("nonexistent1", "alias1", new StringFormatExtractionFn("foo")) + ).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) + .setGranularity(QueryRunnerTestHelper.ALL_GRAN) + .build(); + + List<ResultRow> expectedResults = Collections.singletonList( + makeRow( + query, + "2011-04-01", + "alias0", null, + "alias1", "foo", + "rows", 52L + ) + ); + + Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "missing-column"); + } + + @Test + public void testGroupByOnUnnestedColumn() + { + // Cannot vectorize due to extraction dimension spec. 
+ cannotVectorize(); + + GroupByQuery query = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.UNNEST_DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions( + new DefaultDimensionSpec(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "alias0") + ).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) + .setGranularity(QueryRunnerTestHelper.ALL_GRAN) + .build(); + + // Total rows should add up to 26 * 2 = 52 + // 26 rows and each has 2 entries in the column to be unnested + List<ResultRow> expectedResults = Arrays.asList( + makeRow( + query, + "2011-04-01", + "alias0", "a", + "rows", 2L + ), + makeRow( + query, + "2011-04-01", + "alias0", "b", + "rows", 2L + ), + makeRow( + query, + "2011-04-01", + "alias0", "e", + "rows", 2L + ), + makeRow( + query, + "2011-04-01", + "alias0", "h", + "rows", 2L + ), + makeRow( + query, + "2011-04-01", + "alias0", "m", + "rows", 6L + ), + makeRow( + query, + "2011-04-01", + "alias0", "n", + "rows", 2L + ), + makeRow( + query, + "2011-04-01", + "alias0", "p", + "rows", 6L + ), + makeRow( + query, + "2011-04-01", + "alias0", "preferred", + "rows", 26L + ), + makeRow( + query, + "2011-04-01", + "alias0", "t", + "rows", 4L + ) + ); + + Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "hroupBy-on-unnested-column"); + } + Review Comment: would be nice to also have a test with a virtual column, maybe both `mv_to_array` and making a composite array from multiple columns with `array` would be interesting ########## processing/src/test/java/org/apache/druid/query/UnnestTopNQueryRunnerTest.java: ########## @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.druid.collections.CloseableStupidPool; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory; +import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory; +import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.query.ordering.StringComparators; +import org.apache.druid.query.topn.DimensionTopNMetricSpec; +import org.apache.druid.query.topn.PooledTopNAlgorithm; +import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.query.topn.TopNQueryBuilder; +import org.apache.druid.query.topn.TopNQueryConfig; +import org.apache.druid.query.topn.TopNQueryQueryToolChest; +import org.apache.druid.query.topn.TopNQueryRunnerFactory; +import org.apache.druid.query.topn.TopNResultValue; +import org.apache.druid.segment.TestHelper; +import 
org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * + */ +@RunWith(Parameterized.class) +public class UnnestTopNQueryRunnerTest extends InitializedNullHandlingTest +{ + private static final Closer RESOURCE_CLOSER = Closer.create(); + private final QueryRunner<Result<TopNResultValue>> runner; + private final boolean duplicateSingleAggregatorQueries; + private final List<AggregatorFactory> commonAggregators; + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + public UnnestTopNQueryRunnerTest( + QueryRunner<Result<TopNResultValue>> runner, + boolean specializeGeneric1AggPooledTopN, + boolean specializeGeneric2AggPooledTopN, + boolean specializeHistorical1SimpleDoubleAggPooledTopN, + boolean specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN, + boolean duplicateSingleAggregatorQueries, + List<AggregatorFactory> commonAggregators + ) + { + this.runner = runner; + PooledTopNAlgorithm.setSpecializeGeneric1AggPooledTopN(specializeGeneric1AggPooledTopN); + PooledTopNAlgorithm.setSpecializeGeneric2AggPooledTopN(specializeGeneric2AggPooledTopN); + PooledTopNAlgorithm.setSpecializeHistorical1SimpleDoubleAggPooledTopN( + specializeHistorical1SimpleDoubleAggPooledTopN + ); + PooledTopNAlgorithm.setSpecializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN( + 
specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN + ); + this.duplicateSingleAggregatorQueries = duplicateSingleAggregatorQueries; + this.commonAggregators = commonAggregators; + } + + @AfterClass + public static void teardown() throws IOException + { + RESOURCE_CLOSER.close(); + } + + @Parameterized.Parameters(name = "{0}") + public static Iterable<Object[]> constructorFeeder() + { + List<QueryRunner<Result<TopNResultValue>>> retVal = queryRunners(); + List<Object[]> parameters = new ArrayList<>(); + for (int i = 0; i < 32; i++) { + for (QueryRunner<Result<TopNResultValue>> firstParameter : retVal) { + Object[] params = new Object[7]; + params[0] = firstParameter; + params[1] = (i & 1) != 0; + params[2] = (i & 2) != 0; + params[3] = (i & 4) != 0; + params[4] = (i & 8) != 0; + params[5] = (i & 16) != 0; + params[6] = QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS; + Object[] params2 = Arrays.copyOf(params, 7); + params2[6] = QueryRunnerTestHelper.COMMON_FLOAT_AGGREGATORS; + parameters.add(params); + parameters.add(params2); + } + } + return parameters; + } + + public static List<QueryRunner<Result<TopNResultValue>>> queryRunners() + { + final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool(); + final CloseableStupidPool<ByteBuffer> customPool = new CloseableStupidPool<>( + "TopNQueryRunnerFactory-bufferPool", + () -> ByteBuffer.allocate(20000) + ); + + List<QueryRunner<Result<TopNResultValue>>> retVal = new ArrayList<>(Collections.singletonList( + QueryRunnerTestHelper.makeUnnestQueryRunners( + new TopNQueryRunnerFactory( + defaultPool, + new TopNQueryQueryToolChest(new TopNQueryConfig()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, + null + ).get(0))); + + RESOURCE_CLOSER.register(() -> { + // Verify that all objects have been returned to the pool. 
+ Assert.assertEquals("defaultPool objects created", defaultPool.poolSize(), defaultPool.objectsCreatedCount()); + Assert.assertEquals("customPool objects created", customPool.poolSize(), customPool.objectsCreatedCount()); + defaultPool.close(); + customPool.close(); + }); + + return retVal; + } + + private static Map<String, Object> makeRowWithNulls( + String dimName, + @Nullable Object dimValue, + String metric, + @Nullable Object metricVal + ) + { + Map<String, Object> nullRow = new HashMap<>(); + nullRow.put(dimName, dimValue); + nullRow.put(metric, metricVal); + return nullRow; + } + + private List<AggregatorFactory> duplicateAggregators(AggregatorFactory aggregatorFactory, AggregatorFactory duplicate) + { + if (duplicateSingleAggregatorQueries) { + return ImmutableList.of(aggregatorFactory, duplicate); + } else { + return Collections.singletonList(aggregatorFactory); + } + } + + private List<Map<String, Object>> withDuplicateResults( + List<? extends Map<String, Object>> results, + String key, + String duplicateKey + ) + { + if (!duplicateSingleAggregatorQueries) { + return (List<Map<String, Object>>) results; + } + List<Map<String, Object>> resultsWithDuplicates = new ArrayList<>(); + for (Map<String, Object> result : results) { + resultsWithDuplicates.add( + ImmutableMap.<String, Object>builder().putAll(result).put(duplicateKey, result.get(key)).build() + ); + } + return resultsWithDuplicates; + } + + private Sequence<Result<TopNResultValue>> assertExpectedResults( + Iterable<Result<TopNResultValue>> expectedResults, + TopNQuery query + ) + { + final Sequence<Result<TopNResultValue>> retval = runWithMerge(query); + TestHelper.assertExpectedResults(expectedResults, retval); + return retval; + } + + private Sequence<Result<TopNResultValue>> runWithMerge(TopNQuery query) + { + return runWithMerge(query, ResponseContext.createEmpty()); + } + + private Sequence<Result<TopNResultValue>> runWithMerge(TopNQuery query, ResponseContext context) + { + final 
TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest(new TopNQueryConfig()); + final QueryRunner<Result<TopNResultValue>> mergeRunner = new FinalizeResultsQueryRunner( + chest.mergeResults(runner), + chest + ); + return mergeRunner.run(QueryPlus.wrap(query), context); + } + + @Test + public void testEmptyTopN() + { + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.UNNEST_DATA_SOURCE) + .granularity(QueryRunnerTestHelper.ALL_GRAN) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(4) + .intervals(QueryRunnerTestHelper.EMPTY_INTERVAL) + .aggregators( + Lists.newArrayList( + Iterables.concat( + commonAggregators, + Lists.newArrayList( + new DoubleMaxAggregatorFactory("maxIndex", "index"), + new DoubleMinAggregatorFactory("minIndex", "index"), + new DoubleFirstAggregatorFactory("first", "index", null) + ) + ) + ) + ) + .postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT) + .build(); + + List<Result<TopNResultValue>> expectedResults = ImmutableList.of( + new Result<>( + DateTimes.of("2020-04-02T00:00:00.000Z"), + new TopNResultValue(ImmutableList.of()) + ) + ); + assertExpectedResults(expectedResults, query); + } + + @Test + public void testTopNLexicographicUnnest() + { + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.UNNEST_DATA_SOURCE) + .granularity(QueryRunnerTestHelper.ALL_GRAN) + .dimension(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST) + .metric(new DimensionTopNMetricSpec("", StringComparators.LEXICOGRAPHIC)) + .threshold(4) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators(commonAggregators) + .postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT) + .build(); + + List<Result<TopNResultValue>> expectedResults = Collections.singletonList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + new TopNResultValue( + Arrays.<Map<String, Object>>asList( + ImmutableMap.of( + 
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "a", + "rows", 2L, + "index", 283.311029D, + "addRowsIndexConstant", 286.311029D, + "uniques", QueryRunnerTestHelper.UNIQUES_1 + ), + ImmutableMap.of( + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "b", + "rows", 2L, + "index", 231.557367D, + "addRowsIndexConstant", 234.557367D, + "uniques", QueryRunnerTestHelper.UNIQUES_1 + ), + ImmutableMap.of( + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "e", + "rows", 2L, + "index", 324.763273D, + "addRowsIndexConstant", 327.763273D, + "uniques", QueryRunnerTestHelper.UNIQUES_1 + ), + ImmutableMap.of( + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "h", + "rows", 2L, + "index", 233.580712D, + "addRowsIndexConstant", 236.580712D, + "uniques", QueryRunnerTestHelper.UNIQUES_1 + ) + ) + ) + ) + ); + assertExpectedResults(expectedResults, query); + } + + @Test + public void testTopNStringVirtualColumnUnnest() Review Comment: similar comment about virtual column created from multiple columns using `array` in addition to `mv_to_array` test ########## processing/src/test/java/org/apache/druid/query/UnnestGroupByQueryRunnerTest.java: ########## @@ -0,0 +1,705 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.ExtractionDimensionSpec; +import org.apache.druid.query.extraction.StringFormatExtractionFn; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.GroupByQueryEngine; +import org.apache.druid.query.groupby.GroupByQueryQueryToolChest; +import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; +import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.TestGroupByBuffers; +import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; +import org.apache.druid.query.groupby.strategy.GroupByStrategyV1; +import org.apache.druid.query.groupby.strategy.GroupByStrategyV2; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.joda.time.DateTime; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import 
java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + + +@RunWith(Parameterized.class) +public class UnnestGroupByQueryRunnerTest extends InitializedNullHandlingTest +{ + public static final ObjectMapper DEFAULT_MAPPER = TestHelper.makeSmileMapper(); + public static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig() + { + @Override + public String getFormatString() + { + return null; + } + + @Override + public int intermediateComputeSizeBytes() + { + return 10 * 1024 * 1024; + } + + @Override + public int getNumMergeBuffers() + { + // Some tests need two buffers for testing nested groupBy (simulating two levels of merging). + // Some tests need more buffers for parallel combine (testMergedPostAggHavingSpec). + return 4; + } + + @Override + public int getNumThreads() + { + return 2; + } + }; + + private static TestGroupByBuffers BUFFER_POOLS = null; + + private final QueryRunner<ResultRow> runner; + private final QueryRunner<ResultRow> originalRunner; + private final GroupByQueryRunnerFactory factory; + private final GroupByQueryConfig config; + private final boolean vectorize; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + public UnnestGroupByQueryRunnerTest( + String testName, + GroupByQueryConfig config, + GroupByQueryRunnerFactory factory, + QueryRunner runner, + boolean vectorize + ) + { + this.config = config; + this.factory = factory; + this.runner = factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner)); + this.originalRunner = runner; + String runnerName = runner.toString(); + this.vectorize = vectorize; + } + + public static List<GroupByQueryConfig> testConfigs() + { + + final GroupByQueryConfig v2Config = new GroupByQueryConfig() + { + @Override + public String getDefaultStrategy() + { + return GroupByStrategySelector.STRATEGY_V2; + } + + @Override + public int 
getBufferGrouperInitialBuckets() + { + // Small initial table to force some growing. + return 4; + } + + @Override + public String toString() + { + return "v2"; + } + }; + final GroupByQueryConfig v2SmallBufferConfig = new GroupByQueryConfig() + { + @Override + public String getDefaultStrategy() + { + return GroupByStrategySelector.STRATEGY_V2; + } + + @Override + public int getBufferGrouperMaxSize() + { + return 2; + } + + @Override + public HumanReadableBytes getMaxOnDiskStorage() + { + return HumanReadableBytes.valueOf(10L * 1024 * 1024); + } + + @Override + public String toString() + { + return "v2SmallBuffer"; + } + }; + final GroupByQueryConfig v2SmallDictionaryConfig = new GroupByQueryConfig() + { + @Override + public String getDefaultStrategy() + { + return GroupByStrategySelector.STRATEGY_V2; + } + + @Override + public HumanReadableBytes getMaxOnDiskStorage() + { + return HumanReadableBytes.valueOf(10L * 1024 * 1024); + } + + @Override + public String toString() + { + return "v2SmallDictionary"; + } + }; + final GroupByQueryConfig v2ParallelCombineConfig = new GroupByQueryConfig() + { + @Override + public String getDefaultStrategy() + { + return GroupByStrategySelector.STRATEGY_V2; + } + + @Override + public int getNumParallelCombineThreads() + { + return DEFAULT_PROCESSING_CONFIG.getNumThreads(); + } + + @Override + public String toString() + { + return "v2ParallelCombine"; + } + }; + + + return ImmutableList.of( + v2Config, + v2SmallBufferConfig, + v2SmallDictionaryConfig, + v2ParallelCombineConfig + ); + } + + public static GroupByQueryRunnerFactory makeQueryRunnerFactory( + final GroupByQueryConfig config, + final TestGroupByBuffers bufferPools + ) + { + return makeQueryRunnerFactory(DEFAULT_MAPPER, config, bufferPools, DEFAULT_PROCESSING_CONFIG); + } + + public static GroupByQueryRunnerFactory makeQueryRunnerFactory( + final ObjectMapper mapper, + final GroupByQueryConfig config, + final TestGroupByBuffers bufferPools + ) + { + return 
makeQueryRunnerFactory(mapper, config, bufferPools, DEFAULT_PROCESSING_CONFIG); + } + + public static GroupByQueryRunnerFactory makeQueryRunnerFactory( + final ObjectMapper mapper, + final GroupByQueryConfig config, + final TestGroupByBuffers bufferPools, + final DruidProcessingConfig processingConfig + ) + { + if (bufferPools.getBufferSize() != processingConfig.intermediateComputeSizeBytes()) { + throw new ISE( + "Provided buffer size [%,d] does not match configured size [%,d]", + bufferPools.getBufferSize(), + processingConfig.intermediateComputeSizeBytes() + ); + } + if (bufferPools.getNumMergeBuffers() != processingConfig.getNumMergeBuffers()) { + throw new ISE( + "Provided merge buffer count [%,d] does not match configured count [%,d]", + bufferPools.getNumMergeBuffers(), + processingConfig.getNumMergeBuffers() + ); + } + final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config); + final GroupByStrategySelector strategySelector = new GroupByStrategySelector( + configSupplier, + new GroupByStrategyV1( + configSupplier, + new GroupByQueryEngine(configSupplier, bufferPools.getProcessingPool()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + new GroupByStrategyV2( + processingConfig, + configSupplier, + bufferPools.getProcessingPool(), + bufferPools.getMergePool(), + TestHelper.makeJsonMapper(), + mapper, + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) + ); + final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(strategySelector); + return new GroupByQueryRunnerFactory(strategySelector, toolChest); + } + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> constructorFeeder() + { + NullHandling.initializeForTests(); + setUpClass(); + + final List<Object[]> constructors = new ArrayList<>(); + for (GroupByQueryConfig config : testConfigs()) { + final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(config, BUFFER_POOLS); + for (QueryRunner<ResultRow> runner : 
QueryRunnerTestHelper.makeUnnestQueryRunners( + factory, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, + null + )) { + for (boolean vectorize : ImmutableList.of(false)) { + final String testName = StringUtils.format("config=%s, runner=%s, vectorize=%s", config, runner, vectorize); + + // Add vectorization tests for any indexes that support it. + if (!vectorize || + (QueryRunnerTestHelper.isTestRunnerVectorizable(runner) && + config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2))) { + constructors.add(new Object[]{testName, config, factory, runner, vectorize}); + } + } + } + } + + return constructors; + } + + @BeforeClass + public static void setUpClass() + { + if (BUFFER_POOLS == null) { + BUFFER_POOLS = TestGroupByBuffers.createDefault(); + } + } + + @AfterClass + public static void tearDownClass() + { + BUFFER_POOLS.close(); + BUFFER_POOLS = null; + } + + private static ResultRow makeRow(final GroupByQuery query, final String timestamp, final Object... vals) + { + return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals); + } + + private static ResultRow makeRow(final GroupByQuery query, final DateTime timestamp, final Object... vals) + { + return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals); + } + + private static List<ResultRow> makeRows( + final GroupByQuery query, + final String[] columnNames, + final Object[]... 
values + ) + { + return GroupByQueryRunnerTestHelper.createExpectedRows(query, columnNames, values); + } + + @Test + public void testGroupBy() + { + GroupByQuery query = makeQueryBuilder() + .setDataSource(UnnestDataSource.create( + new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE), + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, + null + )) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs( + QueryRunnerTestHelper.ROWS_COUNT, + new LongSumAggregatorFactory("idx", "index") + ) + .setGranularity(QueryRunnerTestHelper.DAY_GRAN) + .build(); + + List<ResultRow> expectedResults = Arrays.asList( + makeRow( + query, + "2011-04-01", + "alias", + "automotive", + "rows", + 2L, + "idx", + 270L + ), + makeRow( + query, + "2011-04-01", + "alias", + "business", + "rows", + 2L, + "idx", + 236L + ), + makeRow( + query, + "2011-04-01", + "alias", + "entertainment", + "rows", + 2L, + "idx", + 316L + ), + makeRow( + query, + "2011-04-01", + "alias", + "health", + "rows", + 2L, + "idx", + 240L + ), + makeRow( + query, + "2011-04-01", + "alias", + "mezzanine", + "rows", + 6L, + "idx", + 5740L + ), + makeRow( + query, + "2011-04-01", + "alias", + "news", + "rows", + 2L, + "idx", + 242L + ), + makeRow( + query, + "2011-04-01", + "alias", + "premium", + "rows", + 6L, + "idx", + 5800L + ), + makeRow( + query, + "2011-04-01", + "alias", + "technology", + "rows", + 2L, + "idx", + 156L + ), + makeRow( + query, + "2011-04-01", + "alias", + "travel", + "rows", + 2L, + "idx", + 238L + ), + + makeRow( + query, + "2011-04-02", + "alias", + "automotive", + "rows", + 2L, + "idx", + 294L + ), + makeRow( + query, + "2011-04-02", + "alias", + "business", + "rows", + 2L, + "idx", + 224L + ), + makeRow( + query, + "2011-04-02", + "alias", + "entertainment", + "rows", + 2L, + "idx", + 332L + ), + makeRow( + query, + "2011-04-02", + "alias", + "health", 
+ "rows", + 2L, + "idx", + 226L + ), + makeRow( + query, + "2011-04-02", + "alias", + "mezzanine", + "rows", + 6L, + "idx", + 4894L + ), + makeRow( + query, + "2011-04-02", + "alias", + "news", + "rows", + 2L, + "idx", + 228L + ), + makeRow( + query, + "2011-04-02", + "alias", + "premium", + "rows", + 6L, + "idx", + 5010L + ), + makeRow( + query, + "2011-04-02", + "alias", + "technology", + "rows", + 2L, + "idx", + 194L + ), + makeRow( + query, + "2011-04-02", + "alias", + "travel", + "rows", + 2L, + "idx", + 252L + ) + ); + + Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "groupBy"); + } + + @Test + public void testGroupByOnMissingColumn() + { + // Cannot vectorize due to extraction dimension spec. + cannotVectorize(); + + GroupByQuery query = makeQueryBuilder() + .setDataSource(UnnestDataSource.create( + new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE), + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION, + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, + null + )) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions( + new DefaultDimensionSpec("nonexistent0", "alias0"), + new ExtractionDimensionSpec("nonexistent1", "alias1", new StringFormatExtractionFn("foo")) + ).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) + .setGranularity(QueryRunnerTestHelper.ALL_GRAN) + .build(); + + List<ResultRow> expectedResults = Collections.singletonList( + makeRow( + query, + "2011-04-01", + "alias0", null, + "alias1", "foo", + "rows", 52L + ) + ); + + Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "missing-column"); + } + + @Test + public void testGroupByOnUnnestedColumn() + { + // Cannot vectorize due to extraction dimension spec. 
+ cannotVectorize(); + + GroupByQuery query = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.UNNEST_DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions( + new DefaultDimensionSpec(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "alias0") + ).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) + .setGranularity(QueryRunnerTestHelper.ALL_GRAN) + .build(); + + // Total rows should add up to 26 * 2 = 52 + // 26 rows and each has 2 entries in the column to be unnested + List<ResultRow> expectedResults = Arrays.asList( + makeRow( + query, + "2011-04-01", + "alias0", "a", + "rows", 2L + ), + makeRow( + query, + "2011-04-01", + "alias0", "b", + "rows", 2L + ), + makeRow( + query, + "2011-04-01", + "alias0", "e", + "rows", 2L + ), + makeRow( + query, + "2011-04-01", + "alias0", "h", + "rows", 2L + ), + makeRow( + query, + "2011-04-01", + "alias0", "m", + "rows", 6L + ), + makeRow( + query, + "2011-04-01", + "alias0", "n", + "rows", 2L + ), + makeRow( + query, + "2011-04-01", + "alias0", "p", + "rows", 6L + ), + makeRow( + query, + "2011-04-01", + "alias0", "preferred", + "rows", 26L + ), + makeRow( + query, + "2011-04-01", + "alias0", "t", + "rows", 4L + ) + ); + + Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "hroupBy-on-unnested-column"); Review Comment: nit ```suggestion TestHelper.assertExpectedObjects(expectedResults, results, "groupBy-on-unnested-column"); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
