This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new c2a42e0 Fix result-level cache for queries (#7325)
c2a42e0 is described below
commit c2a42e05bb08aefa31f1d1ed09d568cdaa726b8a
Author: Surekha <[email protected]>
AuthorDate: Thu Apr 18 13:31:29 2019 -0700
Fix result-level cache for queries (#7325)
* Add SegmentDescriptor interval in the hash while calculating Etag
* Add computeResultLevelCacheKey to CacheStrategy
Make HavingSpec cacheable and implement getCacheKey for subclasses
Add unit tests for computeResultLevelCacheKey
* Add more tests
* Use CacheKeyBuilder for HavingSpec's getCacheKey
* Initialize aggregators map to avoid NPE
* adjust cachekey builder for HavingSpec to ignore aggregators
* unused import
* PR comments
---
.../java/org/apache/druid/query/CacheStrategy.java | 10 +
.../query/groupby/GroupByQueryQueryToolChest.java | 22 +
.../query/groupby/having/AlwaysHavingSpec.java | 7 +
.../druid/query/groupby/having/AndHavingSpec.java | 8 +
.../query/groupby/having/DimFilterHavingSpec.java | 10 +
.../having/DimensionSelectorHavingSpec.java | 11 +
.../query/groupby/having/EqualToHavingSpec.java | 11 +
.../groupby/having/GreaterThanHavingSpec.java | 11 +
.../druid/query/groupby/having/HavingSpec.java | 3 +-
.../{NeverHavingSpec.java => HavingSpecUtil.java} | 23 +-
.../query/groupby/having/LessThanHavingSpec.java | 11 +
.../query/groupby/having/NeverHavingSpec.java | 7 +
.../druid/query/groupby/having/NotHavingSpec.java | 9 +
.../druid/query/groupby/having/OrHavingSpec.java | 9 +
.../SegmentMetadataQueryQueryToolChest.java | 12 +
.../query/search/SearchQueryQueryToolChest.java | 6 +
.../query/select/SelectQueryQueryToolChest.java | 6 +
.../TimeBoundaryQueryQueryToolChest.java | 6 +
.../timeseries/TimeseriesQueryQueryToolChest.java | 16 +
.../druid/query/topn/TopNQueryQueryToolChest.java | 15 +
.../groupby/GroupByQueryQueryToolChestTest.java | 486 +++++++++++++++++++++
.../query/groupby/GroupByQueryRunnerTest.java | 12 +
.../druid/query/groupby/having/HavingSpecTest.java | 11 +
.../TimeseriesQueryQueryToolChestTest.java | 162 +++++++
.../query/topn/TopNQueryQueryToolChestTest.java | 87 ++++
.../druid/client/CachingClusteredClient.java | 4 +
.../druid/query/ResultLevelCachingQueryRunner.java | 2 +-
.../druid/client/CachingClusteredClientTest.java | 51 ++-
28 files changed, 1014 insertions(+), 14 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java
b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java
index 7a300c0..8b106a6 100644
--- a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java
@@ -52,6 +52,16 @@ public interface CacheStrategy<T, CacheType, QueryType
extends Query<T>>
byte[] computeCacheKey(QueryType query);
/**
+ * Computes the result level cache key for the given query.
+ * Some implementations may include query parameters that might not be used
in {@code computeCacheKey} for the same query.
+ *
+ * @param query the query to be cached
+ *
+ * @return the result level cache key
+ */
+ byte[] computeResultLevelCacheKey(QueryType query);
+
+ /**
* Returns the class type of what is used in the cache
*
* @return Returns the class type of what is used in the cache
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
index a49b48a..c94d427 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -482,6 +482,28 @@ public class GroupByQueryQueryToolChest extends
QueryToolChest<Row, GroupByQuery
}
@Override
+ public byte[] computeResultLevelCacheKey(GroupByQuery query)
+ {
+ final CacheKeyBuilder builder = new CacheKeyBuilder(GROUPBY_QUERY)
+ .appendByte(CACHE_STRATEGY_VERSION)
+ .appendCacheable(query.getGranularity())
+ .appendCacheable(query.getDimFilter())
+ .appendCacheables(query.getAggregatorSpecs())
+ .appendCacheables(query.getDimensions())
+ .appendCacheable(query.getVirtualColumns())
+ .appendCacheable(query.getHavingSpec())
+ .appendCacheable(query.getLimitSpec())
+ .appendCacheables(query.getPostAggregatorSpecs());
+
+ if (query.getSubtotalsSpec() != null &&
!query.getSubtotalsSpec().isEmpty()) {
+ for (List<String> subTotalSpec : query.getSubtotalsSpec()) {
+ builder.appendStrings(subTotalSpec);
+ }
+ }
+ return builder.build();
+ }
+
+ @Override
public TypeReference<Object> getCacheObjectClazz()
{
return OBJECT_TYPE_REFERENCE;
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java
b/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java
index 1ac4f31..8450589 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/having/AlwaysHavingSpec.java
@@ -20,6 +20,7 @@
package org.apache.druid.query.groupby.having;
import org.apache.druid.data.input.Row;
+import org.apache.druid.query.cache.CacheKeyBuilder;
/**
* A "having" spec that always evaluates to true
@@ -31,4 +32,10 @@ public class AlwaysHavingSpec extends BaseHavingSpec
{
return true;
}
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_ALWAYS).build();
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java
b/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java
index 55ca3ee..f035db3 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/having/AndHavingSpec.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.Row;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.column.ValueType;
import java.util.List;
@@ -110,4 +111,11 @@ public class AndHavingSpec extends BaseHavingSpec
sb.append('}');
return sb.toString();
}
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_AND)
+ .appendCacheables(havingSpecs).build();
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java
b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java
index 00155ba..cf916bf 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/having/DimFilterHavingSpec.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.transform.RowFunction;
@@ -170,6 +171,15 @@ public class DimFilterHavingSpec extends BaseHavingSpec
return new TransformSpec(filter, transforms).toTransformer(rowSignature);
}
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_DIM_FILTER)
+ .appendCacheable(dimFilter)
+ .appendByte((byte) (isFinalize() ? 1 : 0))
+ .build();
+ }
+
private static class RowAsInputRow implements InputRow
{
private final Row row;
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java
b/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java
index 9004116..4dfc6e6 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/having/DimensionSelectorHavingSpec.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.druid.data.input.Row;
+import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.extraction.IdentityExtractionFn;
@@ -117,4 +118,14 @@ public class DimensionSelectorHavingSpec extends
BaseHavingSpec
", extractionFn=" + extractionFn +
'}';
}
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_DIM_SELECTOR)
+ .appendString(dimension)
+ .appendString(value)
+ .appendByteArray(extractionFn == null ? new byte[0] :
extractionFn.getCacheKey())
+ .build();
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java
b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java
index ac8d64e..00c471b 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/having/EqualToHavingSpec.java
@@ -22,7 +22,9 @@ package org.apache.druid.query.groupby.having;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.cache.CacheKeyBuilder;
import java.util.Map;
@@ -123,4 +125,13 @@ public class EqualToHavingSpec extends BaseHavingSpec
sb.append('}');
return sb.toString();
}
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_EQUAL)
+ .appendString(aggregationName)
+ .appendByteArray(StringUtils.toUtf8(String.valueOf(value)))
+ .build();
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java
b/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java
index 842dbcd..aa276a9 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/having/GreaterThanHavingSpec.java
@@ -22,7 +22,9 @@ package org.apache.druid.query.groupby.having;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.cache.CacheKeyBuilder;
import java.util.Map;
@@ -119,4 +121,13 @@ public class GreaterThanHavingSpec extends BaseHavingSpec
sb.append('}');
return sb.toString();
}
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_GREATER_THAN)
+ .appendString(aggregationName)
+ .appendByteArray(StringUtils.toUtf8(String.valueOf(value)))
+ .build();
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java
b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java
index 19ad071..e75641f 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpec.java
@@ -22,6 +22,7 @@ package org.apache.druid.query.groupby.having;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.Cacheable;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.column.ValueType;
@@ -44,7 +45,7 @@ import java.util.Map;
@JsonSubTypes.Type(name = "always", value = AlwaysHavingSpec.class),
@JsonSubTypes.Type(name = "filter", value = DimFilterHavingSpec.class)
})
-public interface HavingSpec
+public interface HavingSpec extends Cacheable
{
// Atoms for easy combination, but for now they are mostly useful
// for testing.
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java
b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java
similarity index 60%
copy from
processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java
copy to
processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java
index 55ab48b..e1227e7 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/having/HavingSpecUtil.java
@@ -19,16 +19,17 @@
package org.apache.druid.query.groupby.having;
-import org.apache.druid.data.input.Row;
-
-/**
- * A "having" spec that always evaluates to false
- */
-public class NeverHavingSpec extends BaseHavingSpec
+public class HavingSpecUtil
{
- @Override
- public boolean eval(Row row)
- {
- return false;
- }
+ static final byte CACHE_TYPE_ID_ALWAYS = 0x0;
+ static final byte CACHE_TYPE_ID_AND = 0x1;
+ static final byte CACHE_TYPE_ID_DIM_SELECTOR = 0x2;
+ static final byte CACHE_TYPE_ID_DIM_FILTER = 0x3;
+ static final byte CACHE_TYPE_ID_EQUAL = 0x4;
+ static final byte CACHE_TYPE_ID_GREATER_THAN = 0x5;
+ static final byte CACHE_TYPE_ID_LESS_THAN = 0x6;
+ static final byte CACHE_TYPE_ID_NEVER = 0x7;
+ static final byte CACHE_TYPE_ID_NOT = 0x8;
+ static final byte CACHE_TYPE_ID_OR = 0x9;
+ static final byte CACHE_TYPE_ID_COUNTING = 0xA;
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java
b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java
index 99917cb..3c937cf 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/having/LessThanHavingSpec.java
@@ -21,7 +21,9 @@ package org.apache.druid.query.groupby.having;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.cache.CacheKeyBuilder;
import java.util.Map;
@@ -117,4 +119,13 @@ public class LessThanHavingSpec extends BaseHavingSpec
sb.append('}');
return sb.toString();
}
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_LESS_THAN)
+ .appendString(aggregationName)
+ .appendByteArray(StringUtils.toUtf8(String.valueOf(value)))
+ .build();
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java
b/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java
index 55ab48b..fa2c15d 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/having/NeverHavingSpec.java
@@ -20,6 +20,7 @@
package org.apache.druid.query.groupby.having;
import org.apache.druid.data.input.Row;
+import org.apache.druid.query.cache.CacheKeyBuilder;
/**
* A "having" spec that always evaluates to false
@@ -31,4 +32,10 @@ public class NeverHavingSpec extends BaseHavingSpec
{
return false;
}
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_NEVER).build();
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java
b/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java
index 315d059..81d7a63 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/having/NotHavingSpec.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.Row;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.column.ValueType;
import java.util.Map;
@@ -98,4 +99,12 @@ public class NotHavingSpec extends BaseHavingSpec
{
return havingSpec != null ? havingSpec.hashCode() : 0;
}
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_NOT)
+ .appendCacheable(havingSpec)
+ .build();
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java
b/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java
index 7425b90..e648349 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/having/OrHavingSpec.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.Row;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.column.ValueType;
import java.util.List;
@@ -110,4 +111,12 @@ public class OrHavingSpec extends BaseHavingSpec
sb.append('}');
return sb.toString();
}
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_OR)
+ .appendCacheables(havingSpecs)
+ .build();
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
index 4dc50f5..2813000 100644
---
a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
@@ -50,6 +50,7 @@ import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.aggregation.AggregatorFactory;
import
org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import org.apache.druid.query.aggregation.MetricManipulationFn;
+import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
@@ -73,6 +74,7 @@ public class SegmentMetadataQueryQueryToolChest extends
QueryToolChest<SegmentAn
{
};
private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};
+ private static final byte SEGMENT_METADATA_QUERY = 0x16;
private static final Function<SegmentAnalysis, SegmentAnalysis>
MERGE_TRANSFORM_FN = new Function<SegmentAnalysis, SegmentAnalysis>()
{
@Override
@@ -195,6 +197,16 @@ public class SegmentMetadataQueryQueryToolChest extends
QueryToolChest<SegmentAn
}
@Override
+ public byte[] computeResultLevelCacheKey(SegmentMetadataQuery query)
+ {
+ // need to include query "merge" and "lenientAggregatorMerge" for
result level cache key
+ return new
CacheKeyBuilder(SEGMENT_METADATA_QUERY).appendByteArray(computeCacheKey(query))
+
.appendBoolean(query.isMerge())
+
.appendBoolean(query.isLenientAggregatorMerge())
+ .build();
+ }
+
+ @Override
public TypeReference<SegmentAnalysis> getCacheObjectClazz()
{
return getResultTypeReference();
diff --git
a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java
index 61f77e3..8f35b25 100644
---
a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java
@@ -202,6 +202,12 @@ public class SearchQueryQueryToolChest extends
QueryToolChest<Result<SearchResul
}
@Override
+ public byte[] computeResultLevelCacheKey(SearchQuery query)
+ {
+ return computeCacheKey(query);
+ }
+
+ @Override
public TypeReference<Object> getCacheObjectClazz()
{
return OBJECT_TYPE_REFERENCE;
diff --git
a/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java
index d551266..45ebb95 100644
---
a/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/select/SelectQueryQueryToolChest.java
@@ -237,6 +237,12 @@ public class SelectQueryQueryToolChest extends
QueryToolChest<Result<SelectResul
}
@Override
+ public byte[] computeResultLevelCacheKey(SelectQuery query)
+ {
+ return computeCacheKey(query);
+ }
+
+ @Override
public TypeReference<Object> getCacheObjectClazz()
{
return OBJECT_TYPE_REFERENCE;
diff --git
a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
index 3519037..9fc0e88 100644
---
a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
@@ -155,6 +155,12 @@ public class TimeBoundaryQueryQueryToolChest
}
@Override
+ public byte[] computeResultLevelCacheKey(TimeBoundaryQuery query)
+ {
+ return computeCacheKey(query);
+ }
+
+ @Override
public TypeReference<Object> getCacheObjectClazz()
{
return OBJECT_TYPE_REFERENCE;
diff --git
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 479fbf0..f8f5aa0 100644
---
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -275,6 +275,22 @@ public class TimeseriesQueryQueryToolChest extends
QueryToolChest<Result<Timeser
}
@Override
+ public byte[] computeResultLevelCacheKey(TimeseriesQuery query)
+ {
+ final CacheKeyBuilder builder = new CacheKeyBuilder(TIMESERIES_QUERY)
+ .appendBoolean(query.isDescending())
+ .appendBoolean(query.isSkipEmptyBuckets())
+ .appendCacheable(query.getGranularity())
+ .appendCacheable(query.getDimensionsFilter())
+ .appendCacheables(query.getAggregatorSpecs())
+ .appendCacheable(query.getVirtualColumns())
+ .appendCacheables(query.getPostAggregatorSpecs())
+ .appendInt(query.getLimit())
+ .appendBoolean(query.isGrandTotal());
+ return builder.build();
+ }
+
+ @Override
public TypeReference<Object> getCacheObjectClazz()
{
return OBJECT_TYPE_REFERENCE;
diff --git
a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
index 4774c5c..2c3bd2b 100644
---
a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
@@ -327,6 +327,21 @@ public class TopNQueryQueryToolChest extends
QueryToolChest<Result<TopNResultVal
}
@Override
+ public byte[] computeResultLevelCacheKey(TopNQuery query)
+ {
+ final CacheKeyBuilder builder = new CacheKeyBuilder(TOPN_QUERY)
+ .appendCacheable(query.getDimensionSpec())
+ .appendCacheable(query.getTopNMetricSpec())
+ .appendInt(query.getThreshold())
+ .appendCacheable(query.getGranularity())
+ .appendCacheable(query.getDimensionsFilter())
+ .appendCacheables(query.getAggregatorSpecs())
+ .appendCacheable(query.getVirtualColumns())
+ .appendCacheables(query.getPostAggregatorSpecs());
+ return builder.build();
+ }
+
+ @Override
public TypeReference<Object> getCacheObjectClazz()
{
return OBJECT_TYPE_REFERENCE;
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
new file mode 100644
index 0000000..2bad8f8
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.query.CacheStrategy;
+import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.query.filter.AndDimFilter;
+import org.apache.druid.query.filter.BoundDimFilter;
+import org.apache.druid.query.filter.OrDimFilter;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.groupby.having.AndHavingSpec;
+import org.apache.druid.query.groupby.having.DimFilterHavingSpec;
+import org.apache.druid.query.groupby.having.EqualToHavingSpec;
+import org.apache.druid.query.groupby.having.GreaterThanHavingSpec;
+import org.apache.druid.query.groupby.having.HavingSpec;
+import org.apache.druid.query.groupby.having.LessThanHavingSpec;
+import org.apache.druid.query.groupby.having.NotHavingSpec;
+import org.apache.druid.query.groupby.having.OrHavingSpec;
+import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
+import org.apache.druid.query.ordering.StringComparators;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class GroupByQueryQueryToolChestTest
+{
+
+ @Test
+ public void testResultLevelCacheKeyWithPostAggregate()
+ {
+ final GroupByQuery query1 = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new
LongSumAggregatorFactory("idx", "index"))
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new ExpressionPostAggregator("post", "alias + 'x'", null,
TestExprMacroTable.INSTANCE)
+ )
+ )
+ .setGranularity(QueryRunnerTestHelper.dayGran)
+ .build();
+
+ final GroupByQuery query2 = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new
LongSumAggregatorFactory("idx", "index"))
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new ExpressionPostAggregator("post", "alias - 'x'", null,
TestExprMacroTable.INSTANCE)
+ )
+ )
+ .setGranularity(QueryRunnerTestHelper.dayGran)
+ .build();
+
+ final CacheStrategy<Row, Object, GroupByQuery> strategy1 = new
GroupByQueryQueryToolChest(
+ null,
+ QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
+ ).getCacheStrategy(query1);
+
+ final CacheStrategy<Row, Object, GroupByQuery> strategy2 = new
GroupByQueryQueryToolChest(
+ null,
+ QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
+ ).getCacheStrategy(query2);
+
+ Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
+ Assert.assertFalse(Arrays.equals(
+ strategy1.computeResultLevelCacheKey(query1),
+ strategy2.computeResultLevelCacheKey(query2)
+ ));
+ }
+
+ @Test
+ public void testResultLevelCacheKeyWithLimitSpec()
+ {
+ final GroupByQuery query1 = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new
LongSumAggregatorFactory("idx", "index"))
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new ExpressionPostAggregator("post", "alias + 'x'", null,
TestExprMacroTable.INSTANCE)
+ )
+ )
+ .setGranularity(QueryRunnerTestHelper.dayGran)
+ .setLimitSpec(
+ new DefaultLimitSpec(
+ ImmutableList.of(
+ new OrderByColumnSpec("post",
OrderByColumnSpec.Direction.DESCENDING)
+ ),
+ Integer.MAX_VALUE
+ )
+ )
+ .build();
+
+ final GroupByQuery query2 = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new
LongSumAggregatorFactory("idx", "index"))
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new ExpressionPostAggregator("post", "alias - 'x'", null,
TestExprMacroTable.INSTANCE)
+ )
+ )
+ .setGranularity(QueryRunnerTestHelper.dayGran)
+ .setLimitSpec(
+ new DefaultLimitSpec(
+ ImmutableList.of(
+ new OrderByColumnSpec("post",
OrderByColumnSpec.Direction.DESCENDING)
+ ),
+ Integer.MAX_VALUE
+ )
+ )
+ .build();
+
+ final CacheStrategy<Row, Object, GroupByQuery> strategy1 = new
GroupByQueryQueryToolChest(
+ null,
+ QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
+ ).getCacheStrategy(query1);
+
+ final CacheStrategy<Row, Object, GroupByQuery> strategy2 = new
GroupByQueryQueryToolChest(
+ null,
+ QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
+ ).getCacheStrategy(query2);
+
+ Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
+ Assert.assertFalse(Arrays.equals(
+ strategy1.computeResultLevelCacheKey(query1),
+ strategy2.computeResultLevelCacheKey(query2)
+ ));
+ }
+
+ @Test
+ public void testResultLevelCacheKeyWithHavingSpec()
+ {
+ final GroupByQuery query1 = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new
LongSumAggregatorFactory("idx", "index"))
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new ExpressionPostAggregator("post", "alias + 'x'", null,
TestExprMacroTable.INSTANCE)
+ )
+ )
+ .setGranularity(QueryRunnerTestHelper.dayGran)
+ .setLimitSpec(
+ new DefaultLimitSpec(
+ ImmutableList.of(
+ new OrderByColumnSpec("post",
OrderByColumnSpec.Direction.DESCENDING)
+ ),
+ Integer.MAX_VALUE
+ )
+ )
+ .setHavingSpec(new
GreaterThanHavingSpec(QueryRunnerTestHelper.uniqueMetric, 8))
+ .build();
+
+ final GroupByQuery query2 = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new
LongSumAggregatorFactory("idx", "index"))
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new ExpressionPostAggregator("post", "alias + 'x'", null,
TestExprMacroTable.INSTANCE)
+ )
+ )
+ .setGranularity(QueryRunnerTestHelper.dayGran)
+ .setLimitSpec(
+ new DefaultLimitSpec(
+ ImmutableList.of(
+ new OrderByColumnSpec("post",
OrderByColumnSpec.Direction.DESCENDING)
+ ),
+ Integer.MAX_VALUE
+ )
+ )
+ .setHavingSpec(new
GreaterThanHavingSpec(QueryRunnerTestHelper.uniqueMetric, 10))
+ .build();
+
+ final CacheStrategy<Row, Object, GroupByQuery> strategy1 = new
GroupByQueryQueryToolChest(
+ null,
+ QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
+ ).getCacheStrategy(query1);
+
+ final CacheStrategy<Row, Object, GroupByQuery> strategy2 = new
GroupByQueryQueryToolChest(
+ null,
+ QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
+ ).getCacheStrategy(query2);
+
+ Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
+ Assert.assertFalse(Arrays.equals(
+ strategy1.computeResultLevelCacheKey(query1),
+ strategy2.computeResultLevelCacheKey(query2)
+ ));
+ }
+
+ @Test
+ public void testResultLevelCacheKeyWithAndHavingSpec()
+ {
+ final List<HavingSpec> havings = Arrays.asList(
+ new GreaterThanHavingSpec("agg", Double.valueOf(1.3)),
+ new OrHavingSpec(
+ Arrays.asList(
+ new LessThanHavingSpec("lessAgg", Long.valueOf(1L)),
+ new NotHavingSpec(new EqualToHavingSpec("equalAgg",
Double.valueOf(2)))
+ )
+ )
+ );
+ final HavingSpec andHavingSpec = new AndHavingSpec(havings);
+
+ final List<HavingSpec> havings2 = Arrays.asList(
+ new GreaterThanHavingSpec("agg", Double.valueOf(13.0)),
+ new OrHavingSpec(
+ Arrays.asList(
+ new LessThanHavingSpec("lessAgg", Long.valueOf(1L)),
+ new NotHavingSpec(new EqualToHavingSpec("equalAgg",
Double.valueOf(22)))
+ )
+ )
+ );
+ final HavingSpec andHavingSpec2 = new AndHavingSpec(havings2);
+
+ final GroupByQuery query1 = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new
LongSumAggregatorFactory("idx", "index"))
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new ExpressionPostAggregator("post", "alias + 'x'", null,
TestExprMacroTable.INSTANCE)
+ )
+ )
+ .setGranularity(QueryRunnerTestHelper.dayGran)
+ .setLimitSpec(
+ new DefaultLimitSpec(
+ ImmutableList.of(
+ new OrderByColumnSpec("post",
OrderByColumnSpec.Direction.DESCENDING)
+ ),
+ Integer.MAX_VALUE
+ )
+ )
+ .setHavingSpec(andHavingSpec)
+ .build();
+
+ final GroupByQuery query2 = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new
LongSumAggregatorFactory("idx", "index"))
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new ExpressionPostAggregator("post", "alias + 'x'", null,
TestExprMacroTable.INSTANCE)
+ )
+ )
+ .setGranularity(QueryRunnerTestHelper.dayGran)
+ .setLimitSpec(
+ new DefaultLimitSpec(
+ ImmutableList.of(
+ new OrderByColumnSpec("post",
OrderByColumnSpec.Direction.DESCENDING)
+ ),
+ Integer.MAX_VALUE
+ )
+ )
+ .setHavingSpec(andHavingSpec2)
+ .build();
+
+ final CacheStrategy<Row, Object, GroupByQuery> strategy1 = new
GroupByQueryQueryToolChest(
+ null,
+ QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
+ ).getCacheStrategy(query1);
+
+ final CacheStrategy<Row, Object, GroupByQuery> strategy2 = new
GroupByQueryQueryToolChest(
+ null,
+ QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
+ ).getCacheStrategy(query2);
+
+ Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
+ Assert.assertFalse(Arrays.equals(
+ strategy1.computeResultLevelCacheKey(query1),
+ strategy2.computeResultLevelCacheKey(query2)
+ ));
+ }
+
+ @Test
+ public void testResultLevelCacheKeyWithHavingDimFilterHavingSpec()
+ {
+ final DimFilterHavingSpec havingSpec1 = new DimFilterHavingSpec(
+ new AndDimFilter(
+ ImmutableList.of(
+ new OrDimFilter(
+ ImmutableList.of(
+ new BoundDimFilter("rows", "2", null, true, false,
null, null, StringComparators.NUMERIC),
+ new SelectorDimFilter("idx", "217", null)
+ )
+ ),
+ new SelectorDimFilter("__time",
String.valueOf(DateTimes.of("2011-04-01").getMillis()), null)
+ )
+ ),
+ null
+ );
+
+ final DimFilterHavingSpec havingSpec2 = new DimFilterHavingSpec(
+ new AndDimFilter(
+ ImmutableList.of(
+ new OrDimFilter(
+ ImmutableList.of(
+ new BoundDimFilter("rows", "2", null, true, false,
null, null, StringComparators.NUMERIC),
+ new SelectorDimFilter("idx", "317", null)
+ )
+ ),
+ new SelectorDimFilter("__time",
String.valueOf(DateTimes.of("2011-04-01").getMillis()), null)
+ )
+ ),
+ null
+ );
+ final GroupByQuery query1 = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new
LongSumAggregatorFactory("idx", "index"))
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new ExpressionPostAggregator("post", "alias + 'x'", null,
TestExprMacroTable.INSTANCE)
+ )
+ )
+ .setGranularity(QueryRunnerTestHelper.dayGran)
+ .setLimitSpec(
+ new DefaultLimitSpec(
+ ImmutableList.of(
+ new OrderByColumnSpec("post",
OrderByColumnSpec.Direction.DESCENDING)
+ ),
+ Integer.MAX_VALUE
+ )
+ )
+ .setHavingSpec(havingSpec1)
+ .build();
+
+ final GroupByQuery query2 = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new
LongSumAggregatorFactory("idx", "index"))
+ .setPostAggregatorSpecs(
+ ImmutableList.of(
+ new ExpressionPostAggregator("post", "alias + 'x'", null,
TestExprMacroTable.INSTANCE)
+ )
+ )
+ .setGranularity(QueryRunnerTestHelper.dayGran)
+ .setLimitSpec(
+ new DefaultLimitSpec(
+ ImmutableList.of(
+ new OrderByColumnSpec("post",
OrderByColumnSpec.Direction.DESCENDING)
+ ),
+ Integer.MAX_VALUE
+ )
+ )
+ .setHavingSpec(havingSpec2)
+ .build();
+
+ final CacheStrategy<Row, Object, GroupByQuery> strategy1 = new
GroupByQueryQueryToolChest(
+ null,
+ QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
+ ).getCacheStrategy(query1);
+
+ final CacheStrategy<Row, Object, GroupByQuery> strategy2 = new
GroupByQueryQueryToolChest(
+ null,
+ QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
+ ).getCacheStrategy(query2);
+
+ Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
+ Assert.assertFalse(Arrays.equals(
+ strategy1.computeResultLevelCacheKey(query1),
+ strategy2.computeResultLevelCacheKey(query2)
+ ));
+ }
+
+ @Test
+ public void testResultLevelCacheKeyWithSubTotalsSpec()
+ {
+ final GroupByQuery query1 = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+ .setDimensions(Lists.newArrayList(
+ new DefaultDimensionSpec("quality", "alias"),
+ new DefaultDimensionSpec("market", "market")
+ ))
+ .setAggregatorSpecs(
+ Arrays.asList(
+ QueryRunnerTestHelper.rowsCount,
+ new LongSumAggregatorFactory("idx", "index"),
+ new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
+ new DoubleSumAggregatorFactory("idxDouble", "index")
+ )
+ )
+ .setGranularity(QueryRunnerTestHelper.dayGran)
+ .setSubtotalsSpec(ImmutableList.of(
+ ImmutableList.of("alias"),
+ ImmutableList.of("market"),
+ ImmutableList.of()
+ ))
+ .build();
+
+ final GroupByQuery query2 = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+ .setDimensions(Lists.newArrayList(
+ new DefaultDimensionSpec("quality", "alias"),
+ new DefaultDimensionSpec("market", "market")
+ ))
+ .setAggregatorSpecs(
+ Arrays.asList(
+ QueryRunnerTestHelper.rowsCount,
+ new LongSumAggregatorFactory("idx", "index"),
+ new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
+ new DoubleSumAggregatorFactory("idxDouble", "index")
+ )
+ )
+ .setGranularity(QueryRunnerTestHelper.dayGran)
+ .setSubtotalsSpec(ImmutableList.of(
+ ImmutableList.of("alias"),
+ ImmutableList.of()
+ ))
+ .build();
+
+ final CacheStrategy<Row, Object, GroupByQuery> strategy1 = new
GroupByQueryQueryToolChest(
+ null,
+ QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
+ ).getCacheStrategy(query1);
+
+ final CacheStrategy<Row, Object, GroupByQuery> strategy2 = new
GroupByQueryQueryToolChest(
+ null,
+ QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
+ ).getCacheStrategy(query2);
+
+ Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
+ Assert.assertFalse(Arrays.equals(
+ strategy1.computeResultLevelCacheKey(query1),
+ strategy2.computeResultLevelCacheKey(query2)
+ ));
+ }
+
+}
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index 96e3699..cc25be4 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -5369,6 +5369,12 @@ public class GroupByQueryRunnerTest
new BaseHavingSpec()
{
@Override
+ public byte[] getCacheKey()
+ {
+ return new byte[0];
+ }
+
+ @Override
public boolean eval(Row row)
{
return (row.getMetric("idx_subpostagg").floatValue() < 3800);
@@ -5630,6 +5636,12 @@ public class GroupByQueryRunnerTest
new BaseHavingSpec()
{
@Override
+ public byte[] getCacheKey()
+ {
+ return new byte[0];
+ }
+
+ @Override
public boolean eval(Row row)
{
return (row.getMetric("idx_subpostagg").floatValue() < 3800);
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java
index ec44b7f..a0e1d74 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/having/HavingSpecTest.java
@@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.cache.CacheKeyBuilder;
import org.junit.Test;
import java.util.ArrayList;
@@ -233,6 +235,15 @@ public class HavingSpecTest
counter.incrementAndGet();
return value;
}
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(HavingSpecUtil.CACHE_TYPE_ID_COUNTING)
+ .appendByte((byte) (value ? 1 : 0))
+ .appendByteArray(StringUtils.toUtf8(String.valueOf(counter)))
+ .build();
+ }
}
@Test
diff --git
a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
index 0742e1e..6d07c59 100644
---
a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
+++
b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
@@ -32,7 +32,9 @@ import org.apache.druid.query.Result;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
+import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.VirtualColumns;
@@ -158,4 +160,164 @@ public class TimeseriesQueryQueryToolChestTest
)
);
}
+
+ @Test
+ public void testResultLevelCacheKey()
+ {
+ final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
+ .dataSource("dummy")
+ .intervals("2015-01-01/2015-01-02")
+ .descending(descending)
+ .granularity(Granularities.ALL)
+ .aggregators(
+ ImmutableList.of(
+ new
LongSumAggregatorFactory("metric0", "metric0"),
+ new
CountAggregatorFactory("metric1")
+ )
+ )
+ .postAggregators(
+ ImmutableList.of(
+ new ArithmeticPostAggregator(
+ "post",
+ "+",
+ ImmutableList.of(
+ new
FieldAccessPostAggregator(
+ null,
+ "metric1"
+ ),
+ new
FieldAccessPostAggregator(
+ null,
+ "metric0"
+ )
+ )
+ )
+ )
+ )
+ .build();
+
+ final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder()
+ .dataSource("dummy")
+ .intervals("2015-01-01/2015-01-02")
+ .descending(descending)
+ .granularity(Granularities.ALL)
+ .aggregators(
+ ImmutableList.of(
+ new
LongSumAggregatorFactory("metric0", "metric0"),
+ new
CountAggregatorFactory("metric1")
+ )
+ )
+ .postAggregators(
+ ImmutableList.of(
+ new ArithmeticPostAggregator(
+ "post",
+ "/",
+ ImmutableList.of(
+ new
FieldAccessPostAggregator(
+ null,
+ "metric1"
+ ),
+ new
FieldAccessPostAggregator(
+ null,
+ "metric0"
+ )
+ )
+ )
+ )
+ )
+ .build();
+
+ Assert.assertTrue(
+ Arrays.equals(
+ TOOL_CHEST.getCacheStrategy(query1).computeCacheKey(query1),
+ TOOL_CHEST.getCacheStrategy(query2).computeCacheKey(query2)
+ )
+ );
+ Assert.assertFalse(
+ Arrays.equals(
+
TOOL_CHEST.getCacheStrategy(query1).computeResultLevelCacheKey(query1),
+
TOOL_CHEST.getCacheStrategy(query2).computeResultLevelCacheKey(query2)
+ )
+ );
+ }
+
+ @Test
+ public void testResultLevelCacheKeyWithGrandTotal()
+ {
+ final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
+ .dataSource("dummy")
+ .intervals("2015-01-01/2015-01-02")
+ .descending(descending)
+ .granularity(Granularities.ALL)
+ .aggregators(
+ ImmutableList.of(
+ new
LongSumAggregatorFactory("metric0", "metric0"),
+ new
CountAggregatorFactory("metric1")
+ )
+ )
+ .postAggregators(
+ ImmutableList.of(
+ new ArithmeticPostAggregator(
+ "post",
+ "+",
+ ImmutableList.of(
+ new
FieldAccessPostAggregator(
+ null,
+ "metric1"
+ ),
+ new
FieldAccessPostAggregator(
+ null,
+ "metric0"
+ )
+ )
+ )
+ )
+ )
+
.context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, true))
+ .build();
+
+ final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder()
+ .dataSource("dummy")
+ .intervals("2015-01-01/2015-01-02")
+ .descending(descending)
+ .granularity(Granularities.ALL)
+ .aggregators(
+ ImmutableList.of(
+ new
LongSumAggregatorFactory("metric0", "metric0"),
+ new
CountAggregatorFactory("metric1")
+ )
+ )
+ .postAggregators(
+ ImmutableList.of(
+ new ArithmeticPostAggregator(
+ "post",
+ "/",
+ ImmutableList.of(
+ new
FieldAccessPostAggregator(
+ null,
+ "metric1"
+ ),
+ new
FieldAccessPostAggregator(
+ null,
+ "metric0"
+ )
+ )
+ )
+ )
+ )
+
.context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, true))
+ .build();
+
+ Assert.assertTrue(
+ Arrays.equals(
+ TOOL_CHEST.getCacheStrategy(query1).computeCacheKey(query1),
+ TOOL_CHEST.getCacheStrategy(query2).computeCacheKey(query2)
+ )
+ );
+ Assert.assertFalse(
+ Arrays.equals(
+
TOOL_CHEST.getCacheStrategy(query1).computeResultLevelCacheKey(query1),
+
TOOL_CHEST.getCacheStrategy(query2).computeResultLevelCacheKey(query2)
+ )
+ );
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java
b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java
index e263c1e..cede671 100644
---
a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java
+++
b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.query.Result;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.TestQueryRunners;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
@@ -128,6 +129,92 @@ public class TopNQueryQueryToolChestTest
).getCacheStrategy(query2);
Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
+
Assert.assertFalse(Arrays.equals(strategy1.computeResultLevelCacheKey(query1),
+
strategy2.computeResultLevelCacheKey(query2)));
+ }
+
+ @Test
+ public void testComputeResultLevelCacheKeyWithDifferentPostAgg() throws
IOException
+ {
+ final TopNQuery query1 = new TopNQuery(
+ new TableDataSource("dummy"),
+ VirtualColumns.EMPTY,
+ new DefaultDimensionSpec("test", "test"),
+ new LegacyTopNMetricSpec("metric1"),
+ 3,
+ new
MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01T18:00:00/2015-01-02T18:00:00"))),
+ null,
+ Granularities.ALL,
+ ImmutableList.of(
+ new LongSumAggregatorFactory("metric1", "metric1"),
+ new LongSumAggregatorFactory("metric2", "metric2")
+ ),
+ ImmutableList.of(
+ new ArithmeticPostAggregator(
+ "post1",
+ "/",
+ ImmutableList.of(
+ new FieldAccessPostAggregator(
+ "metric1",
+ "metric1"
+ ),
+ new FieldAccessPostAggregator(
+ "metric2",
+ "metric2"
+ )
+ )
+ )
+ ),
+ null
+ );
+
+ final TopNQuery query2 = new TopNQuery(
+ new TableDataSource("dummy"),
+ VirtualColumns.EMPTY,
+ new DefaultDimensionSpec("test", "test"),
+ new LegacyTopNMetricSpec("metric1"),
+ 3,
+ new
MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01T18:00:00/2015-01-02T18:00:00"))),
+ null,
+ Granularities.ALL,
+ ImmutableList.of(
+ new LongSumAggregatorFactory("metric1", "metric1"),
+ new LongSumAggregatorFactory("metric2", "metric2")
+ ),
+ ImmutableList.of(
+ new ArithmeticPostAggregator(
+ "post2",
+ "+",
+ ImmutableList.of(
+ new FieldAccessPostAggregator(
+ "metric1",
+ "metric1"
+ ),
+ new FieldAccessPostAggregator(
+ "metric2",
+ "metric2"
+ )
+ )
+ )
+ ),
+ null
+ );
+
+ final CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> strategy1
= new TopNQueryQueryToolChest(
+ null,
+ null
+ ).getCacheStrategy(query1);
+
+ final CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> strategy2
= new TopNQueryQueryToolChest(
+ null,
+ null
+ ).getCacheStrategy(query2);
+
+ //segment level cache key excludes postaggregates in topn
+ Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1),
strategy2.computeCacheKey(query2)));
+ Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1),
strategy1.computeResultLevelCacheKey(query1)));
+
Assert.assertFalse(Arrays.equals(strategy1.computeResultLevelCacheKey(query1),
+
strategy2.computeResultLevelCacheKey(query2)));
}
@Test
diff --git
a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index eaa08d6..1facccd 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -385,6 +385,10 @@ public class CachingClusteredClient implements
QuerySegmentWalker
break;
}
hasher.putString(p.getServer().getSegment().getId().toString(),
StandardCharsets.UTF_8);
+ // it is important to add the "query interval" as part ETag calculation
+ // to have result level cache work correctly for queries with different
+ // intervals covering the same set of segments
+ hasher.putString(p.rhs.getInterval().toString(),
StandardCharsets.UTF_8);
}
if (hasOnlyHistoricalSegments) {
diff --git
a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
index 3efd946..6a303b8 100644
---
a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
+++
b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
@@ -80,7 +80,7 @@ public class ResultLevelCachingQueryRunner<T> implements
QueryRunner<T>
{
if (useResultCache || populateResultCache) {
- final String cacheKeyStr =
StringUtils.fromUtf8(strategy.computeCacheKey(query));
+ final String cacheKeyStr =
StringUtils.fromUtf8(strategy.computeResultLevelCacheKey(query));
final byte[] cachedResultSet =
fetchResultsFromResultLevelCache(cacheKeyStr);
String existingResultSetId = extractEtagFromResults(cachedResultSet);
diff --git
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index f81d64f..ca3a309 100644
---
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -3129,7 +3129,56 @@ public class CachingClusteredClientTest
Map<String, Object> responseContext = new HashMap<>();
getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext);
- Assert.assertEquals("Z/eS4rQz5v477iq7Aashr6JPZa0=",
responseContext.get("ETag"));
+ Assert.assertEquals("MDs2yIUvYLVzaG6zmwTH1plqaYE=",
responseContext.get("ETag"));
+ }
+
+ @Test
+ public void testEtagforDifferentQueryInterval()
+ {
+ final Interval interval = Intervals.of("2016-01-01/2016-01-02");
+ final Interval queryInterval =
Intervals.of("2016-01-01T14:00:00/2016-01-02T14:00:00");
+ final Interval queryInterval2 =
Intervals.of("2016-01-01T18:00:00/2016-01-02T18:00:00");
+ final DataSegment dataSegment = new DataSegment(
+ "dataSource",
+ interval,
+ "ver",
+ ImmutableMap.of(
+ "type", "hdfs",
+ "path", "/tmp"
+ ),
+ ImmutableList.of("product"),
+ ImmutableList.of("visited_sum"),
+ NoneShardSpec.instance(),
+ 9,
+ 12334
+ );
+ final ServerSelector selector = new ServerSelector(
+ dataSegment,
+ new HighestPriorityTierSelectorStrategy(new
RandomServerSelectorStrategy())
+ );
+ selector.addServerAndUpdateSegment(new QueryableDruidServer(servers[0],
null), dataSegment);
+ timeline.add(interval, "ver", new SingleElementPartitionChunk<>(selector));
+
+ final TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(DATA_SOURCE)
+ .intervals(new
MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
+ .context(ImmutableMap.of("If-None-Match",
"aVJV29CJY93rszVW/QBy0arWZo0="))
+ .build();
+
+ final TimeBoundaryQuery query2 = Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(DATA_SOURCE)
+ .intervals(new
MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval2)))
+ .context(ImmutableMap.of("If-None-Match",
"aVJV29CJY93rszVW/QBy0arWZo0="))
+ .build();
+
+
+ final Map<String, Object> responseContext = new HashMap<>();
+
+ getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext);
+ final Object etag1 = responseContext.get("ETag");
+ getDefaultQueryRunner().run(QueryPlus.wrap(query2), responseContext);
+ final Object etag2 = responseContext.get("ETag");
+ Assert.assertNotEquals(etag1, etag2);
}
@SuppressWarnings("unchecked")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]