This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch 0.18.1
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.18.1 by this push:
new e85d9d9 fixes for inline subqueries when multi-value dimension is
present (#9698) (#9746)
e85d9d9 is described below
commit e85d9d9ba094f93e52a905a64ac5a42284303679
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Apr 24 13:07:15 2020 -0700
fixes for inline subqueries when multi-value dimension is present (#9698)
(#9746)
* fixes for inline subqueries when multi-value dimension is present
* fix test
* allow missing capabilities for vectorized group by queries to be treated
as single dims since it means that column doesn't exist
* add comment
---
.../epinephelinae/GroupByQueryEngineV2.java | 16 ++-
.../epinephelinae/vector/VectorGroupByEngine.java | 2 +-
.../apache/druid/query/topn/TopNQueryEngine.java | 5 +-
.../druid/server/ClientQuerySegmentWalkerTest.java | 131 ++++++++++++++++++++-
4 files changed, 144 insertions(+), 10 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
index 823f5f2..28399e0 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
@@ -227,7 +227,7 @@ public class GroupByQueryEngineV2
processingBuffer,
fudgeTimestamp,
dims,
-
isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities,
query.getDimensions()),
+
isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities,
query.getDimensions(), false),
cardinalityForArrayAggregation
);
} else {
@@ -238,7 +238,7 @@ public class GroupByQueryEngineV2
processingBuffer,
fudgeTimestamp,
dims,
-
isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities,
query.getDimensions())
+
isAllSingleValueDims(columnSelectorFactory::getColumnCapabilities,
query.getDimensions(), false)
);
}
}
@@ -313,12 +313,15 @@ public class GroupByQueryEngineV2
}
/**
- * Checks whether all "dimensions" are either single-valued or nonexistent
(which is just as good as single-valued,
- * since their selectors will show up as full of nulls).
+ * Checks whether all "dimensions" are either single-valued, or if allowed,
nonexistent. Since non-existent column
+ * selectors will show up as full of nulls they are effectively single
valued, however they can also be null during
+ * broker merge, for example with an 'inline' datasource subquery.
'missingMeansNonexistent' is sort of a hack to let
+ * the vectorized engine, which only operates on actual segments, to still
work in this case for non-existent columns.
*/
public static boolean isAllSingleValueDims(
final Function<String, ColumnCapabilities> capabilitiesFunction,
- final List<DimensionSpec> dimensions
+ final List<DimensionSpec> dimensions,
+ final boolean missingMeansNonexistent
)
{
return dimensions
@@ -333,7 +336,8 @@ public class GroupByQueryEngineV2
// Now check column capabilities.
final ColumnCapabilities columnCapabilities =
capabilitiesFunction.apply(dimension.getDimension());
- return columnCapabilities == null ||
!columnCapabilities.hasMultipleValues();
+ return (columnCapabilities != null &&
!columnCapabilities.hasMultipleValues()) ||
+ (missingMeansNonexistent && columnCapabilities == null);
});
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
index f8a3a34..3fa8504 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
@@ -83,7 +83,7 @@ public class VectorGroupByEngine
// This situation should sort itself out pretty well once this engine
supports multi-valued columns. Then we
// won't have to worry about having this all-single-value-dims check here.
- return
GroupByQueryEngineV2.isAllSingleValueDims(adapter::getColumnCapabilities,
query.getDimensions())
+ return
GroupByQueryEngineV2.isAllSingleValueDims(adapter::getColumnCapabilities,
query.getDimensions(), true)
&&
query.getDimensions().stream().allMatch(DimensionSpec::canVectorize)
&&
query.getAggregatorSpecs().stream().allMatch(AggregatorFactory::canVectorize)
&& adapter.canVectorize(filter, query.getVirtualColumns(), false);
diff --git
a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index dfe1809..1d1bccd 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -132,9 +132,10 @@ public class TopNQueryEngine
topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
} else if (selector.isHasExtractionFn()) {
topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
- } else if (columnCapabilities != null && !(columnCapabilities.getType() ==
ValueType.STRING
+ } else if (columnCapabilities == null || !(columnCapabilities.getType() ==
ValueType.STRING
&&
columnCapabilities.isDictionaryEncoded())) {
- // Use HeapBasedTopNAlgorithm for non-Strings and for
non-dictionary-encoded Strings.
+ // Use HeapBasedTopNAlgorithm for non-Strings and for
non-dictionary-encoded Strings, and for things we don't know
+ // which can happen for 'inline' data sources when this is run on the
broker
topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
} else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
// Use HeapBasedTopNAlgorithm when the dimension output type is a
non-String. (It's like an extractionFn: there can be
diff --git
a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
index c98c776..f224b5f 100644
---
a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
+++
b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
@@ -56,7 +56,11 @@ import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryHelper;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.segment.InlineSegmentWrangler;
import org.apache.druid.segment.MapSegmentWrangler;
import org.apache.druid.segment.ReferenceCountingSegment;
@@ -107,6 +111,7 @@ public class ClientQuerySegmentWalkerTest
private static final String FOO = "foo";
private static final String BAR = "bar";
+ private static final String MULTI = "multi";
private static final Interval INTERVAL = Intervals.of("2000/P1Y");
private static final String VERSION = "A";
@@ -140,6 +145,20 @@ public class ClientQuerySegmentWalkerTest
.build()
);
+ private static final InlineDataSource MULTI_VALUE_INLINE =
InlineDataSource.fromIterable(
+ ImmutableList.<Object[]>builder()
+ .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("a",
"b"), 1})
+ .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("a",
"c"), 2})
+ .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("b"),
3})
+ .add(new Object[]{INTERVAL.getStartMillis(), ImmutableList.of("c"),
4})
+ .build(),
+ RowSignature.builder()
+ .addTimeColumn()
+ .add("s", ValueType.STRING)
+ .add("n", ValueType.LONG)
+ .build()
+ );
+
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -400,6 +419,115 @@ public class ClientQuerySegmentWalkerTest
}
@Test
+ public void testGroupByOnScanMultiValue()
+ {
+ ScanQuery subquery = new Druids.ScanQueryBuilder().dataSource(MULTI)
+ .columns("s", "n")
+ .intervals(
+ new
MultipleIntervalSegmentSpec(
+
ImmutableList.of(Intervals.ETERNITY)
+ )
+ )
+ .legacy(false)
+
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+ .build();
+ final GroupByQuery query =
+ GroupByQuery.builder()
+ .setDataSource(new QueryDataSource(subquery))
+ .setGranularity(Granularities.ALL)
+ .setInterval(Intervals.ONLY_ETERNITY)
+ .setDimensions(DefaultDimensionSpec.of("s"))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("sum_n",
"n"))
+ .build();
+
+ testQuery(
+ query,
+ // GroupBy handles its own subqueries; only the inner one will go to
the cluster.
+ ImmutableList.of(
+ ExpectedQuery.cluster(subquery),
+ ExpectedQuery.local(
+ query.withDataSource(
+ InlineDataSource.fromIterable(
+ ImmutableList.of(
+ new Object[]{ImmutableList.of("a", "b"), 1},
+ new Object[]{ImmutableList.of("a", "c"), 2},
+ new Object[]{ImmutableList.of("b"), 3},
+ new Object[]{ImmutableList.of("c"), 4}
+ ),
+ RowSignature.builder().add("s", null).add("n",
null).build()
+ )
+ )
+ )
+ ),
+ ImmutableList.of(
+ new Object[]{"a", 3L},
+ new Object[]{"b", 4L},
+ new Object[]{"c", 6L}
+ )
+ );
+
+ Assert.assertEquals(2, scheduler.getTotalRun().get());
+ Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
+ Assert.assertEquals(2, scheduler.getTotalAcquired().get());
+ Assert.assertEquals(2, scheduler.getTotalReleased().get());
+ }
+
+ @Test
+ public void testTopNScanMultiValue()
+ {
+ ScanQuery subquery = new Druids.ScanQueryBuilder().dataSource(MULTI)
+ .columns("s", "n")
+ .intervals(
+ new
MultipleIntervalSegmentSpec(
+
ImmutableList.of(Intervals.ETERNITY)
+ )
+ )
+ .legacy(false)
+
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+ .build();
+ final TopNQuery query =
+ new TopNQueryBuilder().dataSource(new QueryDataSource(subquery))
+ .granularity(Granularities.ALL)
+ .intervals(Intervals.ONLY_ETERNITY)
+ .dimension(DefaultDimensionSpec.of("s"))
+ .metric("sum_n")
+ .threshold(100)
+ .aggregators(new
LongSumAggregatorFactory("sum_n", "n"))
+ .build();
+
+ testQuery(
+ query,
+ // GroupBy handles its own subqueries; only the inner one will go to
the cluster.
+ ImmutableList.of(
+ ExpectedQuery.cluster(subquery),
+ ExpectedQuery.local(
+ query.withDataSource(
+ InlineDataSource.fromIterable(
+ ImmutableList.of(
+ new Object[]{ImmutableList.of("a", "b"), 1},
+ new Object[]{ImmutableList.of("a", "c"), 2},
+ new Object[]{ImmutableList.of("b"), 3},
+ new Object[]{ImmutableList.of("c"), 4}
+ ),
+ RowSignature.builder().add("s", null).add("n",
null).build()
+ )
+ )
+ )
+ ),
+ ImmutableList.of(
+ new Object[]{Intervals.ETERNITY.getStartMillis(), "c", 6L},
+ new Object[]{Intervals.ETERNITY.getStartMillis(), "b", 4L},
+ new Object[]{Intervals.ETERNITY.getStartMillis(), "a", 3L}
+ )
+ );
+
+ Assert.assertEquals(2, scheduler.getTotalRun().get());
+ Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
+ Assert.assertEquals(2, scheduler.getTotalAcquired().get());
+ Assert.assertEquals(2, scheduler.getTotalReleased().get());
+ }
+
+ @Test
public void testJoinOnTableErrorCantInlineTable()
{
final GroupByQuery query =
@@ -522,7 +650,8 @@ public class ClientQuerySegmentWalkerTest
QueryStackTests.createClusterQuerySegmentWalker(
ImmutableMap.of(
FOO, makeTimeline(FOO, FOO_INLINE),
- BAR, makeTimeline(BAR, BAR_INLINE)
+ BAR, makeTimeline(BAR, BAR_INLINE),
+ MULTI, makeTimeline(MULTI, MULTI_VALUE_INLINE)
),
joinableFactory,
conglomerate,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]