This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 54df2350261 Lazily build Filter in FilteredAggregatorFactory to avoid
parsing exceptions in Router (#15526)
54df2350261 is described below
commit 54df2350261885053f842233686761da6e947aaa
Author: Rishabh Singh <[email protected]>
AuthorDate: Sat Dec 9 12:18:37 2023 +0530
Lazily build Filter in FilteredAggregatorFactory to avoid parsing
exceptions in Router (#15526)
Query with lookups in FilteredAggregator fails with this exception in
router,
Cannot construct instance of
`org.apache.druid.query.aggregation.FilteredAggregatorFactory`, problem: Lookup
[campaigns_lookup[campaignId][is_sold][autodsp]] not found at [Source:
(org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 913] (through
reference chain:
org.apache.druid.query.groupby.GroupByQuery["aggregations"]->java.util.ArrayList[1])
T
he problem is that constructor of FilteredAggregatorFactory is actually
validating if the lookup exists in this statement dimFilter.toFilter().
This is failing on the router, which is to be expected, because, the router
isn’t assigned any lookups.
The fix is to move to a lazy initialisation of the filter object in the
constructor.
---
.../aggregation/FilteredAggregatorFactory.java | 16 ++--
.../server/AsyncQueryForwardingServletTest.java | 87 +++++++++++++++++++++-
2 files changed, 94 insertions(+), 9 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/FilteredAggregatorFactory.java
b/processing/src/main/java/org/apache/druid/query/aggregation/FilteredAggregatorFactory.java
index 6e4925b62c0..3aa1d855945 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/FilteredAggregatorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/FilteredAggregatorFactory.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.query.PerSegmentQueryOptimizationContext;
@@ -49,7 +51,7 @@ public class FilteredAggregatorFactory extends
AggregatorFactory
{
private final AggregatorFactory delegate;
private final DimFilter dimFilter;
- private final Filter filter;
+ private final Supplier<Filter> filterSupplier;
@Nullable
private final String name;
@@ -75,14 +77,14 @@ public class FilteredAggregatorFactory extends
AggregatorFactory
this.delegate = delegate;
this.dimFilter = dimFilter;
- this.filter = dimFilter.toFilter();
+ this.filterSupplier = Suppliers.memoize(dimFilter::toFilter);
this.name = name;
}
@Override
public Aggregator factorize(ColumnSelectorFactory columnSelectorFactory)
{
- final ValueMatcher valueMatcher =
filter.makeMatcher(columnSelectorFactory);
+ final ValueMatcher valueMatcher =
filterSupplier.get().makeMatcher(columnSelectorFactory);
return new FilteredAggregator(
valueMatcher,
delegate.factorize(columnSelectorFactory)
@@ -92,7 +94,7 @@ public class FilteredAggregatorFactory extends
AggregatorFactory
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory
columnSelectorFactory)
{
- final ValueMatcher valueMatcher =
filter.makeMatcher(columnSelectorFactory);
+ final ValueMatcher valueMatcher =
filterSupplier.get().makeMatcher(columnSelectorFactory);
return new FilteredBufferAggregator(
valueMatcher,
delegate.factorizeBuffered(columnSelectorFactory)
@@ -103,7 +105,7 @@ public class FilteredAggregatorFactory extends
AggregatorFactory
public VectorAggregator factorizeVector(VectorColumnSelectorFactory
columnSelectorFactory)
{
Preconditions.checkState(canVectorize(columnSelectorFactory), "Cannot
vectorize");
- final VectorValueMatcher valueMatcher =
filter.makeVectorMatcher(columnSelectorFactory);
+ final VectorValueMatcher valueMatcher =
filterSupplier.get().makeVectorMatcher(columnSelectorFactory);
return new FilteredVectorAggregator(
valueMatcher,
delegate.factorizeVector(columnSelectorFactory)
@@ -113,7 +115,7 @@ public class FilteredAggregatorFactory extends
AggregatorFactory
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
- return delegate.canVectorize(columnInspector) &&
filter.canVectorizeMatcher(columnInspector);
+ return delegate.canVectorize(columnInspector) &&
filterSupplier.get().canVectorizeMatcher(columnInspector);
}
@Override
@@ -176,7 +178,7 @@ public class FilteredAggregatorFactory extends
AggregatorFactory
{
return ImmutableList.copyOf(
// use a set to get rid of dupes
-
ImmutableSet.<String>builder().addAll(delegate.requiredFields()).addAll(filter.getRequiredColumns()).build()
+
ImmutableSet.<String>builder().addAll(delegate.requiredFields()).addAll(filterSupplier.get().getRequiredColumns()).build()
);
}
diff --git
a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
index 54238fe8cce..034a0563453 100644
---
a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
+++
b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.server;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -54,6 +55,13 @@ import org.apache.druid.query.Druids;
import org.apache.druid.query.MapQueryToolChestWarehouse;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryException;
+import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
+import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
+import org.apache.druid.query.lookup.LookupModule;
+import org.apache.druid.query.lookup.RegisteredLookupExtractionFn;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.initialization.BaseJettyTest;
@@ -105,9 +113,12 @@ import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Deflater;
@@ -557,6 +568,78 @@ public class AsyncQueryForwardingServletTest extends
BaseJettyTest
verifyServletCallsForQuery(query, true, false, hostFinder, properties,
true);
}
+ @Test
+ public void testNoParseExceptionOnGroupByWithFilteredAggregationOnLookups()
throws Exception
+ {
+ class TestLookupReferenceManager implements
LookupExtractorFactoryContainerProvider
+ {
+ @Override
+ public Set<String> getAllLookupNames()
+ {
+ return null;
+ }
+
+ @Override
+ public Optional<LookupExtractorFactoryContainer> get(String lookupName)
+ {
+ return Optional.empty();
+ }
+ }
+
+ final TimeseriesQuery query =
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource("foo")
+ .intervals("2000/P1D")
+ .aggregators(
+ Collections.singletonList(
+ new FilteredAggregatorFactory(
+ new StringAnyAggregatorFactory("stringAny", "col",
1024, true),
+ new SelectorDimFilter(
+ "test",
+ "1",
+ new RegisteredLookupExtractionFn(
+ new TestLookupReferenceManager(),
+ "somelookup",
+ false,
+ null,
+ null,
+ false
+ )
+ ),
+ "agg"
+ )))
+ .granularity(Granularities.ALL)
+ .context(ImmutableMap.of("queryId", "dummy"))
+ .build();
+
+ final QueryHostFinder hostFinder =
EasyMock.createMock(QueryHostFinder.class);
+ EasyMock.expect(hostFinder.pickServer(query)).andReturn(new
TestServer("http", "1.2.3.4", 9999)).once();
+ EasyMock.replay(hostFinder);
+
+ final ObjectMapper jsonMapper =
+ TestHelper.makeJsonMapper()
+ .registerModules(new LookupModule().getJacksonModules())
+ .setInjectableValues(
+ new InjectableValues.Std().addValue(
+ LookupExtractorFactoryContainerProvider.class,
+ new TestLookupReferenceManager()
+ )
+ );
+ verifyServletCallsForQuery(query, false, false, hostFinder, new
Properties(), false, jsonMapper);
+ }
+
+ private void verifyServletCallsForQuery(
+ Object query,
+ boolean isNativeSql,
+ boolean isJDBCSql,
+ QueryHostFinder hostFinder,
+ Properties properties,
+ boolean isFailure
+ ) throws Exception
+ {
+ verifyServletCallsForQuery(query, isNativeSql, isJDBCSql, hostFinder,
properties, isFailure, TestHelper.makeJsonMapper());
+ }
+
/**
* Verifies that the Servlet calls the right methods the right number of
times.
*/
@@ -566,10 +649,10 @@ public class AsyncQueryForwardingServletTest extends
BaseJettyTest
boolean isJDBCSql,
QueryHostFinder hostFinder,
Properties properties,
- boolean isFailure
+ boolean isFailure,
+ ObjectMapper jsonMapper
) throws Exception
{
- final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final ByteArrayInputStream inputStream = new
ByteArrayInputStream(jsonMapper.writeValueAsBytes(query));
final ServletInputStream servletInputStream = new ServletInputStream()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]