This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.18.1
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.18.1 by this push:
     new e85d9d9  fixes for inline subqueries when multi-value dimension is 
present (#9698) (#9746)
e85d9d9 is described below

commit e85d9d9ba094f93e52a905a64ac5a42284303679
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Apr 24 13:07:15 2020 -0700

    fixes for inline subqueries when multi-value dimension is present (#9698) 
(#9746)
    
    * fixes for inline subqueries when multi-value dimension is present
    
    * fix test
    
    * allow missing capabilities for vectorized group by queries to be treated 
as single dims since it means that column doesn't exist
    
    * add comment
---
 .../epinephelinae/GroupByQueryEngineV2.java        |  16 ++-
 .../epinephelinae/vector/VectorGroupByEngine.java  |   2 +-
 .../apache/druid/query/topn/TopNQueryEngine.java   |   5 +-
 .../druid/server/ClientQuerySegmentWalkerTest.java | 131 ++++++++++++++++++++-
 4 files changed, 144 insertions(+), 10 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
index 823f5f2..28399e0 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
@@ -227,7 +227,7 @@ public class GroupByQueryEngineV2
                       processingBuffer,
                       fudgeTimestamp,
                       dims,
-                      
isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, 
query.getDimensions()),
+                      
isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, 
query.getDimensions(), false),
                       cardinalityForArrayAggregation
                   );
                 } else {
@@ -238,7 +238,7 @@ public class GroupByQueryEngineV2
                       processingBuffer,
                       fudgeTimestamp,
                       dims,
-                      
isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, 
query.getDimensions())
+                      
isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities, 
query.getDimensions(), false)
                   );
                 }
               }
@@ -313,12 +313,15 @@ public class GroupByQueryEngineV2
   }
 
   /**
-   * Checks whether all "dimensions" are either single-valued or nonexistent 
(which is just as good as single-valued,
-   * since their selectors will show up as full of nulls).
+   * Checks whether all "dimensions" are either single-valued, or if allowed, 
nonexistent. Since non-existent column
+   * selectors will show up as full of nulls they are effectively single 
valued, however they can also be null during
+   * broker merge, for example with an 'inline' datasource subquery. 
'missingMeansNonexistent' is sort of a hack to let
+   * the vectorized engine, which only operates on actual segments, to still 
work in this case for non-existent columns.
    */
   public static boolean isAllSingleValueDims(
       final Function<String, ColumnCapabilities> capabilitiesFunction,
-      final List<DimensionSpec> dimensions
+      final List<DimensionSpec> dimensions,
+      final boolean missingMeansNonexistent
   )
   {
     return dimensions
@@ -333,7 +336,8 @@ public class GroupByQueryEngineV2
 
               // Now check column capabilities.
               final ColumnCapabilities columnCapabilities = 
capabilitiesFunction.apply(dimension.getDimension());
-              return columnCapabilities == null || 
!columnCapabilities.hasMultipleValues();
+              return (columnCapabilities != null && 
!columnCapabilities.hasMultipleValues()) ||
+                     (missingMeansNonexistent && columnCapabilities == null);
             });
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
index f8a3a34..3fa8504 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
@@ -83,7 +83,7 @@ public class VectorGroupByEngine
     // This situation should sort itself out pretty well once this engine 
supports multi-valued columns. Then we
     // won't have to worry about having this all-single-value-dims check here.
 
-    return 
GroupByQueryEngineV2.isAllSingleValueDims(adapter::getColumnCapabilities, 
query.getDimensions())
+    return 
GroupByQueryEngineV2.isAllSingleValueDims(adapter::getColumnCapabilities, 
query.getDimensions(), true)
            && 
query.getDimensions().stream().allMatch(DimensionSpec::canVectorize)
            && 
query.getAggregatorSpecs().stream().allMatch(AggregatorFactory::canVectorize)
            && adapter.canVectorize(filter, query.getVirtualColumns(), false);
diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java 
b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index dfe1809..1d1bccd 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -132,9 +132,10 @@ public class TopNQueryEngine
       topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
     } else if (selector.isHasExtractionFn()) {
       topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
-    } else if (columnCapabilities != null && !(columnCapabilities.getType() == 
ValueType.STRING
+    } else if (columnCapabilities == null || !(columnCapabilities.getType() == 
ValueType.STRING
                                                && 
columnCapabilities.isDictionaryEncoded())) {
-      // Use HeapBasedTopNAlgorithm for non-Strings and for 
non-dictionary-encoded Strings.
+      // Use HeapBasedTopNAlgorithm for non-Strings and for 
non-dictionary-encoded Strings, and for things we don't know
+      // which can happen for 'inline' data sources when this is run on the 
broker
       topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
     } else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
       // Use HeapBasedTopNAlgorithm when the dimension output type is a 
non-String. (It's like an extractionFn: there can be
diff --git 
a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
 
b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
index c98c776..f224b5f 100644
--- 
a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
@@ -56,7 +56,11 @@ import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByQueryHelper;
 import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.query.topn.TopNQueryBuilder;
 import org.apache.druid.segment.InlineSegmentWrangler;
 import org.apache.druid.segment.MapSegmentWrangler;
 import org.apache.druid.segment.ReferenceCountingSegment;
@@ -107,6 +111,7 @@ public class ClientQuerySegmentWalkerTest
 
   private static final String FOO = "foo";
   private static final String BAR = "bar";
+  private static final String MULTI = "multi";
 
   private static final Interval INTERVAL = Intervals.of("2000/P1Y");
   private static final String VERSION = "A";
@@ -140,6 +145,20 @@ public class ClientQuerySegmentWalkerTest
                   .build()
   );
 
+  private static final InlineDataSource MULTI_VALUE_INLINE = 
InlineDataSource.fromIterable(
+      ImmutableList.<Object[]>builder()
+          .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("a", 
"b"), 1})
+          .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("a", 
"c"), 2})
+          .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("b"), 
3})
+          .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("c"), 
4})
+          .build(),
+      RowSignature.builder()
+                  .addTimeColumn()
+                  .add("s", ValueType.STRING)
+                  .add("n", ValueType.LONG)
+                  .build()
+  );
+
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
@@ -400,6 +419,115 @@ public class ClientQuerySegmentWalkerTest
   }
 
   @Test
+  public void testGroupByOnScanMultiValue()
+  {
+    ScanQuery subquery = new Druids.ScanQueryBuilder().dataSource(MULTI)
+                                                      .columns("s", "n")
+                                                      .intervals(
+                                                          new 
MultipleIntervalSegmentSpec(
+                                                              
ImmutableList.of(Intervals.ETERNITY)
+                                                          )
+                                                      )
+                                                      .legacy(false)
+                                                      
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                                                      .build();
+    final GroupByQuery query =
+        GroupByQuery.builder()
+                    .setDataSource(new QueryDataSource(subquery))
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(Intervals.ONLY_ETERNITY)
+                    .setDimensions(DefaultDimensionSpec.of("s"))
+                    .setAggregatorSpecs(new LongSumAggregatorFactory("sum_n", 
"n"))
+                    .build();
+
+    testQuery(
+        query,
+        // GroupBy handles its own subqueries; only the inner one will go to 
the cluster.
+        ImmutableList.of(
+            ExpectedQuery.cluster(subquery),
+            ExpectedQuery.local(
+                query.withDataSource(
+                    InlineDataSource.fromIterable(
+                        ImmutableList.of(
+                            new Object[]{ImmutableList.of("a", "b"), 1},
+                            new Object[]{ImmutableList.of("a", "c"), 2},
+                            new Object[]{ImmutableList.of("b"), 3},
+                            new Object[]{ImmutableList.of("c"), 4}
+                        ),
+                        RowSignature.builder().add("s", null).add("n", 
null).build()
+                    )
+                )
+            )
+        ),
+        ImmutableList.of(
+            new Object[]{"a", 3L},
+            new Object[]{"b", 4L},
+            new Object[]{"c", 6L}
+        )
+    );
+
+    Assert.assertEquals(2, scheduler.getTotalRun().get());
+    Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
+    Assert.assertEquals(2, scheduler.getTotalAcquired().get());
+    Assert.assertEquals(2, scheduler.getTotalReleased().get());
+  }
+
+  @Test
+  public void testTopNScanMultiValue()
+  {
+    ScanQuery subquery = new Druids.ScanQueryBuilder().dataSource(MULTI)
+                                                      .columns("s", "n")
+                                                      .intervals(
+                                                          new 
MultipleIntervalSegmentSpec(
+                                                              
ImmutableList.of(Intervals.ETERNITY)
+                                                          )
+                                                      )
+                                                      .legacy(false)
+                                                      
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                                                      .build();
+    final TopNQuery query =
+        new TopNQueryBuilder().dataSource(new QueryDataSource(subquery))
+                              .granularity(Granularities.ALL)
+                              .intervals(Intervals.ONLY_ETERNITY)
+                              .dimension(DefaultDimensionSpec.of("s"))
+                              .metric("sum_n")
+                              .threshold(100)
+                              .aggregators(new 
LongSumAggregatorFactory("sum_n", "n"))
+                              .build();
+
+    testQuery(
+        query,
+        // GroupBy handles its own subqueries; only the inner one will go to 
the cluster.
+        ImmutableList.of(
+            ExpectedQuery.cluster(subquery),
+            ExpectedQuery.local(
+                query.withDataSource(
+                    InlineDataSource.fromIterable(
+                        ImmutableList.of(
+                            new Object[]{ImmutableList.of("a", "b"), 1},
+                            new Object[]{ImmutableList.of("a", "c"), 2},
+                            new Object[]{ImmutableList.of("b"), 3},
+                            new Object[]{ImmutableList.of("c"), 4}
+                        ),
+                        RowSignature.builder().add("s", null).add("n", 
null).build()
+                    )
+                )
+            )
+        ),
+        ImmutableList.of(
+            new Object[]{Intervals.ETERNITY.getStartMillis(), "c", 6L},
+            new Object[]{Intervals.ETERNITY.getStartMillis(), "b", 4L},
+            new Object[]{Intervals.ETERNITY.getStartMillis(), "a", 3L}
+        )
+    );
+
+    Assert.assertEquals(2, scheduler.getTotalRun().get());
+    Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
+    Assert.assertEquals(2, scheduler.getTotalAcquired().get());
+    Assert.assertEquals(2, scheduler.getTotalReleased().get());
+  }
+
+  @Test
   public void testJoinOnTableErrorCantInlineTable()
   {
     final GroupByQuery query =
@@ -522,7 +650,8 @@ public class ClientQuerySegmentWalkerTest
             QueryStackTests.createClusterQuerySegmentWalker(
                 ImmutableMap.of(
                     FOO, makeTimeline(FOO, FOO_INLINE),
-                    BAR, makeTimeline(BAR, BAR_INLINE)
+                    BAR, makeTimeline(BAR, BAR_INLINE),
+                    MULTI, makeTimeline(MULTI, MULTI_VALUE_INLINE)
                 ),
                 joinableFactory,
                 conglomerate,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to