This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 209f8a95468 Deserialize complex dimensions in group by queries to
their respective types when reading from spilled files and cached results
(#16620)
209f8a95468 is described below
commit 209f8a95468e8450e3d1365d7f0eda98a35732df
Author: Laksh Singla <[email protected]>
AuthorDate: Mon Jul 15 15:00:17 2024 +0530
Deserialize complex dimensions in group by queries to their respective
types when reading from spilled files and cached results (#16620)
Like #16511, but for keys that have been spilled or cached during the
grouping process
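
For context, the queries affected are groupBys whose grouping dimension has a complex
type, as exercised by the new ComplexDimensionGroupByQueryTest in this patch. A minimal
sketch of such a query, mirroring that test (the datasource name is illustrative):

    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.java.util.common.granularity.Granularities;
    import org.apache.druid.query.aggregation.CountAggregatorFactory;
    import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;
    import org.apache.druid.query.dimension.DefaultDimensionSpec;
    import org.apache.druid.query.groupby.GroupByQuery;
    import org.apache.druid.segment.column.ColumnType;

    // Group on a complex-typed column; the grouping keys may be spilled to disk or cached,
    // and must round-trip back to SerializablePairLongString rather than a generic map.
    GroupByQuery query = GroupByQuery.builder()
        .setDataSource("test_datasource")
        .setGranularity(Granularities.ALL)
        .setInterval(Intervals.ETERNITY)
        .setDimensions(
            new DefaultDimensionSpec(
                "pair",
                "pair",
                ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME)
            )
        )
        .setAggregatorSpecs(new CountAggregatorFactory("count"))
        .build();
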
---
.../apache/druid/jackson/AggregatorsModule.java | 15 +-
.../org/apache/druid/query/QueryToolChest.java | 21 ++-
.../DataSourceQueryQueryToolChest.java | 8 +
.../query/groupby/GroupByQueryQueryToolChest.java | 106 ++++++++-----
.../druid/query/groupby/epinephelinae/Grouper.java | 11 ++
.../epinephelinae/RowBasedGrouperHelper.java | 135 ++++++++++++++---
.../epinephelinae/RowBasedKeySerdeHelper.java | 5 +
.../groupby/epinephelinae/SpillingGrouper.java | 2 +-
.../SegmentMetadataQueryQueryToolChest.java | 11 ++
.../query/search/SearchQueryQueryToolChest.java | 10 ++
.../TimeBoundaryQueryQueryToolChest.java | 12 ++
.../timeseries/TimeseriesQueryQueryToolChest.java | 12 ++
.../druid/query/topn/TopNQueryQueryToolChest.java | 13 +-
.../column/ObjectStrategyComplexTypeStrategy.java | 6 +-
.../druid/segment/column/TypeStrategies.java | 30 ++++
.../apache/druid/segment/column/TypeStrategy.java | 2 +-
.../query/aggregation/AggregationTestHelper.java | 8 +-
.../groupby/ComplexDimensionGroupByQueryTest.java | 164 +++++++++++++++++++++
.../groupby/GroupByQueryQueryToolChestTest.java | 94 +++++++++---
.../query/groupby/GroupByQueryRunnerTest.java | 5 +-
.../druid/client/CachingClusteredClient.java | 2 +-
.../apache/druid/client/CachingQueryRunner.java | 2 +-
.../druid/query/ResultLevelCachingQueryRunner.java | 2 +-
.../druid/client/CachingQueryRunnerTest.java | 15 +-
24 files changed, 594 insertions(+), 97 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
index f7aca511e17..200e6fcb139 100644
--- a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
+++ b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
@@ -83,6 +83,16 @@ public class AggregatorsModule extends SimpleModule
{
super("AggregatorFactories");
+ registerComplexMetricsAndSerde();
+
+ setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
+ setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
+
+ addSerializer(DoubleMeanHolder.class,
DoubleMeanHolder.Serializer.INSTANCE);
+ }
+
+ public static void registerComplexMetricsAndSerde()
+ {
ComplexMetrics.registerSerde(HyperUniquesSerde.TYPE_NAME, new
HyperUniquesSerde());
ComplexMetrics.registerSerde(PreComputedHyperUniquesSerde.TYPE_NAME, new
PreComputedHyperUniquesSerde());
ComplexMetrics.registerSerde(
@@ -102,11 +112,6 @@ public class AggregatorsModule extends SimpleModule
SerializablePairLongLongComplexMetricSerde.TYPE_NAME,
new SerializablePairLongLongComplexMetricSerde()
);
-
- setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
- setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
-
- addSerializer(DoubleMeanHolder.class,
DoubleMeanHolder.Serializer.INSTANCE);
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
diff --git
a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java
index b0678f247c9..fa394beec43 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java
@@ -251,19 +251,36 @@ public abstract class QueryToolChest<ResultType,
QueryType extends Query<ResultT
*/
public abstract TypeReference<ResultType> getResultTypeReference();
+ /**
+ * Like {@link #getCacheStrategy(Query, ObjectMapper)}, but the caller does not supply an object mapper for
+ * deserializing and converting the cached data to the desired type. It is up to the individual implementations
+ * to decide the appropriate action in that case: they can either throw an exception outright, or work with the
+ * generic Java types if the query does not require the object mapper for proper downstream processing.
+ * <p>
+ * @deprecated Use {@link #getCacheStrategy(Query, ObjectMapper)} instead
+ */
+ @Deprecated
+ @Nullable
+ public <T> CacheStrategy<ResultType, T, QueryType>
getCacheStrategy(QueryType query)
+ {
+ return null;
+ }
+
/**
* Returns a CacheStrategy to be used to load data into the cache and remove
it from the cache.
* <p>
* This is optional. If it returns null, caching is effectively disabled
for the query.
*
* @param query The query whose results might be cached
+ * @param mapper Object mapper to convert the deserialized generic Java objects to the desired types. It may be
+ *               null, in order to preserve backward compatibility.
* @param <T> The type of object that will be stored in the cache
* @return A CacheStrategy that can be used to populate and read from the
Cache
*/
@Nullable
- public <T> CacheStrategy<ResultType, T, QueryType>
getCacheStrategy(QueryType query)
+ public <T> CacheStrategy<ResultType, T, QueryType>
getCacheStrategy(QueryType query, @Nullable ObjectMapper mapper)
{
- return null;
+ return getCacheStrategy(query);
}
/**
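
The new two-argument overload defaults to delegating to the deprecated single-argument
method, so existing toolchests keep working unchanged. Toolchests that actually need the
mapper override the two-argument form and have the old form delegate the other way, as the
groupBy toolchest does further down in this patch. A condensed sketch of that pattern for a
hypothetical toolchest (MyQuery, MyResult and makeCacheStrategy are placeholders, not part
of this commit):

    @Nullable
    @Override
    public CacheStrategy<MyResult, Object, MyQuery> getCacheStrategy(MyQuery query)
    {
      // Legacy entry point: no mapper available, delegate to the new overload.
      return getCacheStrategy(query, null);
    }

    @Nullable
    @Override
    public CacheStrategy<MyResult, Object, MyQuery> getCacheStrategy(MyQuery query, @Nullable ObjectMapper mapper)
    {
      // New entry point: the (possibly null) mapper is captured by the returned strategy and
      // used to convert generic Jackson-deserialized cache values back to their real types.
      return makeCacheStrategy(query, mapper);
    }
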
diff --git
a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java
index dbe8922f2e9..21fb5c53afc 100644
---
a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java
@@ -20,6 +20,7 @@
package org.apache.druid.query.datasourcemetadata;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.inject.Inject;
@@ -38,6 +39,7 @@ import
org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.timeline.LogicalSegment;
+import javax.annotation.Nullable;
import java.util.List;
import java.util.stream.Collectors;
@@ -119,4 +121,10 @@ public class DataSourceQueryQueryToolChest
{
return null;
}
+
+ @Override
+ public CacheStrategy getCacheStrategy(DataSourceMetadataQuery query,
@Nullable ObjectMapper mapper)
+ {
+ return null;
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
index b19b479c26d..d69e09c9ff0 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -77,8 +77,10 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.NullableTypeStrategy;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.nested.StructuredData;
import org.joda.time.DateTime;
+import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
@@ -471,7 +473,7 @@ public class GroupByQueryQueryToolChest extends
QueryToolChest<ResultRow, GroupB
// Deserializer that can deserialize either array- or map-based rows.
final JsonDeserializer<ResultRow> deserializer = new
JsonDeserializer<ResultRow>()
{
- final Class<?>[] dimensionClasses = createDimensionClasses();
+ final Class<?>[] dimensionClasses = createDimensionClasses(query);
boolean containsComplexDimensions = query.getDimensions()
.stream()
.anyMatch(
@@ -524,30 +526,6 @@ public class GroupByQueryQueryToolChest extends
QueryToolChest<ResultRow, GroupB
return ResultRow.of(objectArray);
}
}
-
- private Class<?>[] createDimensionClasses()
- {
- final List<DimensionSpec> queryDimensions = query.getDimensions();
- final Class<?>[] classes = new Class[queryDimensions.size()];
- for (int i = 0; i < queryDimensions.size(); ++i) {
- final ColumnType dimensionOutputType =
queryDimensions.get(i).getOutputType();
- if (dimensionOutputType.is(ValueType.COMPLEX)) {
- NullableTypeStrategy nullableTypeStrategy =
dimensionOutputType.getNullableStrategy();
- if (!nullableTypeStrategy.groupable()) {
- throw DruidException.defensive(
- "Ungroupable dimension [%s] with type [%s] found in the
query.",
- queryDimensions.get(i).getDimension(),
- dimensionOutputType
- );
- }
- classes[i] = nullableTypeStrategy.getClazz();
- } else {
- classes[i] = Object.class;
- }
- }
- return classes;
- }
-
};
class GroupByResultRowModule extends SimpleModule
@@ -597,9 +575,32 @@ public class GroupByQueryQueryToolChest extends
QueryToolChest<ResultRow, GroupB
);
}
+ @Nullable
@Override
- public CacheStrategy<ResultRow, Object, GroupByQuery> getCacheStrategy(final
GroupByQuery query)
+ public CacheStrategy<ResultRow, Object, GroupByQuery>
getCacheStrategy(GroupByQuery query)
{
+ return getCacheStrategy(query, null);
+ }
+
+ @Override
+ public CacheStrategy<ResultRow, Object, GroupByQuery> getCacheStrategy(
+ final GroupByQuery query,
+ @Nullable final ObjectMapper mapper
+ )
+ {
+
+ for (DimensionSpec dimension : query.getDimensions()) {
+ if (dimension.getOutputType().is(ValueType.COMPLEX) &&
!dimension.getOutputType().equals(ColumnType.NESTED_DATA)) {
+ if (mapper == null) {
+ throw DruidException.defensive(
+ "Cannot deserialize complex dimension of type[%s] from result
cache if object mapper is not provided",
+ dimension.getOutputType().getComplexTypeName()
+ );
+ }
+ }
+ }
+ final Class<?>[] dimensionClasses = createDimensionClasses(query);
+
return new CacheStrategy<ResultRow, Object, GroupByQuery>()
{
private static final byte CACHE_STRATEGY_VERSION = 0x1;
@@ -726,13 +727,29 @@ public class GroupByQueryQueryToolChest extends
QueryToolChest<ResultRow, GroupB
int dimPos = 0;
while (dimsIter.hasNext() && results.hasNext()) {
final DimensionSpec dimensionSpec = dimsIter.next();
-
- // Must convert generic Jackson-deserialized type into the
proper type.
- resultRow.set(
- dimensionStart + dimPos,
- DimensionHandlerUtils.convertObjectToType(results.next(),
dimensionSpec.getOutputType())
- );
-
+ final Object dimensionObject = results.next();
+ final Object dimensionObjectCasted;
+
+ final ColumnType outputType = dimensionSpec.getOutputType();
+
+ // Must convert generic Jackson-deserialized type into the
proper type. The downstream functions expect the
+ // dimensions to be of appropriate types for further processing
like merging and comparing.
+ if (outputType.is(ValueType.COMPLEX)) {
+ // JSON columns can interpret generic data objects appropriately, hence they are wrapped as is in
+ // StructuredData. They don't need to be converted from Object.class to StructuredData.class using the
+ // object mapper, as that would be an expensive and wasteful operation.
+ if (outputType.equals(ColumnType.NESTED_DATA)) {
+ dimensionObjectCasted = StructuredData.wrap(dimensionObject);
+ } else {
+ dimensionObjectCasted = mapper.convertValue(dimensionObject,
dimensionClasses[dimPos]);
+ }
+ } else {
+ dimensionObjectCasted =
DimensionHandlerUtils.convertObjectToType(
+ dimensionObject,
+ dimensionSpec.getOutputType()
+ );
+ }
+ resultRow.set(dimensionStart + dimPos, dimensionObjectCasted);
dimPos++;
}
@@ -861,4 +878,27 @@ public class GroupByQueryQueryToolChest extends
QueryToolChest<ResultRow, GroupB
return retVal;
}
+
+ private static Class<?>[] createDimensionClasses(final GroupByQuery query)
+ {
+ final List<DimensionSpec> queryDimensions = query.getDimensions();
+ final Class<?>[] classes = new Class[queryDimensions.size()];
+ for (int i = 0; i < queryDimensions.size(); ++i) {
+ final ColumnType dimensionOutputType =
queryDimensions.get(i).getOutputType();
+ if (dimensionOutputType.is(ValueType.COMPLEX)) {
+ NullableTypeStrategy nullableTypeStrategy =
dimensionOutputType.getNullableStrategy();
+ if (!nullableTypeStrategy.groupable()) {
+ throw DruidException.defensive(
+ "Ungroupable dimension [%s] with type [%s] found in the query.",
+ queryDimensions.get(i).getDimension(),
+ dimensionOutputType
+ );
+ }
+ classes[i] = nullableTypeStrategy.getClazz();
+ } else {
+ classes[i] = Object.class;
+ }
+ }
+ return classes;
+ }
}
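
In condensed form, the per-dimension conversion now applied when pulling result rows out of
the cache is roughly the following (a paraphrase of the hunk above, not extra code in the
patch; dimensionClasses[i] comes from createDimensionClasses):

    Object restored;
    if (outputType.is(ValueType.COMPLEX)) {
      if (outputType.equals(ColumnType.NESTED_DATA)) {
        // JSON columns handle generic objects directly, so wrapping avoids an expensive convertValue call.
        restored = StructuredData.wrap(cachedValue);
      } else {
        // Other complex types are rebuilt into their concrete class, e.g. SerializablePairLongString.
        restored = mapper.convertValue(cachedValue, dimensionClasses[i]);
      }
    } else {
      // Primitive and string dimensions keep using the existing conversion path.
      restored = DimensionHandlerUtils.convertObjectToType(cachedValue, outputType);
    }
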
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java
index 591624f1ab8..0f3faedb707 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java
@@ -19,6 +19,7 @@
package org.apache.druid.query.groupby.epinephelinae;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -232,6 +233,16 @@ public interface Grouper<KeyType> extends Closeable
*/
BufferComparator bufferComparatorWithAggregators(AggregatorFactory[]
aggregatorFactories, int[] aggregatorOffsets);
+ /**
+ * Decorates the object mapper so that it can read and write the grouping keys of the query results. It is used
+ * by the {@link SpillingGrouper} to preserve the types of the dimensions after they are serialized to and
+ * deserialized from the spilled files.
+ */
+ default ObjectMapper decorateObjectMapper(ObjectMapper spillMapper)
+ {
+ return spillMapper;
+ }
+
/**
* Reset the keySerde to its initial state. After this method is called,
{@link #readFromByteBuffer}
* and {@link #bufferComparator()} may no longer work properly on
previously-serialized keys.
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
index 491c28d4142..da8a0e04623 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
@@ -19,9 +19,14 @@
package org.apache.druid.query.groupby.epinephelinae;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.ObjectCodec;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.primitives.Ints;
@@ -84,6 +89,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -666,22 +672,6 @@ public class RowBasedGrouperHelper
this.key = key;
}
- @JsonCreator
- public static RowBasedKey fromJsonArray(final Object[] key)
- {
- // Type info is lost during serde:
- // Floats may be deserialized as doubles, Longs may be deserialized as
integers, convert them back
- for (int i = 0; i < key.length; i++) {
- if (key[i] instanceof Integer) {
- key[i] = ((Integer) key[i]).longValue();
- } else if (key[i] instanceof Double) {
- key[i] = ((Double) key[i]).floatValue();
- }
- }
-
- return new RowBasedKey(key);
- }
-
@JsonValue
public Object[] getKey()
{
@@ -1371,6 +1361,65 @@ public class RowBasedGrouperHelper
);
}
+ @Override
+ public ObjectMapper decorateObjectMapper(ObjectMapper spillMapper)
+ {
+
+ final JsonDeserializer<RowBasedKey> deserializer = new
JsonDeserializer<RowBasedKey>()
+ {
+ @Override
+ public RowBasedKey deserialize(
+ JsonParser jp,
+ DeserializationContext deserializationContext
+ ) throws IOException
+ {
+ if (!jp.isExpectedStartArrayToken()) {
+ throw DruidException.defensive("Expected array start token,
received [%s]", jp.getCurrentToken());
+ }
+ jp.nextToken();
+
+ final ObjectCodec codec = jp.getCodec();
+ final int timestampAdjustment = includeTimestamp ? 1 : 0;
+ final int dimsToRead = timestampAdjustment + serdeHelpers.length;
+ int dimsReadSoFar = 0;
+ final Object[] objects = new Object[dimsToRead];
+
+ if (includeTimestamp) {
+ DruidException.conditionalDefensive(
+ jp.currentToken() != JsonToken.END_ARRAY,
+ "Unexpected end of array when deserializing timestamp from the
spilled files"
+ );
+ objects[dimsReadSoFar] = codec.readValue(jp, Long.class);
+
+ ++dimsReadSoFar;
+ jp.nextToken();
+ }
+
+ while (jp.currentToken() != JsonToken.END_ARRAY) {
+ objects[dimsReadSoFar] =
+ codec.readValue(jp, serdeHelpers[dimsReadSoFar -
timestampAdjustment].getClazz());
+
+ ++dimsReadSoFar;
+ jp.nextToken();
+ }
+
+ return new RowBasedKey(objects);
+ }
+ };
+
+ class SpillModule extends SimpleModule
+ {
+ public SpillModule()
+ {
+ addDeserializer(RowBasedKey.class, deserializer);
+ }
+ }
+
+ final ObjectMapper newObjectMapper = spillMapper.copy();
+ newObjectMapper.registerModule(new SpillModule());
+ return newObjectMapper;
+ }
+
@Override
public void reset()
{
@@ -1588,6 +1637,7 @@ public class RowBasedGrouperHelper
{
final BufferComparator bufferComparator;
final String columnTypeName;
+ final Class<?> clazz;
final List<Object> dictionary;
final Object2IntMap<Object> reverseDictionary;
@@ -1613,6 +1663,7 @@ public class RowBasedGrouperHelper
dictionary.get(lhsBuffer.getInt(lhsPosition +
keyBufferPosition)),
dictionary.get(rhsBuffer.getInt(rhsPosition +
keyBufferPosition))
);
+ clazz = columnType.getNullableStrategy().getClazz();
}
// Asserts that we don't entertain any complex types without a typename,
to prevent intermixing dictionaries of
@@ -1645,6 +1696,12 @@ public class RowBasedGrouperHelper
{
return reverseDictionary;
}
+
+ @Override
+ public Class<?> getClazz()
+ {
+ return clazz;
+ }
}
@@ -1726,6 +1783,14 @@ public class RowBasedGrouperHelper
{
return reverseDictionary;
}
+
+ @Override
+ public Class<?> getClazz()
+ {
+ // Jackson deserializes an Object[] containing longs into an Object[] containing strings if Object[].class is
+ // returned. Therefore we use Object.class instead.
+ return Object.class;
+ }
}
private class ArrayStringRowBasedKeySerdeHelper extends
DictionaryBuildingSingleValuedRowBasedKeySerdeHelper
@@ -1770,6 +1835,12 @@ public class RowBasedGrouperHelper
{
return reverseStringArrayDictionary;
}
+
+ @Override
+ public Class<?> getClazz()
+ {
+ return Object[].class;
+ }
}
private abstract class AbstractStringRowBasedKeySerdeHelper implements
RowBasedKeySerdeHelper
@@ -1819,6 +1890,12 @@ public class RowBasedGrouperHelper
{
return bufferComparator;
}
+
+ @Override
+ public Class<?> getClazz()
+ {
+ return String.class;
+ }
}
private class DynamicDictionaryStringRowBasedKeySerdeHelper extends
AbstractStringRowBasedKeySerdeHelper
@@ -1937,6 +2014,12 @@ public class RowBasedGrouperHelper
{
return bufferComparator;
}
+
+ @Override
+ public Class<?> getClazz()
+ {
+ return Long.class;
+ }
}
private class FloatRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
@@ -1982,6 +2065,12 @@ public class RowBasedGrouperHelper
{
return bufferComparator;
}
+
+ @Override
+ public Class<?> getClazz()
+ {
+ return Float.class;
+ }
}
private class DoubleRowBasedKeySerdeHelper implements
RowBasedKeySerdeHelper
@@ -2027,6 +2116,12 @@ public class RowBasedGrouperHelper
{
return bufferComparator;
}
+
+ @Override
+ public Class<?> getClazz()
+ {
+ return Double.class;
+ }
}
// This class is only used when SQL compatible null handling is enabled.
@@ -2082,6 +2177,12 @@ public class RowBasedGrouperHelper
{
return comparator;
}
+
+ @Override
+ public Class<?> getClazz()
+ {
+ return delegate.getClazz();
+ }
}
}
}
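
The removed @JsonCreator above had to convert integers back to longs and doubles back to
floats by hand, and could not restore complex key parts at all, since Jackson would hand
them back as generic maps. The new decorateObjectMapper deserializer instead reads each key
element against the class reported by the matching RowBasedKeySerdeHelper. A small
self-contained Jackson illustration of the underlying problem and of the fix (not code from
the patch):

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class SpillKeyTypeDemo
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();

        // Without target-type information, JSON numbers come back as Integer/Double:
        Object[] generic = mapper.readValue("[1, 2.5]", Object[].class);
        System.out.println(generic[0].getClass()); // class java.lang.Integer
        System.out.println(generic[1].getClass()); // class java.lang.Double

        // Reading each element against an explicit class preserves the intended type,
        // which is what the new key deserializer does via RowBasedKeySerdeHelper.getClazz():
        Long ts = mapper.readValue("1", Long.class);
        Float f = mapper.readValue("2.5", Float.class);
        System.out.println(ts.getClass() + " " + f.getClass()); // Long and Float
      }
    }
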
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java
index 1cb29d23bc0..71372ca238b 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java
@@ -65,4 +65,9 @@ interface RowBasedKeySerdeHelper
* Return a {@link BufferComparator} to compare keys stored in ByteBuffer.
*/
BufferComparator getBufferComparator();
+
+ /**
+ * Returns the expected class of the key, which is used to deserialize the objects correctly from the spilled
+ * files.
+ */
+ Class<?> getClazz();
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
index 4e9b96102a1..d8a7760c11d 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -152,7 +152,7 @@ public class SpillingGrouper<KeyType> implements
Grouper<KeyType>
}
this.aggregatorFactories = aggregatorFactories;
this.temporaryStorage = temporaryStorage;
- this.spillMapper = spillMapper;
+ this.spillMapper = keySerde.decorateObjectMapper(spillMapper);
this.spillingAllowed = spillingAllowed;
this.sortHasNonGroupingFields = sortHasNonGroupingFields;
}
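
The grouper now decorates the spill mapper once, at construction time, so every spill write
and subsequent read goes through the type-aware key deserializer. The default implementation
added in Grouper.java returns the mapper unchanged; the row-based key serde overrides it to
register the deserializer on a copy, roughly as follows (a condensed restatement of the
RowBasedGrouperHelper change above; keyDeserializer stands for the RowBasedKey
JsonDeserializer from that hunk):

    @Override
    public ObjectMapper decorateObjectMapper(ObjectMapper spillMapper)
    {
      // Register the RowBasedKey deserializer on a copy so the shared spill mapper is left untouched.
      ObjectMapper copy = spillMapper.copy();
      copy.registerModule(new SimpleModule().addDeserializer(RowBasedKey.class, keyDeserializer));
      return copy;
    }
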
diff --git
a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
index 912ecb1ac32..fd8d7e7009c 100644
---
a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
@@ -20,6 +20,7 @@
package org.apache.druid.query.metadata;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
@@ -62,6 +63,7 @@ import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
@@ -184,6 +186,15 @@ public class SegmentMetadataQueryQueryToolChest extends
QueryToolChest<SegmentAn
@Override
public CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery>
getCacheStrategy(final SegmentMetadataQuery query)
+ {
+ return getCacheStrategy(query, null);
+ }
+
+ @Override
+ public CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery>
getCacheStrategy(
+ final SegmentMetadataQuery query,
+ @Nullable final ObjectMapper objectMapper
+ )
{
return new CacheStrategy<SegmentAnalysis, SegmentAnalysis,
SegmentMetadataQuery>()
{
diff --git
a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java
index b390cd83a58..c15e1d0d99c 100644
---
a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java
@@ -20,6 +20,7 @@
package org.apache.druid.query.search;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
@@ -124,6 +125,15 @@ public class SearchQueryQueryToolChest extends
QueryToolChest<Result<SearchResul
@Override
public CacheStrategy<Result<SearchResultValue>, Object, SearchQuery>
getCacheStrategy(final SearchQuery query)
+ {
+ return getCacheStrategy(query, null);
+ }
+
+ @Override
+ public CacheStrategy<Result<SearchResultValue>, Object, SearchQuery>
getCacheStrategy(
+ final SearchQuery query,
+ @Nullable final ObjectMapper objectMapper
+ )
{
return new CacheStrategy<Result<SearchResultValue>, Object, SearchQuery>()
diff --git
a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
index 9087dd26a88..eab5e0f5abc 100644
---
a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
@@ -20,6 +20,7 @@
package org.apache.druid.query.timeboundary;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
@@ -47,6 +48,7 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.LogicalSegment;
+import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
@@ -163,6 +165,16 @@ public class TimeBoundaryQueryQueryToolChest
@Override
public CacheStrategy<Result<TimeBoundaryResultValue>, Object,
TimeBoundaryQuery> getCacheStrategy(final TimeBoundaryQuery query)
+ {
+ return getCacheStrategy(query, null);
+ }
+
+
+ @Override
+ public CacheStrategy<Result<TimeBoundaryResultValue>, Object,
TimeBoundaryQuery> getCacheStrategy(
+ final TimeBoundaryQuery query,
+ @Nullable final ObjectMapper objectMapper
+ )
{
return new CacheStrategy<Result<TimeBoundaryResultValue>, Object,
TimeBoundaryQuery>()
{
diff --git
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 17a2f8be956..67c36fe7603 100644
---
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -20,6 +20,7 @@
package org.apache.druid.query.timeseries;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
@@ -65,6 +66,7 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.joda.time.DateTime;
+import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
@@ -276,6 +278,16 @@ public class TimeseriesQueryQueryToolChest extends
QueryToolChest<Result<Timeser
@Override
public CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery>
getCacheStrategy(final TimeseriesQuery query)
+ {
+ return getCacheStrategy(query, null);
+ }
+
+
+ @Override
+ public CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery>
getCacheStrategy(
+ final TimeseriesQuery query,
+ @Nullable final ObjectMapper objectMapper
+ )
{
return new CacheStrategy<Result<TimeseriesResultValue>, Object,
TimeseriesQuery>()
{
diff --git
a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
index 25a4284aa42..21bc336438a 100644
---
a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
@@ -20,6 +20,7 @@
package org.apache.druid.query.topn;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
@@ -64,6 +65,7 @@ import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.RowSignature;
import org.joda.time.DateTime;
+import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
@@ -268,9 +270,18 @@ public class TopNQueryQueryToolChest extends
QueryToolChest<Result<TopNResultVal
return TYPE_REFERENCE;
}
+ @Nullable
+ @Override
+ public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery>
getCacheStrategy(TopNQuery query)
+ {
+ return getCacheStrategy(query, null);
+ }
@Override
- public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery>
getCacheStrategy(final TopNQuery query)
+ public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery>
getCacheStrategy(
+ final TopNQuery query,
+ @Nullable final ObjectMapper objectMapper
+ )
{
return new CacheStrategy<Result<TopNResultValue>, Object, TopNQuery>()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java
b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java
index b274e55282e..f80a1cdcf8d 100644
---
a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java
+++
b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java
@@ -123,7 +123,7 @@ public class ObjectStrategyComplexTypeStrategy<T>
implements TypeStrategy<T>
public int hashCode(T o)
{
if (hashStrategy == null) {
- throw DruidException.defensive("hashStrategy not provided");
+ throw DruidException.defensive("Type [%s] is not groupable",
typeSignature.asTypeString());
}
return hashStrategy.hashCode(o);
}
@@ -132,7 +132,7 @@ public class ObjectStrategyComplexTypeStrategy<T>
implements TypeStrategy<T>
public boolean equals(T a, T b)
{
if (hashStrategy == null) {
- throw DruidException.defensive("hashStrategy not provided");
+ throw DruidException.defensive("Type [%s] is not groupable",
typeSignature.asTypeString());
}
return hashStrategy.equals(a, b);
}
@@ -141,7 +141,7 @@ public class ObjectStrategyComplexTypeStrategy<T>
implements TypeStrategy<T>
public Class<?> getClazz()
{
if (clazz == null) {
- throw DruidException.defensive("hashStrategy not provided");
+ throw DruidException.defensive("Type [%s] is not groupable",
typeSignature.asTypeString());
}
return clazz;
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java
b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java
index bae29179b4d..7ac8def99ec 100644
---
a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java
+++
b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java
@@ -299,6 +299,12 @@ public class TypeStrategies
{
return a.equals(b);
}
+
+ @Override
+ public Class<?> getClazz()
+ {
+ return Long.class;
+ }
}
/**
@@ -368,6 +374,12 @@ public class TypeStrategies
{
return a.equals(b);
}
+
+ @Override
+ public Class<?> getClazz()
+ {
+ return Float.class;
+ }
}
/**
@@ -438,6 +450,12 @@ public class TypeStrategies
{
return a.equals(b);
}
+
+ @Override
+ public Class<?> getClazz()
+ {
+ return Double.class;
+ }
}
/**
@@ -519,6 +537,12 @@ public class TypeStrategies
{
return a.equals(b);
}
+
+ @Override
+ public Class<?> getClazz()
+ {
+ return String.class;
+ }
}
/**
@@ -664,5 +688,11 @@ public class TypeStrategies
return false;
}
}
+
+ @Override
+ public Class<?> getClazz()
+ {
+ return Object[].class;
+ }
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java
b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java
index c5cff1a0b2f..075fceca473 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java
@@ -225,6 +225,6 @@ public interface TypeStrategy<T> extends
Comparator<Object>, Hash.Strategy<T>
*/
default Class<?> getClazz()
{
- throw DruidException.defensive("Not implemented. It is only implemented
for complex dimensions which are groupable()");
+ throw DruidException.defensive("Not implemented. Check groupable() first");
}
}
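
The revised message reflects the contract: getClazz(), like hashCode() and equals() on
complex type strategies, is only meaningful when groupable() returns true. The groupBy
toolchest's createDimensionClasses above follows exactly that pattern; condensed, the
expected usage is (dimensionName stands in for the dimension's name):

    NullableTypeStrategy nullableTypeStrategy = dimensionOutputType.getNullableStrategy();
    if (!nullableTypeStrategy.groupable()) {
      throw DruidException.defensive(
          "Ungroupable dimension [%s] with type [%s] found in the query.",
          dimensionName,
          dimensionOutputType
      );
    }
    // Safe only after the groupable() check; for ungroupable complex types this throws defensively.
    Class<?> clazz = nullableTypeStrategy.getClazz();
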
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
index 526a62c813f..2ad9f90148a 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
@@ -766,7 +766,7 @@ public class AggregationTestHelper implements Closeable
String resultStr = mapper.writer().writeValueAsString(yielder);
List<ResultRow> resultRows = Lists.transform(
- readQueryResultArrayFromString(resultStr),
+ readQueryResultArrayFromString(resultStr, queryPlus.getQuery()),
toolChest.makePreComputeManipulatorFn(
queryPlus.getQuery(),
MetricManipulatorFns.deserializing()
@@ -798,11 +798,13 @@ public class AggregationTestHelper implements Closeable
};
}
- private List readQueryResultArrayFromString(String str) throws Exception
+ private List readQueryResultArrayFromString(String str, Query query) throws
Exception
{
List result = new ArrayList();
- JsonParser jp = mapper.getFactory().createParser(str);
+ ObjectMapper decoratedMapper = toolChest.decorateObjectMapper(mapper,
query);
+
+ JsonParser jp = decoratedMapper.getFactory().createParser(str);
if (jp.nextToken() != JsonToken.START_ARRAY) {
throw new IAE("not an array [%s]", str);
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/ComplexDimensionGroupByQueryTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/ComplexDimensionGroupByQueryTest.java
new file mode 100644
index 00000000000..bc1ecbb0ddc
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/query/groupby/ComplexDimensionGroupByQueryTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.aggregation.AggregationTestHelper;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
+import
org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.segment.RowBasedSegment;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.SegmentId;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(Parameterized.class)
+public class ComplexDimensionGroupByQueryTest
+{
+ private final QueryContexts.Vectorize vectorize;
+ private final AggregationTestHelper helper;
+ private final List<Segment> segments;
+
+ @Rule
+ public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+ public ComplexDimensionGroupByQueryTest(GroupByQueryConfig config, String
vectorize)
+ {
+ this.vectorize = QueryContexts.Vectorize.fromString(vectorize);
+ this.helper =
AggregationTestHelper.createGroupByQueryAggregationTestHelper(
+ Collections.emptyList(),
+ config,
+ tempFolder
+ );
+ Sequence<Object[]> rows = Sequences.simple(
+ ImmutableList.of(
+ new Object[]{new SerializablePairLongString(1L, "abc")},
+ new Object[]{new SerializablePairLongString(1L, "abc")},
+ new Object[]{new SerializablePairLongString(1L, "def")},
+ new Object[]{new SerializablePairLongString(1L, "abc")},
+ new Object[]{new SerializablePairLongString(1L, "ghi")},
+ new Object[]{new SerializablePairLongString(1L, "def")},
+ new Object[]{new SerializablePairLongString(1L, "abc")},
+ new Object[]{new SerializablePairLongString(1L, "pqr")},
+ new Object[]{new SerializablePairLongString(1L, "xyz")},
+ new Object[]{new SerializablePairLongString(1L, "foo")},
+ new Object[]{new SerializablePairLongString(1L, "bar")}
+ )
+ );
+ RowSignature rowSignature = RowSignature.builder()
+ .add(
+ "pair",
+
ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME)
+ )
+ .build();
+
+ this.segments = Collections.singletonList(
+ new RowBasedSegment<>(
+ SegmentId.dummy("dummy"),
+ rows,
+ columnName -> {
+ final int columnNumber = rowSignature.indexOf(columnName);
+ return row -> columnNumber >= 0 ? row[columnNumber] : null;
+ },
+ rowSignature
+ )
+ );
+ }
+
+ @Parameterized.Parameters(name = "config = {0}, vectorize = {1}")
+ public static Collection<?> constructorFeeder()
+ {
+ final List<Object[]> constructors = new ArrayList<>();
+ for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
+ for (String vectorize : new String[]{"false", "force"}) {
+ constructors.add(new Object[]{config, vectorize});
+ }
+ }
+ return constructors;
+ }
+
+ public Map<String, Object> getContext()
+ {
+ return ImmutableMap.of(
+ QueryContexts.VECTORIZE_KEY, vectorize.toString(),
+ QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, "true"
+ );
+ }
+
+ @Test
+ public void testGroupByOnPairClass()
+ {
+ GroupByQuery groupQuery = GroupByQuery.builder()
+ .setDataSource("test_datasource")
+ .setGranularity(Granularities.ALL)
+ .setInterval(Intervals.ETERNITY)
+ .setDimensions(new
DefaultDimensionSpec(
+ "pair",
+ "pair",
+
ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME)
+ ))
+ .setAggregatorSpecs(new
CountAggregatorFactory("count"))
+ .setContext(getContext())
+ .build();
+
+ if (vectorize == QueryContexts.Vectorize.FORCE) {
+ // Cannot vectorize group by on complex dimension
+ Assert.assertThrows(
+ RuntimeException.class,
+ () -> helper.runQueryOnSegmentsObjs(segments, groupQuery).toList()
+ );
+ } else {
+ List<ResultRow> resultRows = helper.runQueryOnSegmentsObjs(segments,
groupQuery).toList();
+
+ Assert.assertArrayEquals(
+ new ResultRow[]{
+ ResultRow.of(new SerializablePairLongString(1L, "abc"), 4L),
+ ResultRow.of(new SerializablePairLongString(1L, "bar"), 1L),
+ ResultRow.of(new SerializablePairLongString(1L, "def"), 2L),
+ ResultRow.of(new SerializablePairLongString(1L, "foo"), 1L),
+ ResultRow.of(new SerializablePairLongString(1L, "ghi"), 1L),
+ ResultRow.of(new SerializablePairLongString(1L, "pqr"), 1L),
+ ResultRow.of(new SerializablePairLongString(1L, "xyz"), 1L)
+ },
+ resultRows.toArray()
+ );
+ }
+ }
+}
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
index f43bbce9d97..7279ca938bd 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
@@ -33,6 +33,7 @@ import org.apache.druid.collections.SerializablePair;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Row;
+import org.apache.druid.jackson.AggregatorsModule;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -97,6 +98,7 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
public static void setUpClass()
{
NullHandling.initializeForTests();
+ AggregatorsModule.registerComplexMetricsAndSerde();
}
@Test
@@ -130,11 +132,13 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
.build();
+ final ObjectMapper mapper = TestHelper.makeJsonMapper();
+
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1,
mapper);
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2,
mapper);
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
Assert.assertFalse(Arrays.equals(
@@ -190,11 +194,12 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
)
.build();
+ final ObjectMapper mapper = TestHelper.makeJsonMapper();
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1,
mapper);
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2,
mapper);
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
Assert.assertFalse(Arrays.equals(
@@ -252,11 +257,12 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
.setHavingSpec(new
GreaterThanHavingSpec(QueryRunnerTestHelper.UNIQUE_METRIC, 10))
.build();
+ final ObjectMapper mapper = TestHelper.makeJsonMapper();
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1,
mapper);
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2,
mapper);
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
Assert.assertFalse(Arrays.equals(
@@ -336,11 +342,12 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
.setHavingSpec(andHavingSpec2)
.build();
+ final ObjectMapper mapper = TestHelper.makeJsonMapper();
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1,
mapper);
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2,
mapper);
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
Assert.assertFalse(Arrays.equals(
@@ -427,11 +434,12 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
.setHavingSpec(havingSpec2)
.build();
+ final ObjectMapper mapper = TestHelper.makeJsonMapper();
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1,
mapper);
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2,
mapper);
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
Assert.assertFalse(Arrays.equals(
@@ -490,11 +498,12 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
))
.build();
+ final ObjectMapper mapper = TestHelper.makeJsonMapper();
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1,
mapper);
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2,
mapper);
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
Assert.assertFalse(Arrays.equals(
@@ -512,6 +521,48 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
doTestCacheStrategy(ColumnType.LONG, 2L);
}
+ @Test
+ public void testComplexDimensionCacheStrategy() throws IOException
+ {
+ final GroupByQuery query1 = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(ImmutableList.of(
+ new DefaultDimensionSpec(
+ "test",
+ "test",
+
ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME)
+ )
+ ))
+ .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
+ .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
+ .build();
+
+ ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+
+ CacheStrategy<ResultRow, Object, GroupByQuery> strategy =
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1,
objectMapper);
+
+ // test timestamps that result in integer size millis
+ final ResultRow result1 = ResultRow.of(
+ 123L,
+ new SerializablePairLongString(123L, "abc"),
+ 1
+ );
+
+ Object preparedValue =
strategy.prepareForSegmentLevelCache().apply(result1);
+
+ Object fromCacheValue = objectMapper.readValue(
+ objectMapper.writeValueAsBytes(preparedValue),
+ strategy.getCacheObjectClazz()
+ );
+
+ ResultRow fromCacheResult =
strategy.pullFromSegmentLevelCache().apply(fromCacheValue);
+
+ Assert.assertEquals(result1, fromCacheResult);
+ }
+
@Test
public void testMultiColumnCacheStrategy() throws Exception
{
@@ -538,8 +589,9 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
.build();
+ final ObjectMapper mapper = TestHelper.makeJsonMapper();
CacheStrategy<ResultRow, Object, GroupByQuery> strategy =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1,
mapper);
// test timestamps that result in integer size millis
final ResultRow result1 = ResultRow.of(
@@ -1054,8 +1106,9 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
.build();
+ final ObjectMapper mapper = TestHelper.makeJsonMapper();
CacheStrategy<ResultRow, Object, GroupByQuery> strategy =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1,
mapper);
// test timestamps that result in integer size millis
final ResultRow result1 = ResultRow.of(
@@ -1147,11 +1200,12 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
.build();
+ final ObjectMapper mapper = TestHelper.makeJsonMapper();
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1,
mapper);
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2,
mapper);
Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
Assert.assertFalse(Arrays.equals(
@@ -1183,11 +1237,12 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
.overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN,
"false"))
.build();
+ final ObjectMapper mapper = TestHelper.makeJsonMapper();
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1,
mapper);
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+ new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2,
mapper);
Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
Assert.assertTrue(
@@ -1245,7 +1300,8 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
final GroupByQueryQueryToolChest queryToolChest = new
GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool);
- CacheStrategy<ResultRow, Object, GroupByQuery> cacheStrategy =
queryToolChest.getCacheStrategy(query);
+ final ObjectMapper mapper = TestHelper.makeJsonMapper();
+ CacheStrategy<ResultRow, Object, GroupByQuery> cacheStrategy =
queryToolChest.getCacheStrategy(query, mapper);
Assert.assertTrue(
"result level cache on broker server for GroupByStrategyV2 should be
enabled",
cacheStrategy.isCacheable(query, false, false)
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index d4dc8734130..a5dbb49bca5 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Sets;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.Rows;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IAE;
@@ -9965,7 +9966,6 @@ public class GroupByQueryRunnerTest extends
InitializedNullHandlingTest
@Test
public void testGroupByComplexColumn()
{
- cannotVectorize();
GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
@@ -9979,7 +9979,8 @@ public class GroupByQueryRunnerTest extends
InitializedNullHandlingTest
.setGranularity(QueryRunnerTestHelper.ALL_GRAN)
.build();
- expectedException.expect(RuntimeException.class);
+ expectedException.expect(DruidException.class);
+ expectedException.expectMessage("Type [COMPLEX<hyperUnique>] is not
groupable");
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
}
diff --git
a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 7bcb4c2ce03..5fa34d6699d 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -275,7 +275,7 @@ public class CachingClusteredClient implements
QuerySegmentWalker
this.responseContext = responseContext;
this.query = queryPlus.getQuery();
this.toolChest = warehouse.getToolChest(query);
- this.strategy = toolChest.getCacheStrategy(query);
+ this.strategy = toolChest.getCacheStrategy(query, objectMapper);
this.dataSourceAnalysis = query.getDataSource().getAnalysis();
this.useCache = CacheUtil.isUseSegmentCache(query, strategy,
cacheConfig, CacheUtil.ServerType.BROKER);
diff --git
a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
index 9bb9f474dd9..41d4bb4ea63 100644
--- a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
+++ b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
@@ -86,7 +86,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext
responseContext)
{
Query<T> query = queryPlus.getQuery();
- final CacheStrategy strategy = toolChest.getCacheStrategy(query);
+ final CacheStrategy strategy = toolChest.getCacheStrategy(query, mapper);
final boolean populateCache = canPopulateCache(query, strategy);
final boolean useCache = canUseCache(query, strategy);
diff --git
a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
index 182faba7a09..0af6ebca3ed 100644
---
a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
+++
b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
@@ -73,7 +73,7 @@ public class ResultLevelCachingQueryRunner<T> implements
QueryRunner<T>
this.cache = cache;
this.cacheConfig = cacheConfig;
this.query = query;
- this.strategy = queryToolChest.getCacheStrategy(query);
+ this.strategy = queryToolChest.getCacheStrategy(query, objectMapper);
this.populateResultCache = CacheUtil.isPopulateResultCache(
query,
strategy,
diff --git
a/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
b/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
index a4375a61900..7208ab2fc4b 100644
--- a/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
@@ -68,6 +68,7 @@ import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.query.topn.TopNQueryConfig;
import org.apache.druid.query.topn.TopNQueryQueryToolChest;
import org.apache.druid.query.topn.TopNResultValue;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.Assert;
@@ -90,7 +91,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@RunWith(Parameterized.class)
-public class CachingQueryRunnerTest
+public class CachingQueryRunnerTest extends InitializedNullHandlingTest
{
@Parameterized.Parameters(name = "numBackgroundThreads={0}")
public static Iterable<Object[]> constructorFeeder()
@@ -222,8 +223,8 @@ public class CachingQueryRunnerTest
Cache cache = EasyMock.mock(Cache.class);
EasyMock.replay(cache);
CachingQueryRunner queryRunner = makeCachingQueryRunner(null, cache,
toolchest, Sequences.empty());
- Assert.assertFalse(queryRunner.canPopulateCache(query,
toolchest.getCacheStrategy(query)));
- Assert.assertFalse(queryRunner.canUseCache(query,
toolchest.getCacheStrategy(query)));
+ Assert.assertFalse(queryRunner.canPopulateCache(query,
toolchest.getCacheStrategy(query, null)));
+ Assert.assertFalse(queryRunner.canUseCache(query,
toolchest.getCacheStrategy(query, null)));
queryRunner.run(QueryPlus.wrap(query));
EasyMock.verifyUnexpectedCalls(cache);
}
@@ -243,7 +244,7 @@ public class CachingQueryRunnerTest
QueryToolChest toolchest = EasyMock.mock(QueryToolChest.class);
Cache cache = EasyMock.mock(Cache.class);
- EasyMock.expect(toolchest.getCacheStrategy(query)).andReturn(null);
+ EasyMock.expect(toolchest.getCacheStrategy(EasyMock.eq(query),
EasyMock.anyObject())).andReturn(null);
EasyMock.replay(cache, toolchest);
CachingQueryRunner queryRunner = makeCachingQueryRunner(new byte[0],
cache, toolchest, Sequences.empty());
Assert.assertFalse(queryRunner.canPopulateCache(query, null));
@@ -339,7 +340,7 @@ public class CachingQueryRunnerTest
resultSeq
);
- CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query);
+ CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query, null);
Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey(
CACHE_ID,
SEGMENT_DESCRIPTOR,
@@ -383,7 +384,7 @@ public class CachingQueryRunnerTest
byte[] cacheKeyPrefix = RandomUtils.nextBytes(10);
- CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query);
+ CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query, null);
Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey(
CACHE_ID,
SEGMENT_DESCRIPTOR,
@@ -399,7 +400,7 @@ public class CachingQueryRunnerTest
toolchest,
Sequences.empty()
);
- Assert.assertTrue(runner.canUseCache(query,
toolchest.getCacheStrategy(query)));
+ Assert.assertTrue(runner.canUseCache(query,
toolchest.getCacheStrategy(query, null)));
List<Result> results = runner.run(QueryPlus.wrap(query)).toList();
Assert.assertEquals(expectedResults.toString(), results.toString());
}