imply-cheddar commented on code in PR #13554:
URL: https://github.com/apache/druid/pull/13554#discussion_r1066588418


##########
processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java:
##########
@@ -467,6 +478,87 @@ public String toString()
     };
   }
 
+  public static <T, QueryType extends Query<T>> List<QueryRunner<T>> 
makeUnnestQueryRunners(
+      QueryRunnerFactory<T, QueryType> factory,
+      String dimensionToUnnest,
+      String outputColumn,
+      LinkedHashSet<String> allowSet
+  )
+  {
+    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
+    final IncrementalIndex noRollupRtIndex = 
TestIndex.getNoRollupIncrementalTestIndex();
+    final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex();
+    final QueryableIndex noRollupMMappedTestIndex = 
TestIndex.getNoRollupMMappedTestIndex();
+    final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex();
+    final QueryableIndex frontCodedMappedTestIndex = 
TestIndex.getFrontCodedMMappedTestIndex();

Review Comment:
   I'm confused as to why you have to do this. The unnest is defined as part 
of the query, not as part of the query runner, so you should be able to just 
send the query along and it will attach the unnest to the segment via the 
segment mapper functions, no?



##########
processing/src/test/java/org/apache/druid/query/UnnestSegment.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.UnnestStorageAdapter;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.LinkedHashSet;
+
+
+/**
+ * The segment for the Unnest Data Source.
+ * The input column name, output name and the allowSet follow from {@link 
UnnestDataSource}
+ */
+
+public class UnnestSegment implements Segment
+{
+  private static final Logger log = new Logger(UnnestSegment.class);
+
+  private final Segment baseSegment;
+  private final String dimension;
+  private final String renamedOutputDimension;
+  private final LinkedHashSet<String> allowSet;
+
+  public UnnestSegment(Segment baseSegment, String dimension, String 
outputName, LinkedHashSet<String> allowList)
+  {
+    this.baseSegment = baseSegment;
+    this.dimension = dimension;
+    this.renamedOutputDimension = outputName;
+    this.allowSet = allowList;
+  }

Review Comment:
   I don't believe this should actually be required.  The segment mapping 
functions should be doing this as part of query processing.



##########
processing/src/test/java/org/apache/druid/query/scan/UnnestScanQueryRunnerTest.java:
##########
@@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnnestDataSource;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+
+@RunWith(Parameterized.class)
+public class UnnestScanQueryRunnerTest extends InitializedNullHandlingTest
+{
+  public static final QuerySegmentSpec I_0112_0114 = 
ScanQueryRunnerTest.I_0112_0114;
+  private static final VirtualColumn EXPR_COLUMN =
+      new ExpressionVirtualColumn("expr", "index * 2", ColumnType.LONG, 
TestExprMacroTable.INSTANCE);
+  private static final ScanQueryQueryToolChest TOOL_CHEST = new 
ScanQueryQueryToolChest(
+      new ScanQueryConfig(),
+      DefaultGenericQueryMetricsFactory.instance()
+  );
+  private static final ScanQueryRunnerFactory FACTORY = new 
ScanQueryRunnerFactory(
+      TOOL_CHEST,
+      new ScanQueryEngine(),
+      new ScanQueryConfig()
+  );
+  private final QueryRunner runner;
+  private final boolean legacy;
+
+  public UnnestScanQueryRunnerTest(final QueryRunner runner, final boolean 
legacy)
+  {
+    this.runner = runner;
+    this.legacy = legacy;
+  }
+
+  @Parameterized.Parameters(name = "{0}, legacy = {1}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+
+    return QueryRunnerTestHelper.cartesian(
+        QueryRunnerTestHelper.makeUnnestQueryRunners(
+            FACTORY,
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION,
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+            null
+        ),
+        ImmutableList.of(false, true)
+    );
+  }
+
+  private Druids.ScanQueryBuilder newTestUnnestQuery()
+  {
+    return Druids.newScanQueryBuilder()
+                 .dataSource(QueryRunnerTestHelper.UNNEST_DATA_SOURCE)
+                 .columns(Collections.emptyList())
+                 .eternityInterval()
+                 .limit(3)
+                 .legacy(legacy);
+  }
+
+  private Druids.ScanQueryBuilder newTestUnnestQueryWithAllowSet()
+  {
+    List<String> allowList = Arrays.asList("a", "b", "c");
+    LinkedHashSet allowSet = new LinkedHashSet(allowList);
+    return Druids.newScanQueryBuilder()
+                 .dataSource(UnnestDataSource.create(
+                     new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
+                     QueryRunnerTestHelper.PLACEMENTISH_DIMENSION,
+                     QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+                     allowSet
+                 ))
+                 .columns(Collections.emptyList())
+                 .eternityInterval()
+                 .limit(3)
+                 .legacy(legacy);
+  }
+
+  @Test
+  public void testScanOnUnnest()
+  {
+    ScanQuery query = newTestUnnestQuery()
+        .intervals(I_0112_0114)
+        .columns(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        .limit(3)
+        .build();
+
+    Iterable<ScanResultValue> results = 
runner.run(QueryPlus.wrap(query)).toList();
+    String[] columnNames;
+    if (legacy) {
+      columnNames = new String[]{
+          getTimestampName() + ":TIME",
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    } else {
+      columnNames = new String[]{
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    }
+    String[] values;
+    if (legacy) {
+      values = new String[]{
+          "2011-01-12T00:00:00.000Z\ta",
+          "2011-01-12T00:00:00.000Z\tpreferred",
+          "2011-01-12T00:00:00.000Z\tb"
+      };
+    } else {
+      values = new String[]{
+          "a",
+          "preferred",
+          "b"
+      };
+    }
+
+    final List<List<Map<String, Object>>> events = toEvents(columnNames, 
values);
+    List<ScanResultValue> expectedResults = toExpected(
+        events,
+        legacy
+        ? Lists.newArrayList(getTimestampName(), 
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        : 
Collections.singletonList(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST),
+        0,
+        3
+    );
+    ScanQueryRunnerTest.verify(expectedResults, results);
+  }
+
+  @Test
+  public void testUnnestRunnerVirtualColumnsUsingSingleColumn()
+  {
+    ScanQuery query =
+        Druids.newScanQueryBuilder()
+              .intervals(I_0112_0114)
+              .dataSource(UnnestDataSource.create(
+                  new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
+                  "vc",
+                  QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+                  null
+              ))
+              .columns(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+              .eternityInterval()
+              .legacy(legacy)
+              .virtualColumns(
+                  new ExpressionVirtualColumn(
+                      "vc",
+                      "mv_to_array(placementish)",
+                      ColumnType.STRING,
+                      TestExprMacroTable.INSTANCE
+                  )
+              )
+              .limit(3)
+              .build();
+
+    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
+    QueryRunner vcrunner = 
QueryRunnerTestHelper.makeUnnestQueryRunner(FACTORY, new 
IncrementalIndexSegment(rtIndex, QueryRunnerTestHelper.SEGMENT_ID), "vc", 
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, null, "rtIndexvc");
+    Iterable<ScanResultValue> results = 
vcrunner.run(QueryPlus.wrap(query)).toList();
+    String[] columnNames;
+    if (legacy) {
+      columnNames = new String[]{
+          getTimestampName() + ":TIME",
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    } else {
+      columnNames = new String[]{
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    }
+    String[] values;
+    if (legacy) {
+      values = new String[]{
+          "2011-01-12T00:00:00.000Z\ta",
+          "2011-01-12T00:00:00.000Z\tpreferred",
+          "2011-01-12T00:00:00.000Z\tb"
+      };
+    } else {
+      values = new String[]{
+          "a",
+          "preferred",
+          "b"
+      };
+    }
+
+    final List<List<Map<String, Object>>> events = toEvents(columnNames, 
values);
+    List<ScanResultValue> expectedResults = toExpected(
+        events,
+        legacy
+        ? Lists.newArrayList(getTimestampName(), 
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        : 
Collections.singletonList(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST),
+        0,
+        3
+    );
+    ScanQueryRunnerTest.verify(expectedResults, results);
+  }
+
+  @Test
+  public void testUnnestRunnerVirtualColumnsUsingMultipleColumn()
+  {
+    ScanQuery query =
+        Druids.newScanQueryBuilder()
+              .intervals(I_0112_0114)
+              .dataSource(UnnestDataSource.create(
+                  new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
+                  "vc",
+                  QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+                  null
+              ))
+              .columns(QueryRunnerTestHelper.MARKET_DIMENSION, 
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+              .eternityInterval()
+              .legacy(legacy)
+              .virtualColumns(
+                  new ExpressionVirtualColumn(
+                      "vc",
+                      "array(\"market\",\"quality\")",
+                      ColumnType.STRING_ARRAY,
+                      TestExprMacroTable.INSTANCE
+                  )
+              )
+              .limit(4)
+              .build();
+
+    final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
+    QueryRunner vcrunner = 
QueryRunnerTestHelper.makeUnnestQueryRunner(FACTORY, new 
IncrementalIndexSegment(rtIndex, QueryRunnerTestHelper.SEGMENT_ID), "vc", 
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, null, "rtIndexvc");
+    Iterable<ScanResultValue> results = 
vcrunner.run(QueryPlus.wrap(query)).toList();
+    String[] columnNames;
+    if (legacy) {
+      columnNames = new String[]{
+          getTimestampName() + ":TIME",
+          QueryRunnerTestHelper.MARKET_DIMENSION,
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    } else {
+      columnNames = new String[]{
+          QueryRunnerTestHelper.MARKET_DIMENSION,
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    }
+    String[] values;
+    if (legacy) {
+      values = new String[]{
+          "2011-01-12T00:00:00.000Z\tspot\tspot",
+          "2011-01-12T00:00:00.000Z\tspot\tautomotive",
+          "2011-01-12T00:00:00.000Z\tspot\tspot",
+          "2011-01-12T00:00:00.000Z\tspot\tbusiness",
+      };
+    } else {
+      values = new String[]{
+          "spot\tspot",
+          "spot\tautomotive",
+          "spot\tspot",
+          "spot\tbusiness"
+      };
+    }
+
+    final List<List<Map<String, Object>>> events = toEvents(columnNames, 
values);
+    List<ScanResultValue> expectedResults = toExpected(
+        events,
+        legacy
+        ? Lists.newArrayList(getTimestampName(), 
QueryRunnerTestHelper.MARKET_DIMENSION, 
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        : Lists.newArrayList(QueryRunnerTestHelper.MARKET_DIMENSION, 
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST),
+        0,
+        4
+    );
+    ScanQueryRunnerTest.verify(expectedResults, results);
+  }
+
+  @Test
+  public void testUnnestRunnerWithFilter()
+  {
+    ScanQuery query = newTestUnnestQuery()
+        .intervals(I_0112_0114)
+        .columns(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        .limit(3)
+        .filters(new SelectorDimFilter(QueryRunnerTestHelper.MARKET_DIMENSION, 
"spot", null))
+        .build();
+
+    Iterable<ScanResultValue> results = 
runner.run(QueryPlus.wrap(query)).toList();
+    String[] columnNames;
+    if (legacy) {
+      columnNames = new String[]{
+          getTimestampName() + ":TIME",
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    } else {
+      columnNames = new String[]{
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    }
+    String[] values;
+    if (legacy) {
+      values = new String[]{
+          "2011-01-12T00:00:00.000Z\ta",
+          "2011-01-12T00:00:00.000Z\tpreferred",
+          "2011-01-12T00:00:00.000Z\tb"
+      };
+    } else {
+      values = new String[]{
+          "a",
+          "preferred",
+          "b"
+      };
+    }
+
+    final List<List<Map<String, Object>>> events = toEvents(columnNames, 
values);
+    List<ScanResultValue> expectedResults = toExpected(
+        events,
+        legacy
+        ? Lists.newArrayList(getTimestampName(), 
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        : 
Collections.singletonList(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST),
+        0,
+        3
+    );
+    ScanQueryRunnerTest.verify(expectedResults, results);
+  }
+
+  @Test
+  public void testUnnestRunnerWithOrdering()
+  {
+    ScanQuery query = newTestUnnestQuery()
+        .intervals(I_0112_0114)
+        .columns(QueryRunnerTestHelper.TIME_DIMENSION, 
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        .limit(3)
+        .filters(new SelectorDimFilter(QueryRunnerTestHelper.MARKET_DIMENSION, 
"spot", null))
+        .order(ScanQuery.Order.ASCENDING)
+        .build();
+
+
+    Iterable<ScanResultValue> results = 
runner.run(QueryPlus.wrap(query)).toList();
+    String[] columnNames;
+    if (legacy) {
+      columnNames = new String[]{
+          getTimestampName() + ":TIME",
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    } else {
+      columnNames = new String[]{
+          ColumnHolder.TIME_COLUMN_NAME,
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    }
+    String[] values;
+    values = new String[]{
+        "2011-01-12T00:00:00.000Z\ta",
+        "2011-01-12T00:00:00.000Z\tpreferred",
+        "2011-01-12T00:00:00.000Z\tb"
+    };
+
+    final List<List<Map<String, Object>>> ascendingEvents = 
toEvents(columnNames, values);
+    if (legacy) {
+      for (List<Map<String, Object>> batch : ascendingEvents) {
+        for (Map<String, Object> event : batch) {
+          event.put("__time", ((DateTime) event.get("timestamp")).getMillis());
+        }
+      }
+    } else {
+      for (List<Map<String, Object>> batch : ascendingEvents) {
+        for (Map<String, Object> event : batch) {
+          event.put("__time", (DateTimes.of((String) 
event.get("__time"))).getMillis());
+        }
+      }
+    }
+    List<ScanResultValue> ascendingExpectedResults = toExpected(
+        ascendingEvents,
+        legacy ?
+        Lists.newArrayList(
+            QueryRunnerTestHelper.TIME_DIMENSION,
+            getTimestampName(),
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+        ) :
+        Lists.newArrayList(
+            QueryRunnerTestHelper.TIME_DIMENSION,
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+        ),
+        0,
+        3
+    );
+
+    ScanQueryRunnerTest.verify(ascendingExpectedResults, results);
+  }
+
+  @Test
+  public void testUnnestRunnerNonNullAllowSet()
+  {
+    List<String> allowList = Arrays.asList("a", "b", "c");
+    LinkedHashSet allowSet = new LinkedHashSet(allowList);
+    ScanQuery query = newTestUnnestQueryWithAllowSet()
+        .intervals(I_0112_0114)
+        .columns(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        .limit(3)
+        .build();
+
+    List<QueryRunner<ScanResultValue>> unrunner = 
QueryRunnerTestHelper.makeUnnestQueryRunners(
+        FACTORY,
+        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION,
+        QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+        allowSet
+    );
+
+    Iterable<ScanResultValue> results = 
unrunner.get(1).run(QueryPlus.wrap(query)).toList();
+    String[] columnNames;
+    if (legacy) {
+      columnNames = new String[]{
+          getTimestampName() + ":TIME",
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    } else {
+      columnNames = new String[]{
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST
+      };
+    }
+    String[] values;
+    if (legacy) {
+      values = new String[]{
+          "2011-01-12T00:00:00.000Z\ta",
+          "2011-01-12T00:00:00.000Z\tb",
+          "2011-01-13T00:00:00.000Z\ta"
+      };
+    } else {
+      values = new String[]{
+          "a",
+          "b",
+          "a"
+      };
+    }
+
+    final List<List<Map<String, Object>>> events = toEvents(columnNames, 
values);
+    List<ScanResultValue> expectedResults = toExpected(
+        events,
+        legacy
+        ? Lists.newArrayList(getTimestampName(), 
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
+        : 
Collections.singletonList(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST),
+        0,
+        3
+    );
+    ScanQueryRunnerTest.verify(expectedResults, results);
+  }
+
+
+  private List<List<Map<String, Object>>> toEvents(final String[] dimSpecs, 
final String[]... valueSet)
+  {
+    List<String> values = new ArrayList<>();
+    for (String[] vSet : valueSet) {
+      values.addAll(Arrays.asList(vSet));
+    }
+    List<List<Map<String, Object>>> events = new ArrayList<>();
+    events.add(
+        Lists.newArrayList(
+            Iterables.transform(
+                values,
+                input -> {
+                  Map<String, Object> event = new HashMap<>();
+                  String[] values1 = input.split("\\t");
+                  for (int i = 0; i < dimSpecs.length; i++) {
+                    if (dimSpecs[i] == null || i >= dimSpecs.length) {

Review Comment:
   This CodeQL finding is likely legitimate: inside a loop bounded by 
`i < dimSpecs.length`, the condition `i >= dimSpecs.length` can never be 
true, so that part of the check is dead code.



##########
processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java:
##########
@@ -0,0 +1,833 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnnestDataSource;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.ExtractionDimensionSpec;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.extraction.StringFormatExtractionFn;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
+import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
+import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
+import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.joda.time.DateTime;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+
+@RunWith(Parameterized.class)
+public class UnnestGroupByQueryRunnerTest extends InitializedNullHandlingTest
+{
+  private static TestGroupByBuffers BUFFER_POOLS = null;
+
+  private final QueryRunner<ResultRow> runner;
+  private final QueryRunner<ResultRow> originalRunner;
+  private final GroupByQueryRunnerFactory factory;
+  private final GroupByQueryConfig config;
+  private final boolean vectorize;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  public UnnestGroupByQueryRunnerTest(
+      String testName,
+      GroupByQueryConfig config,
+      GroupByQueryRunnerFactory factory,
+      QueryRunner runner,
+      boolean vectorize
+  )
+  {
+    this.config = config;
+    this.factory = factory;
+    this.runner = factory.mergeRunners(Execs.directExecutor(), 
ImmutableList.of(runner));
+    this.originalRunner = runner;
+    String runnerName = runner.toString();
+    this.vectorize = vectorize;
+  }
+
+  public static List<GroupByQueryConfig> testConfigs()
+  {
+
+    final GroupByQueryConfig v2Config = new GroupByQueryConfig()
+    {
+      @Override
+      public String getDefaultStrategy()
+      {
+        return GroupByStrategySelector.STRATEGY_V2;
+      }
+
+      @Override
+      public int getBufferGrouperInitialBuckets()
+      {
+        // Small initial table to force some growing.
+        return 4;
+      }
+
+      @Override
+      public String toString()
+      {
+        return "v2";
+      }
+    };
+    final GroupByQueryConfig v2SmallBufferConfig = new GroupByQueryConfig()
+    {
+      @Override
+      public String getDefaultStrategy()
+      {
+        return GroupByStrategySelector.STRATEGY_V2;
+      }
+
+      @Override
+      public int getBufferGrouperMaxSize()
+      {
+        return 2;
+      }
+
+      @Override
+      public HumanReadableBytes getMaxOnDiskStorage()
+      {
+        return HumanReadableBytes.valueOf(10L * 1024 * 1024);
+      }
+
+      @Override
+      public String toString()
+      {
+        return "v2SmallBuffer";
+      }
+    };
+    final GroupByQueryConfig v2SmallDictionaryConfig = new GroupByQueryConfig()
+    {
+      @Override
+      public String getDefaultStrategy()
+      {
+        return GroupByStrategySelector.STRATEGY_V2;
+      }
+
+      @Override
+      public HumanReadableBytes getMaxOnDiskStorage()
+      {
+        return HumanReadableBytes.valueOf(10L * 1024 * 1024);
+      }
+
+      @Override
+      public String toString()
+      {
+        return "v2SmallDictionary";
+      }
+    };
+    final GroupByQueryConfig v2ParallelCombineConfig = new GroupByQueryConfig()
+    {
+      @Override
+      public String getDefaultStrategy()
+      {
+        return GroupByStrategySelector.STRATEGY_V2;
+      }
+
+      @Override
+      public int getNumParallelCombineThreads()
+      {
+        return 
GroupByQueryRunnerTest.DEFAULT_PROCESSING_CONFIG.getNumThreads();
+      }
+
+      @Override
+      public String toString()
+      {
+        return "v2ParallelCombine";
+      }
+    };
+
+
+    return ImmutableList.of(
+        v2Config
+    );
+  }
+
+  public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
+      final GroupByQueryConfig config,
+      final TestGroupByBuffers bufferPools
+  )
+  {
+    return makeQueryRunnerFactory(GroupByQueryRunnerTest.DEFAULT_MAPPER, 
config, bufferPools, GroupByQueryRunnerTest.DEFAULT_PROCESSING_CONFIG);
+  }
+
+  public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
+      final ObjectMapper mapper,
+      final GroupByQueryConfig config,
+      final TestGroupByBuffers bufferPools
+  )
+  {
+    return makeQueryRunnerFactory(mapper, config, bufferPools, 
GroupByQueryRunnerTest.DEFAULT_PROCESSING_CONFIG);
+  }
+
+  public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
+      final ObjectMapper mapper,
+      final GroupByQueryConfig config,
+      final TestGroupByBuffers bufferPools,
+      final DruidProcessingConfig processingConfig
+  )
+  {
+    if (bufferPools.getBufferSize() != 
processingConfig.intermediateComputeSizeBytes()) {
+      throw new ISE(
+          "Provided buffer size [%,d] does not match configured size [%,d]",
+          bufferPools.getBufferSize(),
+          processingConfig.intermediateComputeSizeBytes()
+      );
+    }
+    if (bufferPools.getNumMergeBuffers() != 
processingConfig.getNumMergeBuffers()) {
+      throw new ISE(
+          "Provided merge buffer count [%,d] does not match configured count 
[%,d]",
+          bufferPools.getNumMergeBuffers(),
+          processingConfig.getNumMergeBuffers()
+      );
+    }
+    final Supplier<GroupByQueryConfig> configSupplier = 
Suppliers.ofInstance(config);
+    final GroupByStrategySelector strategySelector = new 
GroupByStrategySelector(
+        configSupplier,
+        new GroupByStrategyV1(
+            configSupplier,
+            new GroupByQueryEngine(configSupplier, 
bufferPools.getProcessingPool()),
+            QueryRunnerTestHelper.NOOP_QUERYWATCHER
+        ),
+        new GroupByStrategyV2(
+            processingConfig,
+            configSupplier,
+            bufferPools.getProcessingPool(),
+            bufferPools.getMergePool(),
+            TestHelper.makeJsonMapper(),
+            mapper,
+            QueryRunnerTestHelper.NOOP_QUERYWATCHER
+        )
+    );
+    final GroupByQueryQueryToolChest toolChest = new 
GroupByQueryQueryToolChest(strategySelector);
+    return new GroupByQueryRunnerFactory(strategySelector, toolChest);
+  }
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> constructorFeeder()
+  {
+    NullHandling.initializeForTests();
+    setUpClass();
+
+    final List<Object[]> constructors = new ArrayList<>();
+    for (GroupByQueryConfig config : testConfigs()) {
+      final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(config, 
BUFFER_POOLS);
+      for (QueryRunner<ResultRow> runner : 
QueryRunnerTestHelper.makeUnnestQueryRunners(
+          factory,
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION,
+          QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+          null
+      )) {
+        for (boolean vectorize : ImmutableList.of(false)) {
+          final String testName = StringUtils.format("config=%s, runner=%s, 
vectorize=%s", config, runner, vectorize);
+
+          // Add vectorization tests for any indexes that support it.
+          if (!vectorize ||
+              (QueryRunnerTestHelper.isTestRunnerVectorizable(runner) &&
+               
config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2))) {
+            constructors.add(new Object[]{testName, config, factory, runner, 
vectorize});
+          }
+        }
+      }
+    }
+
+    return constructors;
+  }
+
+  @BeforeClass
+  public static void setUpClass()
+  {
+    if (BUFFER_POOLS == null) {
+      BUFFER_POOLS = TestGroupByBuffers.createDefault();
+    }
+  }
+
+  @AfterClass
+  public static void tearDownClass()
+  {
+    BUFFER_POOLS.close();
+    BUFFER_POOLS = null;
+  }
+
+  private static ResultRow makeRow(final GroupByQuery query, final String 
timestamp, final Object... vals)
+  {
+    return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, 
vals);
+  }
+
+  private static ResultRow makeRow(final GroupByQuery query, final DateTime 
timestamp, final Object... vals)
+  {
+    return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, 
vals);
+  }
+
+  private static List<ResultRow> makeRows(
+      final GroupByQuery query,
+      final String[] columnNames,
+      final Object[]... values
+  )
+  {
+    return GroupByQueryRunnerTestHelper.createExpectedRows(query, columnNames, 
values);
+  }
+
+  @Test
+  public void testGroupBy()
+  {
+    GroupByQuery query = makeQueryBuilder()
+        .setDataSource(UnnestDataSource.create(
+            new TableDataSource(QueryRunnerTestHelper.DATA_SOURCE),
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION,
+            QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST,
+            null
+        ))
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(
+            QueryRunnerTestHelper.ROWS_COUNT,
+            new LongSumAggregatorFactory("idx", "index")
+        )
+        .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
+        .build();
+
+    List<ResultRow> expectedResults = Arrays.asList(
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias",
+            "automotive",
+            "rows",
+            2L,
+            "idx",
+            270L
+        ),

Review Comment:
   Style nit: I don't think these arguments are actually wide enough to 
require each one to be on an independent line.  The test would read a lot 
easier if each of the `makeRow()` calls fit on a single line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to