This is an automated email from the ASF dual-hosted git repository. cwylie pushed a commit to branch 0.14.1-incubating in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
commit 8d123695b226ea8702e0230657f4defeb0020f51 Author: Surekha <[email protected]> AuthorDate: Thu Apr 18 13:31:29 2019 -0700 Fix result-level cache for queries (#7325) * Add SegmentDescriptor interval in the hash while calculating Etag * Add computeResultLevelCacheKey to CacheStrategy Make HavingSpec cacheable and implement getCacheKey for subclasses Add unit tests for computeResultLevelCacheKey * Add more tests * Use CacheKeyBuilder for HavingSpec's getCacheKey * Initialize aggregators map to avoid NPE * adjust cachekey builder for HavingSpec to ignore aggregators * unused import * PR comments --- .../java/org/apache/druid/query/CacheStrategy.java | 10 + .../query/groupby/GroupByQueryQueryToolChest.java | 22 + .../query/groupby/having/AlwaysHavingSpec.java | 7 + .../druid/query/groupby/having/AndHavingSpec.java | 8 + .../query/groupby/having/DimFilterHavingSpec.java | 10 + .../having/DimensionSelectorHavingSpec.java | 11 + .../query/groupby/having/EqualToHavingSpec.java | 11 + .../groupby/having/GreaterThanHavingSpec.java | 11 + .../druid/query/groupby/having/HavingSpec.java | 3 +- .../{NeverHavingSpec.java => HavingSpecUtil.java} | 23 +- .../query/groupby/having/LessThanHavingSpec.java | 11 + .../query/groupby/having/NeverHavingSpec.java | 7 + .../druid/query/groupby/having/NotHavingSpec.java | 9 + .../druid/query/groupby/having/OrHavingSpec.java | 9 + .../SegmentMetadataQueryQueryToolChest.java | 12 + .../query/search/SearchQueryQueryToolChest.java | 6 + .../query/select/SelectQueryQueryToolChest.java | 6 + .../TimeBoundaryQueryQueryToolChest.java | 6 + .../timeseries/TimeseriesQueryQueryToolChest.java | 16 + .../druid/query/topn/TopNQueryQueryToolChest.java | 15 + .../groupby/GroupByQueryQueryToolChestTest.java | 486 +++++++++++++++++++++ .../query/groupby/GroupByQueryRunnerTest.java | 12 + .../druid/query/groupby/having/HavingSpecTest.java | 11 + .../TimeseriesQueryQueryToolChestTest.java | 162 +++++++ .../query/topn/TopNQueryQueryToolChestTest.java | 87 
++++ .../druid/client/CachingClusteredClient.java | 4 + .../druid/query/ResultLevelCachingQueryRunner.java | 2 +- .../druid/client/CachingClusteredClientTest.java | 51 ++- 28 files changed, 1014 insertions(+), 14 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java index 7a300c0..8b106a6 100644 --- a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java @@ -52,6 +52,16 @@ public interface CacheStrategy<T, CacheType, QueryType extends Query<T>> byte[] computeCacheKey(QueryType query); /** + * Computes the result level cache key for the given query. + * Some implementations may include query parameters that might not be used in {@code computeCacheKey} for same query + * + * @param query the query to be cached + * + * @return the result level cache key + */ + byte[] computeResultLevelCacheKey(QueryType query); + + /** * Returns the class type of what is used in the cache * * @return Returns the class type of what is used in the cache diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index a49b48a..c94d427 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -482,6 +482,28 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery } @Override + public byte[] computeResultLevelCacheKey(GroupByQuery query) + { + final CacheKeyBuilder builder = new CacheKeyBuilder(GROUPBY_QUERY) + .appendByte(CACHE_STRATEGY_VERSION) + .appendCacheable(query.getGranularity()) + .appendCacheable(query.getDimFilter()) + .appendCacheables(query.getAggregatorSpecs()) + 
.appendCacheables(query.getDimensions()) + .appendCacheable(query.getVirtualColumns()) + .appendCacheable(query.getHavingSpec()) + .appendCacheable(query.getLimitSpec()) + .appendCacheables(query.getPostAggregatorSpecs()); + + if (query.getSubtotalsSpec() != null && !query.getSubtotalsSpec().isEmpty()) { + for (List<String> subTotalSpec : query.getSubtotalsSpec()) { + builder.appendStrings(subTotalSpec); + } + } + return builder.build(); + } + + @Override public TypeReference<Object> getCacheObjectClazz() { return OBJECT_TYPE_REFERENCE; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java index 1ac4f31..8450589 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java @@ -20,6 +20,7 @@ package org.apache.druid.query.groupby.having; import org.apache.druid.data.input.Row; +import org.apache.druid.query.cache.CacheKeyBuilder; /** * A "having" spec that always evaluates to true @@ -31,4 +32,10 @@ public class AlwaysHavingSpec extends BaseHavingSpec { return true; } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_ALWAYS).build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java index 55ca3ee..f035db3 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.Row; import org.apache.druid.query.aggregation.AggregatorFactory; +import 
org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.column.ValueType; import java.util.List; @@ -110,4 +111,11 @@ public class AndHavingSpec extends BaseHavingSpec sb.append('}'); return sb.toString(); } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_AND) + .appendCacheables(havingSpecs).build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java index 00155ba..cf916bf 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.Row; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.transform.RowFunction; @@ -170,6 +171,15 @@ public class DimFilterHavingSpec extends BaseHavingSpec return new TransformSpec(filter, transforms).toTransformer(rowSignature); } + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_DIM_FILTER) + .appendCacheable(dimFilter) + .appendByte((byte) (isFinalize() ? 
1 : 0)) + .build(); + } + private static class RowAsInputRow implements InputRow { private final Row row; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java index 9004116..4dfc6e6 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.druid.data.input.Row; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.query.extraction.IdentityExtractionFn; @@ -117,4 +118,14 @@ public class DimensionSelectorHavingSpec extends BaseHavingSpec ", extractionFn=" + extractionFn + '}'; } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_DIM_SELECTOR) + .appendString(dimension) + .appendString(value) + .appendByteArray(extractionFn == null ? 
new byte[0] : extractionFn.getCacheKey()) + .build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java index ac8d64e..00c471b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java @@ -22,7 +22,9 @@ package org.apache.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import java.util.Map; @@ -123,4 +125,13 @@ public class EqualToHavingSpec extends BaseHavingSpec sb.append('}'); return sb.toString(); } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_EQUAL) + .appendString(aggregationName) + .appendByteArray(StringUtils.toUtf8(String.valueOf(value))) + .build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java index 842dbcd..aa276a9 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java @@ -22,7 +22,9 @@ package org.apache.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; +import 
org.apache.druid.query.cache.CacheKeyBuilder; import java.util.Map; @@ -119,4 +121,13 @@ public class GreaterThanHavingSpec extends BaseHavingSpec sb.append('}'); return sb.toString(); } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_GREATER_THAN) + .appendString(aggregationName) + .appendByteArray(StringUtils.toUtf8(String.valueOf(value))) + .build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java index 19ad071..e75641f 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java @@ -22,6 +22,7 @@ package org.apache.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.Cacheable; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.column.ValueType; @@ -44,7 +45,7 @@ import java.util.Map; @JsonSubTypes.Type(name = "always", value = AlwaysHavingSpec.class), @JsonSubTypes.Type(name = "filter", value = DimFilterHavingSpec.class) }) -public interface HavingSpec +public interface HavingSpec extends Cacheable { // Atoms for easy combination, but for now they are mostly useful // for testing. 
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java similarity index 60% copy from processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java copy to processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java index 55ab48b..e1227e7 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java @@ -19,16 +19,17 @@ package org.apache.druid.query.groupby.having; -import org.apache.druid.data.input.Row; - -/** - * A "having" spec that always evaluates to false - */ -public class NeverHavingSpec extends BaseHavingSpec +public class HavingSpecUtil { - @Override - public boolean eval(Row row) - { - return false; - } + static final byte CACHE_TYPE_ID_ALWAYS = 0x0; + static final byte CACHE_TYPE_ID_AND = 0x1; + static final byte CACHE_TYPE_ID_DIM_SELECTOR = 0x2; + static final byte CACHE_TYPE_ID_DIM_FILTER = 0x3; + static final byte CACHE_TYPE_ID_EQUAL = 0x4; + static final byte CACHE_TYPE_ID_GREATER_THAN = 0x5; + static final byte CACHE_TYPE_ID_LESS_THAN = 0x6; + static final byte CACHE_TYPE_ID_NEVER = 0x7; + static final byte CACHE_TYPE_ID_NOT = 0x8; + static final byte CACHE_TYPE_ID_OR = 0x9; + static final byte CACHE_TYPE_ID_COUNTING = 0xA; } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java index 99917cb..3c937cf 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java @@ -21,7 +21,9 @@ package org.apache.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonProperty; import 
org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import java.util.Map; @@ -117,4 +119,13 @@ public class LessThanHavingSpec extends BaseHavingSpec sb.append('}'); return sb.toString(); } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_LESS_THAN) + .appendString(aggregationName) + .appendByteArray(StringUtils.toUtf8(String.valueOf(value))) + .build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java index 55ab48b..fa2c15d 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java @@ -20,6 +20,7 @@ package org.apache.druid.query.groupby.having; import org.apache.druid.data.input.Row; +import org.apache.druid.query.cache.CacheKeyBuilder; /** * A "having" spec that always evaluates to false @@ -31,4 +32,10 @@ public class NeverHavingSpec extends BaseHavingSpec { return false; } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_NEVER).build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java index 315d059..81d7a63 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.Row; import 
org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.column.ValueType; import java.util.Map; @@ -98,4 +99,12 @@ public class NotHavingSpec extends BaseHavingSpec { return havingSpec != null ? havingSpec.hashCode() : 0; } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_NOT) + .appendCacheable(havingSpec) + .build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java index 7425b90..e648349 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.Row; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.column.ValueType; import java.util.List; @@ -110,4 +111,12 @@ public class OrHavingSpec extends BaseHavingSpec sb.append('}'); return sb.toString(); } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_OR) + .appendCacheables(havingSpecs) + .build(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 4dc50f5..2813000 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -50,6 +50,7 @@ import org.apache.druid.query.QueryToolChest; import 
org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; import org.apache.druid.query.aggregation.MetricManipulationFn; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.metadata.metadata.ColumnAnalysis; import org.apache.druid.query.metadata.metadata.SegmentAnalysis; import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; @@ -73,6 +74,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn { }; private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4}; + private static final byte SEGMENT_METADATA_QUERY = 0x16; private static final Function<SegmentAnalysis, SegmentAnalysis> MERGE_TRANSFORM_FN = new Function<SegmentAnalysis, SegmentAnalysis>() { @Override @@ -195,6 +197,16 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn } @Override + public byte[] computeResultLevelCacheKey(SegmentMetadataQuery query) + { + // need to include query "merge" and "lenientAggregatorMerge" for result level cache key + return new CacheKeyBuilder(SEGMENT_METADATA_QUERY).appendByteArray(computeCacheKey(query)) + .appendBoolean(query.isMerge()) + .appendBoolean(query.isLenientAggregatorMerge()) + .build(); + } + + @Override public TypeReference<SegmentAnalysis> getCacheObjectClazz() { return getResultTypeReference(); diff --git a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java index 61f77e3..8f35b25 100644 --- a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java @@ -202,6 +202,12 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul } @Override + public byte[] computeResultLevelCacheKey(SearchQuery query) 
+ { + return computeCacheKey(query); + } + + @Override public TypeReference<Object> getCacheObjectClazz() { return OBJECT_TYPE_REFERENCE; diff --git a/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java index d551266..45ebb95 100644 --- a/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java @@ -237,6 +237,12 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul } @Override + public byte[] computeResultLevelCacheKey(SelectQuery query) + { + return computeCacheKey(query); + } + + @Override public TypeReference<Object> getCacheObjectClazz() { return OBJECT_TYPE_REFERENCE; diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 3519037..9fc0e88 100644 --- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -155,6 +155,12 @@ public class TimeBoundaryQueryQueryToolChest } @Override + public byte[] computeResultLevelCacheKey(TimeBoundaryQuery query) + { + return computeCacheKey(query); + } + + @Override public TypeReference<Object> getCacheObjectClazz() { return OBJECT_TYPE_REFERENCE; diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 479fbf0..f8f5aa0 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ 
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -275,6 +275,22 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser } @Override + public byte[] computeResultLevelCacheKey(TimeseriesQuery query) + { + final CacheKeyBuilder builder = new CacheKeyBuilder(TIMESERIES_QUERY) + .appendBoolean(query.isDescending()) + .appendBoolean(query.isSkipEmptyBuckets()) + .appendCacheable(query.getGranularity()) + .appendCacheable(query.getDimensionsFilter()) + .appendCacheables(query.getAggregatorSpecs()) + .appendCacheable(query.getVirtualColumns()) + .appendCacheables(query.getPostAggregatorSpecs()) + .appendInt(query.getLimit()) + .appendBoolean(query.isGrandTotal()); + return builder.build(); + } + + @Override public TypeReference<Object> getCacheObjectClazz() { return OBJECT_TYPE_REFERENCE; diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index 4774c5c..2c3bd2b 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -327,6 +327,21 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal } @Override + public byte[] computeResultLevelCacheKey(TopNQuery query) + { + final CacheKeyBuilder builder = new CacheKeyBuilder(TOPN_QUERY) + .appendCacheable(query.getDimensionSpec()) + .appendCacheable(query.getTopNMetricSpec()) + .appendInt(query.getThreshold()) + .appendCacheable(query.getGranularity()) + .appendCacheable(query.getDimensionsFilter()) + .appendCacheables(query.getAggregatorSpecs()) + .appendCacheable(query.getVirtualColumns()) + .appendCacheables(query.getPostAggregatorSpecs()); + return builder.build(); + } + + @Override public TypeReference<Object> getCacheObjectClazz() { return OBJECT_TYPE_REFERENCE; diff 
--git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java new file mode 100644 index 0000000..2bad8f8 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.groupby; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.druid.data.input.Row; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.query.CacheStrategy; +import org.apache.druid.query.QueryRunnerTestHelper; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.query.aggregation.FloatSumAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.post.ExpressionPostAggregator; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.query.filter.AndDimFilter; +import org.apache.druid.query.filter.BoundDimFilter; +import org.apache.druid.query.filter.OrDimFilter; +import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.groupby.having.AndHavingSpec; +import org.apache.druid.query.groupby.having.DimFilterHavingSpec; +import org.apache.druid.query.groupby.having.EqualToHavingSpec; +import org.apache.druid.query.groupby.having.GreaterThanHavingSpec; +import org.apache.druid.query.groupby.having.HavingSpec; +import org.apache.druid.query.groupby.having.LessThanHavingSpec; +import org.apache.druid.query.groupby.having.NotHavingSpec; +import org.apache.druid.query.groupby.having.OrHavingSpec; +import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; +import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.ordering.StringComparators; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class GroupByQueryQueryToolChestTest +{ + + @Test + public void testResultLevelCacheKeyWithPostAggregate() + { + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + 
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias - 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + final CacheStrategy<Row, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + final CacheStrategy<Row, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + + @Test + public void testResultLevelCacheKeyWithLimitSpec() + { + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new 
LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias - 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .build(); + + final CacheStrategy<Row, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + final CacheStrategy<Row, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + + @Test + public void testResultLevelCacheKeyWithHavingSpec() + { + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + 
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(new GreaterThanHavingSpec(QueryRunnerTestHelper.uniqueMetric, 8)) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(new GreaterThanHavingSpec(QueryRunnerTestHelper.uniqueMetric, 10)) + .build(); + + final CacheStrategy<Row, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + final CacheStrategy<Row, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + 
Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + + @Test + public void testResultLevelCacheKeyWithAndHavingSpec() + { + final List<HavingSpec> havings = Arrays.asList( + new GreaterThanHavingSpec("agg", Double.valueOf(1.3)), + new OrHavingSpec( + Arrays.asList( + new LessThanHavingSpec("lessAgg", Long.valueOf(1L)), + new NotHavingSpec(new EqualToHavingSpec("equalAgg", Double.valueOf(2))) + ) + ) + ); + final HavingSpec andHavingSpec = new AndHavingSpec(havings); + + final List<HavingSpec> havings2 = Arrays.asList( + new GreaterThanHavingSpec("agg", Double.valueOf(13.0)), + new OrHavingSpec( + Arrays.asList( + new LessThanHavingSpec("lessAgg", Long.valueOf(1L)), + new NotHavingSpec(new EqualToHavingSpec("equalAgg", Double.valueOf(22))) + ) + ) + ); + final HavingSpec andHavingSpec2 = new AndHavingSpec(havings2); + + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(andHavingSpec) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + 
.setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(andHavingSpec2) + .build(); + + final CacheStrategy<Row, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + final CacheStrategy<Row, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + + @Test + public void testResultLevelCacheKeyWithHavingDimFilterHavingSpec() + { + final DimFilterHavingSpec havingSpec1 = new DimFilterHavingSpec( + new AndDimFilter( + ImmutableList.of( + new OrDimFilter( + ImmutableList.of( + new BoundDimFilter("rows", "2", null, true, false, null, null, StringComparators.NUMERIC), + new SelectorDimFilter("idx", "217", null) + ) + ), + new SelectorDimFilter("__time", String.valueOf(DateTimes.of("2011-04-01").getMillis()), null) + ) + ), + null + ); + + final DimFilterHavingSpec havingSpec2 = new DimFilterHavingSpec( + new AndDimFilter( + ImmutableList.of( + new OrDimFilter( + ImmutableList.of( + new BoundDimFilter("rows", "2", null, true, false, null, null, StringComparators.NUMERIC), + new SelectorDimFilter("idx", "317", null) + ) + ), + new SelectorDimFilter("__time", String.valueOf(DateTimes.of("2011-04-01").getMillis()), null) + ) + ), + null + ); + final GroupByQuery query1 = 
GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(havingSpec1) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "alias + 'x'", null, TestExprMacroTable.INSTANCE) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec("post", OrderByColumnSpec.Direction.DESCENDING) + ), + Integer.MAX_VALUE + ) + ) + .setHavingSpec(havingSpec2) + .build(); + + final CacheStrategy<Row, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + final CacheStrategy<Row, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + 
strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + + @Test + public void testResultLevelCacheKeyWithSubTotalsSpec() + { + final GroupByQuery query1 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("quality", "alias"), + new DefaultDimensionSpec("market", "market") + )) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index"), + new FloatSumAggregatorFactory("idxFloat", "indexFloat"), + new DoubleSumAggregatorFactory("idxDouble", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setSubtotalsSpec(ImmutableList.of( + ImmutableList.of("alias"), + ImmutableList.of("market"), + ImmutableList.of() + )) + .build(); + + final GroupByQuery query2 = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("quality", "alias"), + new DefaultDimensionSpec("market", "market") + )) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index"), + new FloatSumAggregatorFactory("idxFloat", "indexFloat"), + new DoubleSumAggregatorFactory("idxDouble", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setSubtotalsSpec(ImmutableList.of( + ImmutableList.of("alias"), + ImmutableList.of() + )) + .build(); + + final CacheStrategy<Row, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + ).getCacheStrategy(query1); + + final CacheStrategy<Row, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest( + null, + QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator() + 
).getCacheStrategy(query2); + + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); + } + +} diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 96e3699..cc25be4 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -5369,6 +5369,12 @@ public class GroupByQueryRunnerTest new BaseHavingSpec() { @Override + public byte[] getCacheKey() + { + return new byte[0]; + } + + @Override public boolean eval(Row row) { return (row.getMetric("idx_subpostagg").floatValue() < 3800); @@ -5630,6 +5636,12 @@ public class GroupByQueryRunnerTest new BaseHavingSpec() { @Override + public byte[] getCacheKey() + { + return new byte[0]; + } + + @Override public boolean eval(Row row) { return (row.getMetric("idx_subpostagg").floatValue() < 3800); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java b/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java index ec44b7f..a0e1d74 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java @@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.Row; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.cache.CacheKeyBuilder; import org.junit.Test; import java.util.ArrayList; @@ -233,6 +235,15 @@ public class 
HavingSpecTest counter.incrementAndGet(); return value; } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_COUNTING) + .appendByte((byte) (value ? 1 : 0)) + .appendByteArray(StringUtils.toUtf8(String.valueOf(counter))) + .build(); + } } @Test diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java index 0742e1e..6d07c59 100644 --- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java @@ -32,7 +32,9 @@ import org.apache.druid.query.Result; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.ConstantPostAggregator; +import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.VirtualColumns; @@ -158,4 +160,164 @@ public class TimeseriesQueryQueryToolChestTest ) ); } + + @Test + public void testResultLevelCacheKey() + { + final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2015-01-01/2015-01-02") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("metric0", "metric0"), + new CountAggregatorFactory("metric1") + ) + ) + .postAggregators( + ImmutableList.of( + new ArithmeticPostAggregator( + "post", + "+", + ImmutableList.of( + new FieldAccessPostAggregator( + null, + "metric1" + ), + new 
FieldAccessPostAggregator( + null, + "metric0" + ) + ) + ) + ) + ) + .build(); + + final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2015-01-01/2015-01-02") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("metric0", "metric0"), + new CountAggregatorFactory("metric1") + ) + ) + .postAggregators( + ImmutableList.of( + new ArithmeticPostAggregator( + "post", + "/", + ImmutableList.of( + new FieldAccessPostAggregator( + null, + "metric1" + ), + new FieldAccessPostAggregator( + null, + "metric0" + ) + ) + ) + ) + ) + .build(); + + Assert.assertTrue( + Arrays.equals( + TOOL_CHEST.getCacheStrategy(query1).computeCacheKey(query1), + TOOL_CHEST.getCacheStrategy(query2).computeCacheKey(query2) + ) + ); + Assert.assertFalse( + Arrays.equals( + TOOL_CHEST.getCacheStrategy(query1).computeResultLevelCacheKey(query1), + TOOL_CHEST.getCacheStrategy(query2).computeResultLevelCacheKey(query2) + ) + ); + } + + @Test + public void testResultLevelCacheKeyWithGrandTotal() + { + final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2015-01-01/2015-01-02") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("metric0", "metric0"), + new CountAggregatorFactory("metric1") + ) + ) + .postAggregators( + ImmutableList.of( + new ArithmeticPostAggregator( + "post", + "+", + ImmutableList.of( + new FieldAccessPostAggregator( + null, + "metric1" + ), + new FieldAccessPostAggregator( + null, + "metric0" + ) + ) + ) + ) + ) + .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, true)) + .build(); + + final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2015-01-01/2015-01-02") + .descending(descending) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new 
LongSumAggregatorFactory("metric0", "metric0"), + new CountAggregatorFactory("metric1") + ) + ) + .postAggregators( + ImmutableList.of( + new ArithmeticPostAggregator( + "post", + "/", + ImmutableList.of( + new FieldAccessPostAggregator( + null, + "metric1" + ), + new FieldAccessPostAggregator( + null, + "metric0" + ) + ) + ) + ) + ) + .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, true)) + .build(); + + Assert.assertTrue( + Arrays.equals( + TOOL_CHEST.getCacheStrategy(query1).computeCacheKey(query1), + TOOL_CHEST.getCacheStrategy(query2).computeCacheKey(query2) + ) + ); + Assert.assertFalse( + Arrays.equals( + TOOL_CHEST.getCacheStrategy(query1).computeResultLevelCacheKey(query1), + TOOL_CHEST.getCacheStrategy(query2).computeResultLevelCacheKey(query2) + ) + ); + } } diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java index e263c1e..cede671 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -36,6 +36,7 @@ import org.apache.druid.query.Result; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.TestQueryRunners; import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.ConstantPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; @@ -128,6 +129,92 @@ public class TopNQueryQueryToolChestTest ).getCacheStrategy(query2); Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals(strategy1.computeResultLevelCacheKey(query1), + 
strategy2.computeResultLevelCacheKey(query2))); + } + + @Test + public void testComputeResultLevelCacheKeyWithDifferentPostAgg() throws IOException + { + final TopNQuery query1 = new TopNQuery( + new TableDataSource("dummy"), + VirtualColumns.EMPTY, + new DefaultDimensionSpec("test", "test"), + new LegacyTopNMetricSpec("metric1"), + 3, + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01T18:00:00/2015-01-02T18:00:00"))), + null, + Granularities.ALL, + ImmutableList.of( + new LongSumAggregatorFactory("metric1", "metric1"), + new LongSumAggregatorFactory("metric2", "metric2") + ), + ImmutableList.of( + new ArithmeticPostAggregator( + "post1", + "/", + ImmutableList.of( + new FieldAccessPostAggregator( + "metric1", + "metric1" + ), + new FieldAccessPostAggregator( + "metric2", + "metric2" + ) + ) + ) + ), + null + ); + + final TopNQuery query2 = new TopNQuery( + new TableDataSource("dummy"), + VirtualColumns.EMPTY, + new DefaultDimensionSpec("test", "test"), + new LegacyTopNMetricSpec("metric1"), + 3, + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01T18:00:00/2015-01-02T18:00:00"))), + null, + Granularities.ALL, + ImmutableList.of( + new LongSumAggregatorFactory("metric1", "metric1"), + new LongSumAggregatorFactory("metric2", "metric2") + ), + ImmutableList.of( + new ArithmeticPostAggregator( + "post2", + "+", + ImmutableList.of( + new FieldAccessPostAggregator( + "metric1", + "metric1" + ), + new FieldAccessPostAggregator( + "metric2", + "metric2" + ) + ) + ) + ), + null + ); + + final CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> strategy1 = new TopNQueryQueryToolChest( + null, + null + ).getCacheStrategy(query1); + + final CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> strategy2 = new TopNQueryQueryToolChest( + null, + null + ).getCacheStrategy(query2); + + //segment level cache key excludes postaggregates in topn + Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), 
strategy2.computeCacheKey(query2))); + Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy1.computeResultLevelCacheKey(query1))); + Assert.assertFalse(Arrays.equals(strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2))); } @Test diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 253aa0f..37791ee 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -385,6 +385,10 @@ public class CachingClusteredClient implements QuerySegmentWalker break; } hasher.putString(p.getServer().getSegment().getId().toString(), StandardCharsets.UTF_8); + // it is important to add the "query interval" as part of the ETag calculation + // to have result level cache work correctly for queries with different + // intervals covering the same set of segments + hasher.putString(p.rhs.getInterval().toString(), StandardCharsets.UTF_8); } if (hasOnlyHistoricalSegments) { diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java index 3efd946..6a303b8 100644 --- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java @@ -80,7 +80,7 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T> { if (useResultCache || populateResultCache) { - final String cacheKeyStr = StringUtils.fromUtf8(strategy.computeCacheKey(query)); + final String cacheKeyStr = StringUtils.fromUtf8(strategy.computeResultLevelCacheKey(query)); final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr); String existingResultSetId = extractEtagFromResults(cachedResultSet); diff --git
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index e1b7a88..c2aa3ec 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -3130,7 +3130,56 @@ public class CachingClusteredClientTest Map<String, Object> responseContext = new HashMap<>(); getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext); - Assert.assertEquals("Z/eS4rQz5v477iq7Aashr6JPZa0=", responseContext.get("ETag")); + Assert.assertEquals("MDs2yIUvYLVzaG6zmwTH1plqaYE=", responseContext.get("ETag")); + } + + @Test + public void testEtagforDifferentQueryInterval() + { + final Interval interval = Intervals.of("2016-01-01/2016-01-02"); + final Interval queryInterval = Intervals.of("2016-01-01T14:00:00/2016-01-02T14:00:00"); + final Interval queryInterval2 = Intervals.of("2016-01-01T18:00:00/2016-01-02T18:00:00"); + final DataSegment dataSegment = new DataSegment( + "dataSource", + interval, + "ver", + ImmutableMap.of( + "type", "hdfs", + "path", "/tmp" + ), + ImmutableList.of("product"), + ImmutableList.of("visited_sum"), + NoneShardSpec.instance(), + 9, + 12334 + ); + final ServerSelector selector = new ServerSelector( + dataSegment, + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + ); + selector.addServerAndUpdateSegment(new QueryableDruidServer(servers[0], null), dataSegment); + timeline.add(interval, "ver", new SingleElementPartitionChunk<>(selector)); + + final TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder() + .dataSource(DATA_SOURCE) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval))) + .context(ImmutableMap.of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0=")) + .build(); + + final TimeBoundaryQuery query2 = Druids.newTimeBoundaryQueryBuilder() + .dataSource(DATA_SOURCE) + .intervals(new 
MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval2))) + .context(ImmutableMap.of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0=")) + .build(); + + + final Map<String, Object> responseContext = new HashMap<>(); + + getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext); + final Object etag1 = responseContext.get("ETag"); + getDefaultQueryRunner().run(QueryPlus.wrap(query2), responseContext); + final Object etag2 = responseContext.get("ETag"); + Assert.assertNotEquals(etag1, etag2); } @SuppressWarnings("unchecked") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
