This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3525d40 Cache: Add maxEntrySize config, make groupBy cacheable by
default. (#5108)
3525d40 is described below
commit 3525d4059e230b4995b658282fd13f41e8125af5
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Aug 7 10:23:15 2018 -0700
Cache: Add maxEntrySize config, make groupBy cacheable by default. (#5108)
* Cache: Add maxEntrySize config.
The idea is this makes it more feasible to cache query types that
can potentially generate large result sets, like groupBy and select,
without fear of writing too much to the cache per query.
Includes a refactor of cache population code in CachingQueryRunner and
CachingClusteredClient, such that they now use the same CachePopulator
interface with two implementations: one for foreground and one for
background.
The main reason for splitting the foreground / background impls is
that the foreground impl can have a more effective implementation of
maxEntrySize. It can stop retaining subvalues for the cache early.
* Add CachePopulatorStats.
* Fix whitespace.
* Fix docs.
* Fix various tests.
* Add tests.
* Fix tests.
* Better tests
* Remove conflict markers.
* Fix licenses.
---
docs/content/configuration/broker.md | 3 +-
docs/content/configuration/historical.md | 3 +-
docs/content/configuration/index.md | 23 ++-
docs/content/configuration/realtime.md | 3 +-
docs/content/operations/metrics.md | 3 +
.../io/druid/indexing/kafka/KafkaIndexTask.java | 3 +-
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 2 +
.../java/io/druid/indexing/common/TaskToolbox.java | 9 ++
.../druid/indexing/common/TaskToolboxFactory.java | 9 +-
.../task/AppenderatorDriverRealtimeIndexTask.java | 3 +-
.../indexing/common/task/RealtimeIndexTask.java | 1 +
.../io/druid/indexing/common/TaskToolboxTest.java | 2 +
.../AppenderatorDriverRealtimeIndexTaskTest.java | 2 +
.../indexing/common/task/CompactionTaskTest.java | 1 +
.../druid/indexing/common/task/IndexTaskTest.java | 1 +
.../common/task/RealtimeIndexTaskTest.java | 2 +
.../common/task/SameIntervalMergeTaskTest.java | 1 +
.../firehose/IngestSegmentFirehoseFactoryTest.java | 1 +
.../IngestSegmentFirehoseFactoryTimelineTest.java | 1 +
.../overlord/SingleTaskBackgroundRunnerTest.java | 1 +
.../druid/indexing/overlord/TaskLifecycleTest.java | 2 +
.../indexing/worker/WorkerTaskManagerTest.java | 1 +
.../indexing/worker/WorkerTaskMonitorTest.java | 1 +
.../src/main/java/io/druid/client/CacheUtil.java | 22 ---
.../io/druid/client/CachingClusteredClient.java | 100 +++----------
.../java/io/druid/client/CachingQueryRunner.java | 66 +--------
.../client/cache/BackgroundCachePopulator.java | 141 ++++++++++++++++++
.../java/io/druid/client/cache/CacheConfig.java | 14 +-
.../java/io/druid/client/cache/CacheMonitor.java | 40 +++--
.../java/io/druid/client/cache/CachePopulator.java | 23 ++-
.../io/druid/client/cache/CachePopulatorStats.java | 97 ++++++++++++
.../client/cache/ForegroundCachePopulator.java | 127 ++++++++++++++++
.../java/io/druid/client/cache/HybridCache.java | 2 +-
.../src/main/java/io/druid/guice/CacheModule.java | 2 +
.../java/io/druid/guice/DruidProcessingModule.java | 19 ++-
.../io/druid/guice/RouterProcessingModule.java | 18 +--
.../realtime/appenderator/AppenderatorImpl.java | 7 +-
.../realtime/appenderator/Appenderators.java | 8 +-
.../DefaultRealtimeAppenderatorFactory.java | 12 +-
.../appenderator/SinkQuerySegmentWalker.java | 18 ++-
.../segment/realtime/plumber/FlushingPlumber.java | 3 +
.../realtime/plumber/FlushingPlumberSchool.java | 8 +-
.../segment/realtime/plumber/RealtimePlumber.java | 5 +-
.../realtime/plumber/RealtimePlumberSchool.java | 7 +-
.../druid/server/coordination/ServerManager.java | 10 +-
.../CachingClusteredClientFunctionalityTest.java | 17 ++-
.../druid/client/CachingClusteredClientTest.java | 27 +++-
.../io/druid/client/CachingQueryRunnerTest.java | 52 +++++--
.../io/druid/client/cache/CachePopulatorTest.java | 163 +++++++++++++++++++++
.../druid/segment/realtime/FireDepartmentTest.java | 2 +
.../realtime/appenderator/AppenderatorTester.java | 4 +-
.../plumber/RealtimePlumberSchoolTest.java | 2 +
.../server/coordination/ServerManagerTest.java | 5 +-
53 files changed, 826 insertions(+), 273 deletions(-)
diff --git a/docs/content/configuration/broker.md
b/docs/content/configuration/broker.md
index 2d442bf..3695396 100644
--- a/docs/content/configuration/broker.md
+++ b/docs/content/configuration/broker.md
@@ -113,8 +113,9 @@ You can optionally only configure caching to be enabled on
the broker by setting
|`druid.broker.cache.useResultLevelCache`|true, false|Enable result level
caching on the broker.|false|
|`druid.broker.cache.populateResultLevelCache`|true, false|Populate the result
level cache on the broker.|false|
|`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size of
query response that can be cached.|`Integer.MAX_VALUE`|
-|`druid.broker.cache.unCacheable`|All druid query types|All query types to not
cache.|`["groupBy", "select"]`|
+|`druid.broker.cache.unCacheable`|All druid query types|All query types to not
cache.|`["select"]`|
|`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with
more segments than this number will not attempt to fetch from cache at the
broker level, leaving potential caching fetches (and cache result merging) to
the historicals|`Integer.MAX_VALUE`|
+|`druid.broker.cache.maxEntrySize`|positive integer or -1|Maximum size of an
individual cache entry (processed results for one segment), in bytes, or -1 for
unlimited.|`1000000` (1MB)|
See [cache configuration](caching.html) for how to configure cache settings.
diff --git a/docs/content/configuration/historical.md
b/docs/content/configuration/historical.md
index 02dddb9..ae770ea 100644
--- a/docs/content/configuration/historical.md
+++ b/docs/content/configuration/historical.md
@@ -97,7 +97,8 @@ You can optionally only configure caching to be enabled on
the historical by set
|--------|---------------|-----------|-------|
|`druid.historical.cache.useCache`|true, false|Enable the cache on the
historical.|false|
|`druid.historical.cache.populateCache`|true, false|Populate the cache on the
historical.|false|
-|`druid.historical.cache.unCacheable`|All druid query types|All query types to
not cache.|["groupBy", "select"]|
+|`druid.historical.cache.unCacheable`|All druid query types|All query types to
not cache.|["select"]|
+|`druid.historical.cache.maxEntrySize`|positive integer or -1|Maximum size of
an individual cache entry (processed results for one segment), in bytes, or -1
for unlimited.|`1000000` (1MB)|
See [cache configuration](caching.html) for how to configure cache settings.
diff --git a/docs/content/configuration/index.md
b/docs/content/configuration/index.md
index d4ca2dc..3d9f262 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -353,16 +353,21 @@ You can enable caching of results at the broker,
historical, or realtime level u
|<code>druid.(broker|historical|realtime).cache.unCacheable</code>|All
druid query types|All query types to not cache.|["groupBy", "select"]|
|<code>druid.(broker|historical|realtime).cache.useCache</code>|true,
false|Whether to use cache for getting query results.|false|
|<code>druid.(broker|historical|realtime).cache.populateCache</code>|true,
false|Whether to populate cache.|false|
+|<code>druid.(broker|historical|realtime).cache.maxEntrySize</code>|positive
integer or -1|Maximum size of an individual cache entry (processed results for
one segment), in bytes, or -1 for unlimited.|`1000000` (1MB)|
#### Local Cache
+<div class="note caution">
+DEPRECATED: Use caffeine instead
+</div>
+
|Property|Description|Default|
|--------|-----------|-------|
|`druid.cache.sizeInBytes`|Maximum cache size in bytes. You must set this if
you enabled populateCache/useCache, or else cache size of zero wouldn't really
cache anything.|0|
|`druid.cache.initialSize`|Initial size of the hashtable backing the
cache.|500000|
|`druid.cache.logEvictionCount`|If non-zero, log cache eviction every
`logEvictionCount` items.|0|
-#### Memcache
+#### Memcached
|Property|Description|Default|
|--------|-----------|-------|
@@ -372,6 +377,22 @@ You can enable caching of results at the broker,
historical, or realtime level u
|`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached
object.|52428800 (50 MB)|
|`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
+#### Caffeine Cache
+
+A highly performant local cache implementation for Druid based on
[Caffeine](https://github.com/ben-manes/caffeine). Requires JRE 8u60 or higher
if using `COMMON_FJP`.
+
+Below are the configuration options known to this module:
+
+|`runtime.properties`|Description|Default|
+|--------------------|-----------|-------|
+|`druid.cache.type`| Set this to `caffeine`|`local`|
+|`druid.cache.sizeInBytes`|The maximum size of the cache in bytes on
heap.|None (unlimited)|
+|`druid.cache.expireAfter`|The time (in ms) since its last access after which a
cache entry may expire|None (no time limit)|
+|`druid.cache.cacheExecutorFactory`|The executor factory to use for Caffeine
maintenance. One of `COMMON_FJP`, `SINGLE_THREAD`, or
`SAME_THREAD`|ForkJoinPool common pool (`COMMON_FJP`)|
+|`druid.cache.evictOnClose`|Whether closing a namespace (e.g., removing a segment
from a node) should eagerly evict the associated cache values|`false`|
+
+See the [Caching documentation](caching.html) for more detail.
+
### Indexing Service Discovery
This config is used to find the [Indexing
Service](../design/indexing-service.html) using Curator service discovery. Only
required if you are actually running an indexing service.
diff --git a/docs/content/configuration/realtime.md
b/docs/content/configuration/realtime.md
index 80fe955..b75fca5 100644
--- a/docs/content/configuration/realtime.md
+++ b/docs/content/configuration/realtime.md
@@ -72,6 +72,7 @@ You can optionally configure caching to be enabled on the
realtime node by setti
|--------|---------------|-----------|-------|
|`druid.realtime.cache.useCache`|true, false|Enable the cache on the
realtime.|false|
|`druid.realtime.cache.populateCache`|true, false|Populate the cache on the
realtime.|false|
-|`druid.realtime.cache.unCacheable`|All druid query types|All query types to
not cache.|`["groupBy", "select"]`|
+|`druid.realtime.cache.unCacheable`|All druid query types|All query types to
not cache.|`["select"]`|
+|`druid.realtime.cache.maxEntrySize`|positive integer or -1|Maximum size of an
individual cache entry (processed results for one segment), in bytes, or -1 for
unlimited.|`1000000` (1MB)|
See [cache configuration](caching.html) for how to configure cache settings.
diff --git a/docs/content/operations/metrics.md
b/docs/content/operations/metrics.md
index d2c802d..6dc3e0f 100644
--- a/docs/content/operations/metrics.md
+++ b/docs/content/operations/metrics.md
@@ -88,6 +88,9 @@ Available Metrics
|`*/averageByte`|Average cache entry byte size.||Varies.|
|`*/timeouts`|Number of cache timeouts.||0|
|`*/errors`|Number of cache errors.||0|
+|`*/put/ok`|Number of new cache entries successfully cached.||Varies, but more
than zero.|
+|`*/put/error`|Number of new cache entries that could not be cached due to
errors.||0|
+|`*/put/oversized`|Number of potential new cache entries that were skipped due
to being too large (based on
`druid.{broker,historical,realtime}.cache.maxEntrySize` properties).||Varies.|
#### Memcached only metrics
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index a9d5c98..8cb6323 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -247,7 +247,8 @@ public class KafkaIndexTask extends AbstractTask implements
ChatHandler
toolbox.getEmitter(),
toolbox.getQueryExecutorService(),
toolbox.getCache(),
- toolbox.getCacheConfig()
+ toolbox.getCacheConfig(),
+ toolbox.getCachePopulatorStats()
);
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 1e30a95..1929bb4 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.MapCache;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.FloatDimensionSchema;
@@ -2147,6 +2148,7 @@ public class KafkaIndexTaskTest
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig(),
+ new CachePopulatorStats(),
testUtils.getTestIndexMergerV9(),
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
index d63c2a1..e31969e 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.inject.Provider;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.metrics.MonitorScheduler;
import io.druid.client.cache.Cache;
@@ -89,6 +90,7 @@ public class TaskToolbox
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
+ private final CachePopulatorStats cachePopulatorStats;
private final IndexMergerV9 indexMergerV9;
private final TaskReportFileWriter taskReportFileWriter;
@@ -117,6 +119,7 @@ public class TaskToolbox
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
+ CachePopulatorStats cachePopulatorStats,
IndexMergerV9 indexMergerV9,
DruidNodeAnnouncer druidNodeAnnouncer,
DruidNode druidNode,
@@ -144,6 +147,7 @@ public class TaskToolbox
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
+ this.cachePopulatorStats = cachePopulatorStats;
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null
IndexMergerV9");
this.druidNodeAnnouncer = druidNodeAnnouncer;
this.druidNode = druidNode;
@@ -268,6 +272,11 @@ public class TaskToolbox
return cacheConfig;
}
+ public CachePopulatorStats getCachePopulatorStats()
+ {
+ return cachePopulatorStats;
+ }
+
public IndexMergerV9 getIndexMergerV9()
{
return indexMergerV9;
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
index 95f1b32..f858761 100644
---
a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
+++
b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
@@ -23,10 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Provider;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-import io.druid.java.util.metrics.MonitorScheduler;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.LookupNodeService;
@@ -35,6 +34,8 @@ import io.druid.guice.annotations.RemoteChatHandler;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.java.util.metrics.MonitorScheduler;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
@@ -73,6 +74,7 @@ public class TaskToolboxFactory
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
+ private final CachePopulatorStats cachePopulatorStats;
private final IndexMergerV9 indexMergerV9;
private final DruidNodeAnnouncer druidNodeAnnouncer;
private final DruidNode druidNode;
@@ -100,6 +102,7 @@ public class TaskToolboxFactory
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
+ CachePopulatorStats cachePopulatorStats,
IndexMergerV9 indexMergerV9,
DruidNodeAnnouncer druidNodeAnnouncer,
@RemoteChatHandler DruidNode druidNode,
@@ -126,6 +129,7 @@ public class TaskToolboxFactory
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
+ this.cachePopulatorStats = cachePopulatorStats;
this.indexMergerV9 = indexMergerV9;
this.druidNodeAnnouncer = druidNodeAnnouncer;
this.druidNode = druidNode;
@@ -157,6 +161,7 @@ public class TaskToolboxFactory
indexIO,
cache,
cacheConfig,
+ cachePopulatorStats,
indexMergerV9,
druidNodeAnnouncer,
druidNode,
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 6c367fd..c1ee50b 100644
---
a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++
b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -706,7 +706,8 @@ public class AppenderatorDriverRealtimeIndexTask extends
AbstractTask implements
toolbox.getEmitter(),
toolbox.getQueryExecutorService(),
toolbox.getCache(),
- toolbox.getCacheConfig()
+ toolbox.getCacheConfig(),
+ toolbox.getCachePopulatorStats()
);
}
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 37a7ccf..a496182 100644
---
a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++
b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -341,6 +341,7 @@ public class RealtimeIndexTask extends AbstractTask
toolbox.getIndexIO(),
toolbox.getCache(),
toolbox.getCacheConfig(),
+ toolbox.getCachePopulatorStats(),
toolbox.getObjectMapper()
);
diff --git
a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java
b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java
index d557e8c..abd088f 100644
---
a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java
+++
b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.NoopTestTaskFileWriter;
@@ -111,6 +112,7 @@ public class TaskToolboxTest
mockIndexIO,
mockCache,
mockCacheConfig,
+ new CachePopulatorStats(),
mockIndexMergerV9,
null,
null,
diff --git
a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 9f4073b..9166265 100644
---
a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++
b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.MapCache;
import io.druid.common.config.NullHandling;
import io.druid.data.input.Firehose;
@@ -1500,6 +1501,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig(),
+ new CachePopulatorStats(),
testUtils.getTestIndexMergerV9(),
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
diff --git
a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
index 99adee0..2288393 100644
---
a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
+++
b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
@@ -586,6 +586,7 @@ public class CompactionTaskTest
indexIO,
null,
null,
+ null,
new IndexMergerV9(objectMapper, indexIO,
OffHeapMemorySegmentWriteOutMediumFactory.instance()),
null,
null,
diff --git
a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
index c05d2dc..b15261c 100644
---
a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
+++
b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
@@ -1514,6 +1514,7 @@ public class IndexTaskTest
indexIO,
null,
null,
+ null,
indexMergerV9,
null,
null,
diff --git
a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
index d3edfef..36fe3a3 100644
---
a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++
b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.MapCache;
import io.druid.common.config.NullHandling;
import io.druid.data.input.Firehose;
@@ -1080,6 +1081,7 @@ public class RealtimeIndexTaskTest
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig(),
+ new CachePopulatorStats(),
testUtils.getTestIndexMergerV9(),
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
diff --git
a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
index 2e816f5..900f48e 100644
---
a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
+++
b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
@@ -252,6 +252,7 @@ public class SameIntervalMergeTaskTest
indexIO,
null,
null,
+ null,
EasyMock.createMock(IndexMergerV9.class),
null,
null,
diff --git
a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index f2085ea..60ef693 100644
---
a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++
b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -316,6 +316,7 @@ public class IngestSegmentFirehoseFactoryTest
INDEX_IO,
null,
null,
+ null,
INDEX_MERGER_V9,
null,
null,
diff --git
a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
index c8149e3..469d4a8 100644
---
a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
+++
b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
@@ -340,6 +340,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
INDEX_IO,
null,
null,
+ null,
INDEX_MERGER_V9,
null,
null,
diff --git
a/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
b/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 9a95b87..a5cd125 100644
---
a/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++
b/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -99,6 +99,7 @@ public class SingleTaskBackgroundRunnerTest
utils.getTestIndexIO(),
null,
null,
+ null,
utils.getTestIndexMergerV9(),
null,
node,
diff --git
a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
index 615fe39..dc41b86 100644
---
a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
+++
b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
@@ -34,6 +34,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.MapCache;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
@@ -616,6 +617,7 @@ public class TaskLifecycleTest
INDEX_IO,
MapCache.create(0),
FireDepartmentTest.NO_CACHE_CONFIG,
+ new CachePopulatorStats(),
INDEX_MERGER_V9,
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
diff --git
a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java
b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java
index cd7e85e..ff528ea 100644
---
a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java
+++
b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -125,6 +125,7 @@ public class WorkerTaskManagerTest
indexIO,
null,
null,
+ null,
indexMergerV9,
null,
null,
diff --git
a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
index 2b5374b..1af92ad 100644
---
a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
+++
b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -187,6 +187,7 @@ public class WorkerTaskMonitorTest
indexIO,
null,
null,
+ null,
indexMergerV9,
null,
null,
diff --git a/server/src/main/java/io/druid/client/CacheUtil.java
b/server/src/main/java/io/druid/client/CacheUtil.java
index 78ba5e8..622ec0c 100644
--- a/server/src/main/java/io/druid/client/CacheUtil.java
+++ b/server/src/main/java/io/druid/client/CacheUtil.java
@@ -19,9 +19,6 @@
package io.druid.client;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Throwables;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.java.util.common.StringUtils;
@@ -31,8 +28,6 @@ import io.druid.query.QueryContexts;
import io.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.nio.ByteBuffer;
public class CacheUtil
@@ -57,23 +52,6 @@ public class CacheUtil
);
}
- public static void populate(Cache cache, ObjectMapper mapper, Cache.NamedKey
key, Iterable<Object> results)
- {
- try {
- ByteArrayOutputStream bytes = new ByteArrayOutputStream();
- try (JsonGenerator gen = mapper.getFactory().createGenerator(bytes)) {
- for (Object result : results) {
- gen.writeObject(result);
- }
- }
-
- cache.put(key, bytes.toByteArray());
- }
- catch (IOException e) {
- throw Throwables.propagate(e);
- }
- }
-
public static <T> boolean useCacheOnBrokers(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,
diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java
b/server/src/main/java/io/druid/client/CachingClusteredClient.java
index 50f6389..d227dd4 100644
--- a/server/src/main/java/io/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java
@@ -32,17 +32,12 @@ import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
-import io.druid.guice.annotations.BackgroundCaching;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
@@ -88,8 +83,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@@ -102,8 +95,8 @@ public class CachingClusteredClient implements
QuerySegmentWalker
private final TimelineServerView serverView;
private final Cache cache;
private final ObjectMapper objectMapper;
+ private final CachePopulator cachePopulator;
private final CacheConfig cacheConfig;
- private final ListeningExecutorService backgroundExecutorService;
@Inject
public CachingClusteredClient(
@@ -111,7 +104,7 @@ public class CachingClusteredClient implements
QuerySegmentWalker
TimelineServerView serverView,
Cache cache,
@Smile ObjectMapper objectMapper,
- @BackgroundCaching ExecutorService backgroundExecutorService,
+ CachePopulator cachePopulator,
CacheConfig cacheConfig
)
{
@@ -119,13 +112,13 @@ public class CachingClusteredClient implements
QuerySegmentWalker
this.serverView = serverView;
this.cache = cache;
this.objectMapper = objectMapper;
+ this.cachePopulator = cachePopulator;
this.cacheConfig = cacheConfig;
- this.backgroundExecutorService =
MoreExecutors.listeningDecorator(backgroundExecutorService);
- if (cacheConfig.isQueryCacheable(Query.GROUP_BY)) {
+ if (cacheConfig.isQueryCacheable(Query.GROUP_BY) &&
(cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) {
log.warn(
- "Even though groupBy caching is enabled, v2 groupBys will not be
cached. "
- + "Consider disabling cache on your broker and enabling it on your
data nodes to enable v2 groupBy caching."
+ "Even though groupBy caching is enabled in your configuration, v2
groupBys will not be cached on the broker. "
+ + "Consider enabling caching on your data nodes if it is not already
enabled."
);
}
@@ -218,7 +211,7 @@ public class CachingClusteredClient implements
QuerySegmentWalker
private final boolean isBySegment;
private final int uncoveredIntervalsLimit;
private final Query<T> downstreamQuery;
- private final Map<String, CachePopulator> cachePopulatorMap =
Maps.newHashMap();
+ private final Map<String, Cache.NamedKey> cachePopulatorKeyMap =
Maps.newHashMap();
SpecificQueryRunnable(final QueryPlus<T> queryPlus, final Map<String,
Object> responseContext)
{
@@ -420,7 +413,7 @@ public class CachingClusteredClient implements
QuerySegmentWalker
} else if (populateCache) {
// otherwise, if populating cache, add segment to list of segments
to cache
final String segmentIdentifier =
segment.getServer().getSegment().getIdentifier();
- addCachePopulator(segmentCacheKey, segmentIdentifier,
segmentQueryInterval);
+ addCachePopulatorKey(segmentCacheKey, segmentIdentifier,
segmentQueryInterval);
}
});
return alreadyCachedResults;
@@ -453,22 +446,22 @@ public class CachingClusteredClient implements
QuerySegmentWalker
}
}
- private void addCachePopulator(
+ private void addCachePopulatorKey(
Cache.NamedKey segmentCacheKey,
String segmentIdentifier,
Interval segmentQueryInterval
)
{
- cachePopulatorMap.put(
+ cachePopulatorKeyMap.put(
StringUtils.format("%s_%s", segmentIdentifier, segmentQueryInterval),
- new CachePopulator(cache, objectMapper, segmentCacheKey)
+ segmentCacheKey
);
}
@Nullable
- private CachePopulator getCachePopulator(String segmentId, Interval
segmentInterval)
+ private Cache.NamedKey getCachePopulatorKey(String segmentId, Interval
segmentInterval)
{
- return cachePopulatorMap.get(StringUtils.format("%s_%s", segmentId,
segmentInterval));
+ return cachePopulatorKeyMap.get(StringUtils.format("%s_%s", segmentId,
segmentInterval));
}
private SortedMap<DruidServer, List<SegmentDescriptor>>
groupSegmentsByServer(Set<ServerToSegment> segments)
@@ -601,27 +594,19 @@ public class CachingClusteredClient implements
QuerySegmentWalker
responseContext
);
final Function<T, Object> cacheFn =
strategy.prepareForSegmentLevelCache();
+
return resultsBySegments
.map(result -> {
final BySegmentResultValueClass<T> resultsOfSegment =
result.getValue();
- final CachePopulator cachePopulator =
- getCachePopulator(resultsOfSegment.getSegmentId(),
resultsOfSegment.getInterval());
- Sequence<T> res = Sequences
- .simple(resultsOfSegment.getResults())
- .map(r -> {
- if (cachePopulator != null) {
- // only compute cache data if populating cache
-
cachePopulator.cacheFutures.add(backgroundExecutorService.submit(() ->
cacheFn.apply(r)));
- }
- return r;
- })
- .map(
- toolChest.makePreComputeManipulatorFn(downstreamQuery,
MetricManipulatorFns.deserializing())::apply
- );
- if (cachePopulator != null) {
- res = res.withEffect(cachePopulator::populate,
MoreExecutors.sameThreadExecutor());
+ final Cache.NamedKey cachePopulatorKey =
+ getCachePopulatorKey(resultsOfSegment.getSegmentId(),
resultsOfSegment.getInterval());
+ Sequence<T> res = Sequences.simple(resultsOfSegment.getResults());
+ if (cachePopulatorKey != null) {
+ res = cachePopulator.wrap(res, cacheFn::apply, cache,
cachePopulatorKey);
}
- return res;
+ return res.map(
+ toolChest.makePreComputeManipulatorFn(downstreamQuery,
MetricManipulatorFns.deserializing())::apply
+ );
})
.flatMerge(seq -> seq, query.getResultOrdering());
}
@@ -644,43 +629,4 @@ public class CachingClusteredClient implements
QuerySegmentWalker
return rhs;
}
}
-
- private class CachePopulator
- {
- private final Cache cache;
- private final ObjectMapper mapper;
- private final Cache.NamedKey key;
- private final ConcurrentLinkedQueue<ListenableFuture<Object>> cacheFutures
= new ConcurrentLinkedQueue<>();
-
- CachePopulator(Cache cache, ObjectMapper mapper, Cache.NamedKey key)
- {
- this.cache = cache;
- this.mapper = mapper;
- this.key = key;
- }
-
- public void populate()
- {
- Futures.addCallback(
- Futures.allAsList(cacheFutures),
- new FutureCallback<List<Object>>()
- {
- @Override
- public void onSuccess(List<Object> cacheData)
- {
- CacheUtil.populate(cache, mapper, key, cacheData);
- // Help out GC by making sure all references are gone
- cacheFutures.clear();
- }
-
- @Override
- public void onFailure(Throwable throwable)
- {
- log.error(throwable, "Background caching failed");
- }
- },
- backgroundExecutorService
- );
- }
- }
}
diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java
b/server/src/main/java/io/druid/client/CachingQueryRunner.java
index df9e885..422ea65 100644
--- a/server/src/main/java/io/druid/client/CachingQueryRunner.java
+++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java
@@ -23,18 +23,13 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
-import io.druid.java.util.common.logger.Logger;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
@@ -46,20 +41,19 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
public class CachingQueryRunner<T> implements QueryRunner<T>
{
- private static final Logger log = new Logger(CachingQueryRunner.class);
private final String segmentIdentifier;
private final SegmentDescriptor segmentDescriptor;
private final QueryRunner<T> base;
private final QueryToolChest toolChest;
private final Cache cache;
private final ObjectMapper mapper;
+ private final CachePopulator cachePopulator;
private final CacheConfig cacheConfig;
- private final ListeningExecutorService backgroundExecutorService;
public CachingQueryRunner(
String segmentIdentifier,
@@ -68,7 +62,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
Cache cache,
QueryToolChest toolchest,
QueryRunner<T> base,
- ExecutorService backgroundExecutorService,
+ CachePopulator cachePopulator,
CacheConfig cacheConfig
)
{
@@ -78,7 +72,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
this.toolChest = toolchest;
this.cache = cache;
this.mapper = mapper;
- this.backgroundExecutorService =
MoreExecutors.listeningDecorator(backgroundExecutorService);
+ this.cachePopulator = cachePopulator;
this.cacheConfig = cacheConfig;
}
@@ -140,56 +134,10 @@ public class CachingQueryRunner<T> implements
QueryRunner<T>
}
}
- final Collection<ListenableFuture<?>> cacheFutures =
Collections.synchronizedList(Lists.newLinkedList());
+ final Collection<ListenableFuture<?>> cacheFutures =
Collections.synchronizedList(new LinkedList<>());
if (populateCache) {
final Function cacheFn = strategy.prepareForSegmentLevelCache();
-
- return Sequences.withEffect(
- Sequences.map(
- base.run(queryPlus, responseContext),
- new Function<T, T>()
- {
- @Override
- public T apply(final T input)
- {
- final SettableFuture<Object> future =
SettableFuture.create();
- cacheFutures.add(future);
- backgroundExecutorService.submit(
- new Runnable()
- {
- @Override
- public void run()
- {
- try {
- future.set(cacheFn.apply(input));
- }
- catch (Exception e) {
- // if there is exception, should setException to
quit the caching processing
- future.setException(e);
- }
- }
- }
- );
- return input;
- }
- }
- ),
- new Runnable()
- {
- @Override
- public void run()
- {
- try {
- CacheUtil.populate(cache, mapper, key,
Futures.allAsList(cacheFutures).get());
- }
- catch (Exception e) {
- log.error(e, "Error while getting future for cache task");
- throw Throwables.propagate(e);
- }
- }
- },
- backgroundExecutorService
- );
+ return cachePopulator.wrap(base.run(queryPlus, responseContext), value
-> cacheFn.apply(value), cache, key);
} else {
return base.run(queryPlus, responseContext);
}
diff --git
a/server/src/main/java/io/druid/client/cache/BackgroundCachePopulator.java
b/server/src/main/java/io/druid/client/cache/BackgroundCachePopulator.java
new file mode 100644
index 0000000..d20fcf4
--- /dev/null
+++ b/server/src/main/java/io/druid/client/cache/BackgroundCachePopulator.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.java.util.common.guava.Sequence;
+import io.druid.java.util.common.guava.Sequences;
+import io.druid.java.util.common.logger.Logger;
+
+import java.io.ByteArrayOutputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+
+/**
+ * {@link CachePopulator} that computes cacheable values on a background executor instead of on the thread that is
+ * consuming the query results.
+ *
+ * For every element that passes through the wrapped sequence, a task applying the cache function is submitted to
+ * the executor. When the sequence's effect fires, a callback is registered that serializes all computed values into
+ * a single cache entry once every task has finished.
+ *
+ * Because the values are only serialized after all of them have been computed, the {@code maxEntrySize} limit is
+ * enforced at write time: oversized entries are dropped, but their values have already been held in memory.
+ */
+public class BackgroundCachePopulator implements CachePopulator
+{
+  private static final Logger log = new Logger(BackgroundCachePopulator.class);
+
+  private final ListeningExecutorService exec;
+  private final ObjectMapper objectMapper;
+  private final CachePopulatorStats cachePopulatorStats;
+  private final long maxEntrySize;
+
+  /**
+   * @param exec                executor that runs the cache function and the final cache write
+   * @param objectMapper        mapper whose factory is used to serialize cache values; must not be null
+   * @param cachePopulatorStats counters updated with the outcome of each population attempt; must not be null
+   * @param maxEntrySize        maximum serialized entry size in bytes; values less than or equal to zero disable
+   *                            the limit
+   */
+  public BackgroundCachePopulator(
+      final ExecutorService exec,
+      final ObjectMapper objectMapper,
+      final CachePopulatorStats cachePopulatorStats,
+      final long maxEntrySize
+  )
+  {
+    this.exec = MoreExecutors.listeningDecorator(exec);
+    this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
+    this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
+    this.maxEntrySize = maxEntrySize;
+  }
+
+  /**
+   * Returns a sequence yielding the same elements as {@code sequence}, with cache population attached.
+   *
+   * @param sequence results to pass through unchanged
+   * @param cacheFn  converts a result into the value that is serialized into the cache entry
+   * @param cache    cache to write to
+   * @param cacheKey key under which the entry is stored
+   */
+  @Override
+  public <T, CacheType> Sequence<T> wrap(
+      final Sequence<T> sequence,
+      final Function<T, CacheType> cacheFn,
+      final Cache cache,
+      final Cache.NamedKey cacheKey
+  )
+  {
+    final List<ListenableFuture<CacheType>> cacheFutures = new LinkedList<>();
+
+    // Pass every element through untouched; the cacheable form is computed off-thread.
+    final Sequence<T> wrappedSequence = Sequences.map(
+        sequence,
+        input -> {
+          cacheFutures.add(exec.submit(() -> cacheFn.apply(input)));
+          return input;
+        }
+    );
+
+    return Sequences.withEffect(
+        wrappedSequence,
+        () -> {
+          Futures.addCallback(
+              Futures.allAsList(cacheFutures),
+              new FutureCallback<List<CacheType>>()
+              {
+                @Override
+                public void onSuccess(List<CacheType> results)
+                {
+                  populateCache(cache, cacheKey, results);
+                  // Help out GC by making sure all references are gone
+                  cacheFutures.clear();
+                }
+
+                @Override
+                public void onFailure(Throwable t)
+                {
+                  log.error(t, "Background caching failed");
+                  // A failed cacheFn means the entry was never written; count it like any other
+                  // population error so the failure is visible in the populator stats.
+                  cachePopulatorStats.incrementError();
+                }
+              },
+              exec
+          );
+        },
+        MoreExecutors.sameThreadExecutor()
+    );
+  }
+
+  /**
+   * Serializes {@code results} back to back and stores them under {@code cacheKey}, unless the serialized form
+   * exceeds {@code maxEntrySize}. Never throws: any failure is logged and counted as an error.
+   */
+  private <CacheType> void populateCache(
+      final Cache cache,
+      final Cache.NamedKey cacheKey,
+      final List<CacheType> results
+  )
+  {
+    try {
+      final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+
+      try (JsonGenerator gen = objectMapper.getFactory().createGenerator(bytes)) {
+        for (CacheType result : results) {
+          gen.writeObject(result);
+
+          // Early-out only: the generator is not flushed here, so bytes.size() may lag behind what has
+          // actually been written.
+          if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
+            cachePopulatorStats.incrementOversized();
+            return;
+          }
+        }
+      }
+
+      // The generator is closed at this point, so this check sees the complete serialized entry.
+      if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
+        cachePopulatorStats.incrementOversized();
+        return;
+      }
+
+      cache.put(cacheKey, bytes.toByteArray());
+      cachePopulatorStats.incrementOk();
+    }
+    catch (Exception e) {
+      log.warn(e, "Could not populate cache");
+      cachePopulatorStats.incrementError();
+    }
+  }
+}
diff --git a/server/src/main/java/io/druid/client/cache/CacheConfig.java
b/server/src/main/java/io/druid/client/cache/CacheConfig.java
index 4b9f020..7d8e35f 100644
--- a/server/src/main/java/io/druid/client/cache/CacheConfig.java
+++ b/server/src/main/java/io/druid/client/cache/CacheConfig.java
@@ -20,10 +20,10 @@
package io.druid.client.cache;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
import io.druid.query.Query;
import javax.validation.constraints.Min;
-import java.util.Arrays;
import java.util.List;
public class CacheConfig
@@ -52,10 +52,13 @@ public class CacheConfig
private int cacheBulkMergeLimit = Integer.MAX_VALUE;
@JsonProperty
- private int resultLevelCacheLimit = Integer.MAX_VALUE;
+ private int maxEntrySize = 1_000_000;
+
+ @JsonProperty
+ private List<String> unCacheable = ImmutableList.of(Query.SELECT);
@JsonProperty
- private List<String> unCacheable = Arrays.asList(Query.GROUP_BY,
Query.SELECT);
+ private int resultLevelCacheLimit = Integer.MAX_VALUE;
public boolean isPopulateCache()
{
@@ -87,6 +90,11 @@ public class CacheConfig
return cacheBulkMergeLimit;
}
+ public int getMaxEntrySize()
+ {
+ return maxEntrySize;
+ }
+
public int getResultLevelCacheLimit()
{
return resultLevelCacheLimit;
diff --git a/server/src/main/java/io/druid/client/cache/CacheMonitor.java
b/server/src/main/java/io/druid/client/cache/CacheMonitor.java
index 515555a..576f9db 100644
--- a/server/src/main/java/io/druid/client/cache/CacheMonitor.java
+++ b/server/src/main/java/io/druid/client/cache/CacheMonitor.java
@@ -20,27 +20,24 @@
package io.druid.client.cache;
import com.google.inject.Inject;
+import io.druid.java.util.common.StringUtils;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.emitter.service.ServiceMetricEvent;
import io.druid.java.util.metrics.AbstractMonitor;
-import io.druid.java.util.common.StringUtils;
public class CacheMonitor extends AbstractMonitor
{
// package private for tests
volatile Cache cache;
+ private final CachePopulatorStats cachePopulatorStats;
private volatile CacheStats prevCacheStats = null;
+ private volatile CachePopulatorStats.Snapshot prevCachePopulatorStats = null;
- public CacheMonitor()
+ @Inject
+ public CacheMonitor(final CachePopulatorStats cachePopulatorStats)
{
- }
-
- public CacheMonitor(
- Cache cache
- )
- {
- this.cache = cache;
+ this.cachePopulatorStats = cachePopulatorStats;
}
// make it possible to enable CacheMonitor even if cache is not bound
@@ -58,10 +55,16 @@ public class CacheMonitor extends AbstractMonitor
final CacheStats currCacheStats = cache.getStats();
final CacheStats deltaCacheStats = currCacheStats.delta(prevCacheStats);
+ final CachePopulatorStats.Snapshot currCachePopulatorStats =
cachePopulatorStats.snapshot();
+ final CachePopulatorStats.Snapshot deltaCachePopulatorStats =
currCachePopulatorStats.delta(
+ prevCachePopulatorStats
+ );
+
final ServiceMetricEvent.Builder builder = new
ServiceMetricEvent.Builder();
- emitStats(emitter, "query/cache/delta", deltaCacheStats, builder);
- emitStats(emitter, "query/cache/total", currCacheStats, builder);
+ emitStats(emitter, "query/cache/delta", deltaCachePopulatorStats,
deltaCacheStats, builder);
+ emitStats(emitter, "query/cache/total", currCachePopulatorStats,
currCacheStats, builder);
+ prevCachePopulatorStats = currCachePopulatorStats;
prevCacheStats = currCacheStats;
// Any custom cache statistics that need monitoring
@@ -71,13 +74,15 @@ public class CacheMonitor extends AbstractMonitor
}
private void emitStats(
- ServiceEmitter emitter,
+ final ServiceEmitter emitter,
final String metricPrefix,
- CacheStats cacheStats,
- ServiceMetricEvent.Builder builder
+ final CachePopulatorStats.Snapshot cachePopulatorStats,
+ final CacheStats cacheStats,
+ final ServiceMetricEvent.Builder builder
)
{
if (cache != null) {
+ // Cache stats.
emitter.emit(builder.build(StringUtils.format("%s/numEntries",
metricPrefix), cacheStats.getNumEntries()));
emitter.emit(builder.build(StringUtils.format("%s/sizeBytes",
metricPrefix), cacheStats.getSizeInBytes()));
emitter.emit(builder.build(StringUtils.format("%s/hits", metricPrefix),
cacheStats.getNumHits()));
@@ -87,6 +92,13 @@ public class CacheMonitor extends AbstractMonitor
emitter.emit(builder.build(StringUtils.format("%s/averageBytes",
metricPrefix), cacheStats.averageBytes()));
emitter.emit(builder.build(StringUtils.format("%s/timeouts",
metricPrefix), cacheStats.getNumTimeouts()));
emitter.emit(builder.build(StringUtils.format("%s/errors",
metricPrefix), cacheStats.getNumErrors()));
+
+ // Cache populator stats.
+ emitter.emit(builder.build(StringUtils.format("%s/put/ok",
metricPrefix), cachePopulatorStats.getNumOk()));
+ emitter.emit(builder.build(StringUtils.format("%s/put/error",
metricPrefix), cachePopulatorStats.getNumError()));
+ emitter.emit(
+ builder.build(StringUtils.format("%s/put/oversized", metricPrefix),
cachePopulatorStats.getNumOversized())
+ );
}
}
}
diff --git
a/processing/src/main/java/io/druid/guice/annotations/BackgroundCaching.java
b/server/src/main/java/io/druid/client/cache/CachePopulator.java
similarity index 66%
rename from
processing/src/main/java/io/druid/guice/annotations/BackgroundCaching.java
rename to server/src/main/java/io/druid/client/cache/CachePopulator.java
index abd7514..5c8fc88 100644
--- a/processing/src/main/java/io/druid/guice/annotations/BackgroundCaching.java
+++ b/server/src/main/java/io/druid/client/cache/CachePopulator.java
@@ -17,21 +17,18 @@
* under the License.
*/
-package io.druid.guice.annotations;
+package io.druid.client.cache;
-import com.google.inject.BindingAnnotation;
+import io.druid.java.util.common.guava.Sequence;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+import java.util.function.Function;
-/**
- *
- */
-@BindingAnnotation
-@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
-@Retention(RetentionPolicy.RUNTIME)
-public @interface BackgroundCaching
+public interface CachePopulator
{
+ <T, CacheType> Sequence<T> wrap(
+ Sequence<T> sequence,
+ Function<T, CacheType> cacheFn,
+ Cache cache,
+ Cache.NamedKey cacheKey
+ );
}
diff --git
a/server/src/main/java/io/druid/client/cache/CachePopulatorStats.java
b/server/src/main/java/io/druid/client/cache/CachePopulatorStats.java
new file mode 100644
index 0000000..f1bcd30
--- /dev/null
+++ b/server/src/main/java/io/druid/client/cache/CachePopulatorStats.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Running tallies of cache population outcomes: entries written successfully, attempts that failed with an error,
+ * and entries skipped for being oversized.
+ *
+ * Each tally is an {@link AtomicLong}, so increments may come from any thread. {@link #snapshot()} reads the three
+ * tallies one after another, so a snapshot taken while increments are in flight is not an atomic view across them.
+ */
+public class CachePopulatorStats
+{
+  private final AtomicLong numOk = new AtomicLong();
+  private final AtomicLong numError = new AtomicLong();
+  private final AtomicLong numOversized = new AtomicLong();
+
+  /** Records one entry that was written to the cache. */
+  public void incrementOk()
+  {
+    numOk.getAndIncrement();
+  }
+
+  /** Records one population attempt that failed. */
+  public void incrementError()
+  {
+    numError.getAndIncrement();
+  }
+
+  /** Records one entry that was not written because it was too large. */
+  public void incrementOversized()
+  {
+    numOversized.getAndIncrement();
+  }
+
+  /**
+   * @return an immutable copy of the current tallies
+   */
+  public Snapshot snapshot()
+  {
+    final long ok = numOk.get();
+    final long error = numError.get();
+    final long oversized = numOversized.get();
+    return new Snapshot(ok, error, oversized);
+  }
+
+  /**
+   * Immutable point-in-time copy of the tallies held by {@link CachePopulatorStats}.
+   */
+  public static class Snapshot
+  {
+    private final long numOk;
+    private final long numError;
+    private final long numOversized;
+
+    Snapshot(final long numOk, final long numError, final long numOversized)
+    {
+      this.numOk = numOk;
+      this.numError = numError;
+      this.numOversized = numOversized;
+    }
+
+    public long getNumOk()
+    {
+      return numOk;
+    }
+
+    public long getNumError()
+    {
+      return numError;
+    }
+
+    public long getNumOversized()
+    {
+      return numOversized;
+    }
+
+    /**
+     * Computes the per-tally difference between this snapshot and an earlier one.
+     *
+     * @param oldSnapshot the earlier snapshot, or null if there is none
+     * @return this same instance when {@code oldSnapshot} is null, otherwise a new snapshot holding
+     *         {@code this - oldSnapshot} for each tally
+     */
+    public Snapshot delta(Snapshot oldSnapshot)
+    {
+      if (oldSnapshot == null) {
+        return this;
+      }
+
+      final long okDelta = getNumOk() - oldSnapshot.getNumOk();
+      final long errorDelta = getNumError() - oldSnapshot.getNumError();
+      final long oversizedDelta = getNumOversized() - oldSnapshot.getNumOversized();
+      return new Snapshot(okDelta, errorDelta, oversizedDelta);
+    }
+  }
+}
diff --git
a/server/src/main/java/io/druid/client/cache/ForegroundCachePopulator.java
b/server/src/main/java/io/druid/client/cache/ForegroundCachePopulator.java
new file mode 100644
index 0000000..7e7fd5e
--- /dev/null
+++ b/server/src/main/java/io/druid/client/cache/ForegroundCachePopulator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import io.druid.java.util.common.guava.Sequence;
+import io.druid.java.util.common.guava.SequenceWrapper;
+import io.druid.java.util.common.guava.Sequences;
+import io.druid.java.util.common.logger.Logger;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/**
+ * {@link CachePopulator} that serializes cacheable values inline, inside the map function applied to each element
+ * of the wrapped sequence, so no executor is involved.
+ *
+ * Serialized bytes accumulate in an in-memory buffer and are written to the cache as one entry from the sequence
+ * wrapper's {@code after} hook. Since the buffer size is checked after each element, serialization stops as soon
+ * as the entry is found to exceed {@code maxEntrySize}.
+ */
+public class ForegroundCachePopulator implements CachePopulator
+{
+  private static final Logger log = new Logger(ForegroundCachePopulator.class);
+
+  private final ObjectMapper objectMapper;
+  private final CachePopulatorStats cachePopulatorStats;
+  private final long maxEntrySize;
+
+  /**
+   * @param objectMapper        mapper whose factory is used to serialize cache values; must not be null
+   * @param cachePopulatorStats counters updated with the outcome of each population attempt; must not be null
+   * @param maxEntrySize        maximum serialized entry size in bytes; values less than or equal to zero disable
+   *                            the limit
+   */
+  public ForegroundCachePopulator(
+      final ObjectMapper objectMapper,
+      final CachePopulatorStats cachePopulatorStats,
+      final long maxEntrySize
+  )
+  {
+    this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
+    this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
+    this.maxEntrySize = maxEntrySize;
+  }
+
+  /**
+   * Returns a sequence yielding the same elements as {@code sequence}, with cache population attached.
+   *
+   * @param sequence results to pass through unchanged
+   * @param cacheFn  converts a result into the value that is serialized into the cache entry
+   * @param cache    cache to write to
+   * @param cacheKey key under which the entry is stored
+   *
+   * @throws RuntimeException if the JSON generator cannot be created
+   */
+  @Override
+  public <T, CacheType> Sequence<T> wrap(
+      final Sequence<T> sequence,
+      final Function<T, CacheType> cacheFn,
+      final Cache cache,
+      final Cache.NamedKey cacheKey
+  )
+  {
+    // Guards the generator and buffer below, which belong to this call only. The lock is created per call so
+    // that unrelated sequences wrapped by the same populator instance never contend with each other.
+    final Object lock = new Object();
+    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    final AtomicBoolean tooBig = new AtomicBoolean(false);
+    final JsonGenerator jsonGenerator;
+
+    try {
+      jsonGenerator = objectMapper.getFactory().createGenerator(bytes);
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return Sequences.wrap(
+        Sequences.map(
+            sequence,
+            input -> {
+              // Once the entry is known to be too big, stop serializing further elements.
+              if (!tooBig.get()) {
+                synchronized (lock) {
+                  try {
+                    jsonGenerator.writeObject(cacheFn.apply(input));
+
+                    // Not flushing jsonGenerator before checking this, but should be ok since Jackson buffers are
+                    // typically just a few KB, and we don't want to waste cycles flushing.
+                    if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
+                      tooBig.set(true);
+                    }
+                  }
+                  catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                }
+              }
+
+              return input;
+            }
+        ),
+        new SequenceWrapper()
+        {
+          @Override
+          public void after(final boolean isDone, final Throwable thrown) throws Exception
+          {
+            synchronized (lock) {
+              // Always close the generator, whether or not the entry ends up being cached.
+              jsonGenerator.close();
+
+              // Nothing is written to the cache unless isDone is true.
+              if (isDone) {
+                // Check tooBig, then check maxEntrySize one more time, after closing/flushing jsonGenerator.
+                if (tooBig.get() || (maxEntrySize > 0 && bytes.size() > maxEntrySize)) {
+                  cachePopulatorStats.incrementOversized();
+                  return;
+                }
+
+                try {
+                  cache.put(cacheKey, bytes.toByteArray());
+                  cachePopulatorStats.incrementOk();
+                }
+                catch (Exception e) {
+                  log.warn(e, "Unable to write to cache");
+                  cachePopulatorStats.incrementError();
+                }
+              }
+            }
+          }
+        }
+    );
+  }
+}
diff --git a/server/src/main/java/io/druid/client/cache/HybridCache.java
b/server/src/main/java/io/druid/client/cache/HybridCache.java
index f732bee..c25beff 100644
--- a/server/src/main/java/io/druid/client/cache/HybridCache.java
+++ b/server/src/main/java/io/druid/client/cache/HybridCache.java
@@ -21,8 +21,8 @@ package io.druid.client.cache;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
import javax.annotation.Nullable;
import java.util.Collections;
diff --git a/server/src/main/java/io/druid/guice/CacheModule.java
b/server/src/main/java/io/druid/guice/CacheModule.java
index 19d6e57..488e52c 100644
--- a/server/src/main/java/io/druid/guice/CacheModule.java
+++ b/server/src/main/java/io/druid/guice/CacheModule.java
@@ -24,6 +24,7 @@ import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.druid.client.cache.Cache;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.CacheProvider;
import io.druid.guice.annotations.Global;
@@ -48,6 +49,7 @@ public class CacheModule implements Module
public void configure(Binder binder)
{
binder.bind(Cache.class).toProvider(Key.get(CacheProvider.class,
Global.class)).in(ManageLifecycle.class);
+ binder.bind(CachePopulatorStats.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, prefix, CacheProvider.class, Global.class);
binder.install(new HybridCacheModule(prefix));
diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java
b/server/src/main/java/io/druid/guice/DruidProcessingModule.java
index c0c3287..ff9859d 100644
--- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java
@@ -19,22 +19,26 @@
package io.druid.guice;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
+import io.druid.client.cache.BackgroundCachePopulator;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
+import io.druid.client.cache.CachePopulatorStats;
+import io.druid.client.cache.ForegroundCachePopulator;
import io.druid.collections.BlockingPool;
import io.druid.collections.DefaultBlockingPool;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.StupidPool;
import io.druid.common.utils.VMUtils;
-import io.druid.guice.annotations.BackgroundCaching;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Merging;
import io.druid.guice.annotations.Processing;
+import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.ExecutorServiceConfig;
import io.druid.java.util.common.lifecycle.Lifecycle;
@@ -64,14 +68,15 @@ public class DruidProcessingModule implements Module
}
@Provides
- @BackgroundCaching
@LazySingleton
- public ExecutorService getBackgroundExecutorService(
+ public CachePopulator getCachePopulator(
+ @Smile ObjectMapper smileMapper,
+ CachePopulatorStats cachePopulatorStats,
CacheConfig cacheConfig
)
{
if (cacheConfig.getNumBackgroundThreads() > 0) {
- return Executors.newFixedThreadPool(
+ final ExecutorService exec = Executors.newFixedThreadPool(
cacheConfig.getNumBackgroundThreads(),
new ThreadFactoryBuilder()
.setNameFormat("background-cacher-%d")
@@ -79,8 +84,10 @@ public class DruidProcessingModule implements Module
.setPriority(Thread.MIN_PRIORITY)
.build()
);
+
+ return new BackgroundCachePopulator(exec, smileMapper,
cachePopulatorStats, cacheConfig.getMaxEntrySize());
} else {
- return MoreExecutors.sameThreadExecutor();
+ return new ForegroundCachePopulator(smileMapper, cachePopulatorStats,
cacheConfig.getMaxEntrySize());
}
}
diff --git a/server/src/main/java/io/druid/guice/RouterProcessingModule.java
b/server/src/main/java/io/druid/guice/RouterProcessingModule.java
index 17d1326..4b2bbc1 100644
--- a/server/src/main/java/io/druid/guice/RouterProcessingModule.java
+++ b/server/src/main/java/io/druid/guice/RouterProcessingModule.java
@@ -22,16 +22,14 @@ package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
-import io.druid.client.cache.CacheConfig;
import io.druid.collections.BlockingPool;
import io.druid.collections.DummyBlockingPool;
import io.druid.collections.DummyNonBlockingPool;
import io.druid.collections.NonBlockingPool;
-import io.druid.java.util.common.concurrent.Execs;
-import io.druid.guice.annotations.BackgroundCaching;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Merging;
import io.druid.guice.annotations.Processing;
+import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.concurrent.ExecutorServiceConfig;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.DruidProcessingConfig;
@@ -59,20 +57,6 @@ public class RouterProcessingModule implements Module
}
@Provides
- @BackgroundCaching
- @LazySingleton
- public ExecutorService getBackgroundExecutorService(CacheConfig cacheConfig)
- {
- if (cacheConfig.getNumBackgroundThreads() > 0) {
- log.error(
- "numBackgroundThreads[%d] configured, that is ignored on Router",
- cacheConfig.getNumBackgroundThreads()
- );
- }
- return Execs.dummy();
- }
-
- @Provides
@Processing
@ManageLifecycle
public ExecutorService getProcessingExecutorService(DruidProcessingConfig
config)
diff --git
a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index c0169e7..5b79626 100644
---
a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++
b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -40,6 +40,7 @@ import
com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
@@ -166,7 +167,8 @@ public class AppenderatorImpl implements Appenderator
IndexIO indexIO,
IndexMerger indexMerger,
Cache cache,
- CacheConfig cacheConfig
+ CacheConfig cacheConfig,
+ CachePopulatorStats cachePopulatorStats
)
{
this.schema = Preconditions.checkNotNull(schema, "schema");
@@ -186,7 +188,8 @@ public class AppenderatorImpl implements Appenderator
conglomerate,
queryExecutorService,
Preconditions.checkNotNull(cache, "cache"),
- cacheConfig
+ cacheConfig,
+ cachePopulatorStats
);
maxBytesTuningConfig =
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory());
log.info("Created Appenderator for dataSource[%s].",
schema.getDataSource());
diff --git
a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java
b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java
index 7651c40..1e22cdd 100644
---
a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java
+++
b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java
@@ -22,6 +22,7 @@ package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.QueryRunnerFactoryConglomerate;
@@ -52,7 +53,8 @@ public class Appenderators
ServiceEmitter emitter,
ExecutorService queryExecutorService,
Cache cache,
- CacheConfig cacheConfig
+ CacheConfig cacheConfig,
+ CachePopulatorStats cachePopulatorStats
)
{
return new AppenderatorImpl(
@@ -68,7 +70,8 @@ public class Appenderators
indexIO,
indexMerger,
cache,
- cacheConfig
+ cacheConfig,
+ cachePopulatorStats
);
}
@@ -120,6 +123,7 @@ public class Appenderators
indexIO,
indexMerger,
null,
+ null,
null
);
}
diff --git
a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
index b6b188e..7d027d6 100644
---
a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
+++
b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
@@ -21,11 +21,11 @@ package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.guice.annotations.Processing;
+import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
@@ -51,6 +51,7 @@ public class DefaultRealtimeAppenderatorFactory implements
AppenderatorFactory
private final IndexMerger indexMerger;
private final Cache cache;
private final CacheConfig cacheConfig;
+ private final CachePopulatorStats cachePopulatorStats;
public DefaultRealtimeAppenderatorFactory(
@JacksonInject ServiceEmitter emitter,
@@ -62,7 +63,8 @@ public class DefaultRealtimeAppenderatorFactory implements
AppenderatorFactory
@JacksonInject IndexIO indexIO,
@JacksonInject IndexMerger indexMerger,
@JacksonInject Cache cache,
- @JacksonInject CacheConfig cacheConfig
+ @JacksonInject CacheConfig cacheConfig,
+ @JacksonInject CachePopulatorStats cachePopulatorStats
)
{
this.emitter = emitter;
@@ -75,6 +77,7 @@ public class DefaultRealtimeAppenderatorFactory implements
AppenderatorFactory
this.indexMerger = indexMerger;
this.cache = cache;
this.cacheConfig = cacheConfig;
+ this.cachePopulatorStats = cachePopulatorStats;
}
@Override
@@ -103,7 +106,8 @@ public class DefaultRealtimeAppenderatorFactory implements
AppenderatorFactory
emitter,
queryExecutorService,
cache,
- cacheConfig
+ cacheConfig,
+ cachePopulatorStats
);
}
diff --git
a/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 6901ace..ef18be2 100644
---
a/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++
b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -24,15 +24,17 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
+import io.druid.client.cache.ForegroundCachePopulator;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.FunctionalIterable;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.BySegmentQueryRunner;
import io.druid.query.CPUTimeMetricQueryRunner;
import io.druid.query.MetricsEmittingQueryRunner;
@@ -76,6 +78,7 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
private final ExecutorService queryExecutorService;
private final Cache cache;
private final CacheConfig cacheConfig;
+ private final CachePopulatorStats cachePopulatorStats;
public SinkQuerySegmentWalker(
String dataSource,
@@ -85,7 +88,8 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
QueryRunnerFactoryConglomerate conglomerate,
ExecutorService queryExecutorService,
Cache cache,
- CacheConfig cacheConfig
+ CacheConfig cacheConfig,
+ CachePopulatorStats cachePopulatorStats
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
@@ -96,6 +100,7 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
this.queryExecutorService =
Preconditions.checkNotNull(queryExecutorService, "queryExecutorService");
this.cache = Preconditions.checkNotNull(cache, "cache");
this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
+ this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats,
"cachePopulatorStats");
if (!cache.isLocal()) {
log.warn("Configured cache[%s] is not local, caching will not be
enabled.", cache.getClass().getName());
@@ -235,7 +240,12 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
cache,
toolChest,
baseRunner,
-
MoreExecutors.sameThreadExecutor(),
+ // Always populate in foreground regardless of config
+ new
ForegroundCachePopulator(
+ objectMapper,
+
cachePopulatorStats,
+
cacheConfig.getMaxEntrySize()
+ ),
cacheConfig
);
} else {
diff --git
a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java
b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java
index aefbdf0..2bdf3a3 100644
---
a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java
+++
b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.StringUtils;
@@ -73,6 +74,7 @@ public class FlushingPlumber extends RealtimePlumber
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
+ CachePopulatorStats cachePopulatorStats,
ObjectMapper objectMapper
)
@@ -92,6 +94,7 @@ public class FlushingPlumber extends RealtimePlumber
indexIO,
cache,
cacheConfig,
+ cachePopulatorStats,
objectMapper
);
diff --git
a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
index ea5e85b..244b2f0 100644
---
a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
+++
b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
@@ -24,10 +24,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
-import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.guice.annotations.Processing;
+import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
@@ -57,6 +58,7 @@ public class FlushingPlumberSchool extends
RealtimePlumberSchool
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
+ private final CachePopulatorStats cachePopulatorStats;
private final ObjectMapper objectMapper;
@JsonCreator
@@ -70,6 +72,7 @@ public class FlushingPlumberSchool extends
RealtimePlumberSchool
@JacksonInject IndexIO indexIO,
@JacksonInject Cache cache,
@JacksonInject CacheConfig cacheConfig,
+ @JacksonInject CachePopulatorStats cachePopulatorStats,
@JacksonInject ObjectMapper objectMapper
)
{
@@ -85,6 +88,7 @@ public class FlushingPlumberSchool extends
RealtimePlumberSchool
indexIO,
cache,
cacheConfig,
+ cachePopulatorStats,
objectMapper
);
@@ -97,6 +101,7 @@ public class FlushingPlumberSchool extends
RealtimePlumberSchool
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
+ this.cachePopulatorStats = cachePopulatorStats;
this.objectMapper = objectMapper;
}
@@ -122,6 +127,7 @@ public class FlushingPlumberSchool extends
RealtimePlumberSchool
indexIO,
cache,
cacheConfig,
+ cachePopulatorStats,
objectMapper
);
}
diff --git
a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
index 1e7a5c8..bfef2c6 100644
---
a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
+++
b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -32,6 +32,7 @@ import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.common.utils.VMUtils;
@@ -146,6 +147,7 @@ public class RealtimePlumber implements Plumber
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
+ CachePopulatorStats cachePopulatorStats,
ObjectMapper objectMapper
)
{
@@ -168,7 +170,8 @@ public class RealtimePlumber implements Plumber
conglomerate,
queryExecutorService,
cache,
- cacheConfig
+ cacheConfig,
+ cachePopulatorStats
);
log.info("Creating plumber using rejectionPolicy[%s]",
getRejectionPolicy());
diff --git
a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
index ec3ed40..6ad7aca 100644
---
a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
+++
b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
@@ -23,10 +23,11 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
-import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.guice.annotations.Processing;
+import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
@@ -54,6 +55,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
+ private final CachePopulatorStats cachePopulatorStats;
private final ObjectMapper objectMapper;
@JsonCreator
@@ -69,6 +71,7 @@ public class RealtimePlumberSchool implements PlumberSchool
@JacksonInject IndexIO indexIO,
@JacksonInject Cache cache,
@JacksonInject CacheConfig cacheConfig,
+ @JacksonInject CachePopulatorStats cachePopulatorStats,
@JacksonInject ObjectMapper objectMapper
)
{
@@ -84,6 +87,7 @@ public class RealtimePlumberSchool implements PlumberSchool
this.cache = cache;
this.cacheConfig = cacheConfig;
+ this.cachePopulatorStats = cachePopulatorStats;
this.objectMapper = objectMapper;
}
@@ -111,6 +115,7 @@ public class RealtimePlumberSchool implements PlumberSchool
indexIO,
cache,
cacheConfig,
+ cachePopulatorStats,
objectMapper
);
}
diff --git
a/server/src/main/java/io/druid/server/coordination/ServerManager.java
b/server/src/main/java/io/druid/server/coordination/ServerManager.java
index aac2e86..4a134d7 100644
--- a/server/src/main/java/io/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java
@@ -26,7 +26,7 @@ import com.google.inject.Inject;
import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
-import io.druid.guice.annotations.BackgroundCaching;
+import io.druid.client.cache.CachePopulator;
import io.druid.guice.annotations.Processing;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.ISE;
@@ -77,7 +77,7 @@ public class ServerManager implements QuerySegmentWalker
private final QueryRunnerFactoryConglomerate conglomerate;
private final ServiceEmitter emitter;
private final ExecutorService exec;
- private final ExecutorService cachingExec;
+ private final CachePopulator cachePopulator;
private final Cache cache;
private final ObjectMapper objectMapper;
private final CacheConfig cacheConfig;
@@ -89,7 +89,7 @@ public class ServerManager implements QuerySegmentWalker
QueryRunnerFactoryConglomerate conglomerate,
ServiceEmitter emitter,
@Processing ExecutorService exec,
- @BackgroundCaching ExecutorService cachingExec,
+ CachePopulator cachePopulator,
@Smile ObjectMapper objectMapper,
Cache cache,
CacheConfig cacheConfig,
@@ -101,7 +101,7 @@ public class ServerManager implements QuerySegmentWalker
this.emitter = emitter;
this.exec = exec;
- this.cachingExec = cachingExec;
+ this.cachePopulator = cachePopulator;
this.cache = cache;
this.objectMapper = objectMapper;
@@ -298,7 +298,7 @@ public class ServerManager implements QuerySegmentWalker
cache,
toolChest,
metricsEmittingQueryRunnerInner,
- cachingExec,
+ cachePopulator,
cacheConfig
);
diff --git
a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
index f50eea7..67ac01c 100644
---
a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
+++
b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -21,10 +21,11 @@ package io.druid.client;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.Cache;
+import io.druid.client.cache.CachePopulator;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
+import io.druid.client.cache.ForegroundCachePopulator;
import io.druid.client.cache.MapCache;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
@@ -74,7 +75,9 @@ public class CachingClusteredClientFunctionalityTest
timeline = new VersionedIntervalTimeline<>(Ordering.natural());
serverView = EasyMock.createNiceMock(TimelineServerView.class);
cache = MapCache.create(100000);
- client = makeClient(MoreExecutors.sameThreadExecutor());
+ client = makeClient(
+ new ForegroundCachePopulator(CachingClusteredClientTest.jsonMapper,
new CachePopulatorStats(), -1)
+ );
}
@Test
@@ -199,13 +202,13 @@ public class CachingClusteredClientFunctionalityTest
));
}
- protected CachingClusteredClient makeClient(final ListeningExecutorService
backgroundExecutorService)
+ protected CachingClusteredClient makeClient(final CachePopulator
cachePopulator)
{
- return makeClient(backgroundExecutorService, cache, 10);
+ return makeClient(cachePopulator, cache, 10);
}
protected CachingClusteredClient makeClient(
- final ListeningExecutorService backgroundExecutorService,
+ final CachePopulator cachePopulator,
final Cache cache,
final int mergeLimit
)
@@ -245,7 +248,7 @@ public class CachingClusteredClientFunctionalityTest
},
cache,
CachingClusteredClientTest.jsonMapper,
- backgroundExecutorService,
+ cachePopulator,
new CacheConfig()
{
@Override
diff --git
a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index 31f68ff..6143dad 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -43,8 +43,12 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.cache.BackgroundCachePopulator;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
+import io.druid.client.cache.CachePopulatorStats;
+import io.druid.client.cache.ForegroundCachePopulator;
import io.druid.client.cache.MapCache;
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
import io.druid.client.selector.QueryableDruidServer;
@@ -330,7 +334,7 @@ public class CachingClusteredClientTest
timeline = new VersionedIntervalTimeline<>(Ordering.natural());
serverView = EasyMock.createNiceMock(TimelineServerView.class);
cache = MapCache.create(100000);
- client = makeClient(MoreExecutors.sameThreadExecutor());
+ client = makeClient(new ForegroundCachePopulator(jsonMapper, new
CachePopulatorStats(), -1));
servers = new DruidServer[]{
new DruidServer("test1", "test1", null, 10, ServerType.HISTORICAL,
"bye", 0),
@@ -422,7 +426,14 @@ public class CachingClusteredClientTest
}
};
- client = makeClient(randomizingExecutorService);
+ client = makeClient(
+ new BackgroundCachePopulator(
+ randomizingExecutorService,
+ jsonMapper,
+ new CachePopulatorStats(),
+ -1
+ )
+ );
// callback to be run every time a query run is complete, to ensure all background
// caching tasks are executed, and cache is populated before we move onto the next query
@@ -579,7 +590,7 @@ public class CachingClusteredClientTest
.andReturn(ImmutableMap.of())
.once();
EasyMock.replay(cache);
- client = makeClient(MoreExecutors.sameThreadExecutor(), cache, limit);
+ client = makeClient(new ForegroundCachePopulator(jsonMapper, new
CachePopulatorStats(), -1), cache, limit);
final DruidServer lastServer = servers[random.nextInt(servers.length)];
final DataSegment dataSegment = EasyMock.createNiceMock(DataSegment.class);
EasyMock.expect(dataSegment.getIdentifier()).andReturn(DATA_SOURCE).anyTimes();
@@ -604,7 +615,7 @@ public class CachingClusteredClientTest
.andReturn(ImmutableMap.of())
.once();
EasyMock.replay(cache);
- client = makeClient(MoreExecutors.sameThreadExecutor(), cache, 0);
+ client = makeClient(new ForegroundCachePopulator(jsonMapper, new
CachePopulatorStats(), -1), cache, 0);
getDefaultQueryRunner().run(QueryPlus.wrap(query), context);
EasyMock.verify(cache);
EasyMock.verify(dataSegment);
@@ -2630,13 +2641,13 @@ public class CachingClusteredClientTest
EasyMock.reset(mocks);
}
- protected CachingClusteredClient makeClient(final ListeningExecutorService
backgroundExecutorService)
+ protected CachingClusteredClient makeClient(final CachePopulator
cachePopulator)
{
- return makeClient(backgroundExecutorService, cache, 10);
+ return makeClient(cachePopulator, cache, 10);
}
protected CachingClusteredClient makeClient(
- final ListeningExecutorService backgroundExecutorService,
+ final CachePopulator cachePopulator,
final Cache cache,
final int mergeLimit
)
@@ -2676,7 +2687,7 @@ public class CachingClusteredClientTest
},
cache,
jsonMapper,
- backgroundExecutorService,
+ cachePopulator,
new CacheConfig()
{
@Override
diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
index ab2bf22..012fe2b 100644
--- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
+++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
@@ -19,20 +19,26 @@
package io.druid.client;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.client.cache.BackgroundCachePopulator;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.CacheStats;
+import io.druid.client.cache.ForegroundCachePopulator;
import io.druid.client.cache.MapCache;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.SequenceWrapper;
@@ -63,15 +69,15 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.ByteArrayOutputStream;
import java.io.Closeable;
+import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -98,14 +104,22 @@ public class CachingQueryRunnerTest
DateTimes.of("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
};
- private ExecutorService backgroundExecutorService;
+ private ObjectMapper objectMapper;
+ private CachePopulator cachePopulator;
public CachingQueryRunnerTest(int numBackgroundThreads)
{
+ objectMapper = new DefaultObjectMapper();
+
if (numBackgroundThreads > 0) {
- backgroundExecutorService =
Executors.newFixedThreadPool(numBackgroundThreads);
+ cachePopulator = new BackgroundCachePopulator(
+ Execs.multiThreaded(numBackgroundThreads,
"CachingQueryRunnerTest-%d"),
+ objectMapper,
+ new CachePopulatorStats(),
+ -1
+ );
} else {
- backgroundExecutorService = MoreExecutors.sameThreadExecutor();
+ cachePopulator = new ForegroundCachePopulator(objectMapper, new
CachePopulatorStats(), -1);
}
}
@@ -274,7 +288,7 @@ public class CachingQueryRunnerTest
return resultSeq;
}
},
- backgroundExecutorService,
+ cachePopulator,
new CacheConfig()
{
@Override
@@ -331,7 +345,7 @@ public class CachingQueryRunnerTest
List<Result> expectedResults,
Query query,
QueryToolChest toolchest
- )
+ ) throws IOException
{
DefaultObjectMapper objectMapper = new DefaultObjectMapper();
String segmentIdentifier = "segment";
@@ -345,12 +359,7 @@ public class CachingQueryRunnerTest
);
Cache cache = MapCache.create(1024 * 1024);
- CacheUtil.populate(
- cache,
- objectMapper,
- cacheKey,
- Iterables.transform(expectedResults,
cacheStrategy.prepareForSegmentLevelCache())
- );
+ cache.put(cacheKey, toByteArray(Iterables.transform(expectedResults,
cacheStrategy.prepareForSegmentLevelCache())));
CachingQueryRunner runner = new CachingQueryRunner(
segmentIdentifier,
@@ -367,7 +376,7 @@ public class CachingQueryRunnerTest
return Sequences.empty();
}
},
- backgroundExecutorService,
+ cachePopulator,
new CacheConfig()
{
@Override
@@ -434,6 +443,19 @@ public class CachingQueryRunnerTest
return retVal;
}
+ private <T> byte[] toByteArray(final Iterable<T> results) throws IOException
+ {
+ final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+
+ try (JsonGenerator gen = objectMapper.getFactory().createGenerator(bytes))
{
+ for (T result : results) {
+ gen.writeObject(result);
+ }
+ }
+
+ return bytes.toByteArray();
+ }
+
private static class AssertingClosable implements Closeable
{
diff --git a/server/src/test/java/io/druid/client/cache/CachePopulatorTest.java
b/server/src/test/java/io/druid/client/cache/CachePopulatorTest.java
new file mode 100644
index 0000000..b855e20
--- /dev/null
+++ b/server/src/test/java/io/druid/client/cache/CachePopulatorTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.guava.Sequences;
+import io.druid.java.util.common.jackson.JacksonUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+public class CachePopulatorTest
+{
+ private final ExecutorService exec = Execs.multiThreaded(2,
"cache-populator-test-%d");
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ private final Cache cache = new MapCache(new
ByteCountingLRUMap(Long.MAX_VALUE));
+ private final CachePopulatorStats stats = new CachePopulatorStats();
+
+ @After
+ public void tearDown()
+ {
+ exec.shutdownNow();
+ }
+
+ @Test
+ public void testForegroundPopulator()
+ {
+ final CachePopulator populator = new
ForegroundCachePopulator(objectMapper, stats, -1);
+ final List<String> strings = ImmutableList.of("foo", "bar");
+
+ Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1),
strings));
+ Assert.assertEquals(strings, readFromCache(makeKey(1)));
+ Assert.assertEquals(1, stats.snapshot().getNumOk());
+ Assert.assertEquals(0, stats.snapshot().getNumError());
+ Assert.assertEquals(0, stats.snapshot().getNumOversized());
+ }
+
+ @Test
+ public void testForegroundPopulatorMaxEntrySize()
+ {
+ final CachePopulator populator = new
ForegroundCachePopulator(objectMapper, stats, 30);
+ final List<String> strings = ImmutableList.of("foo", "bar");
+ final List<String> strings2 = ImmutableList.of("foo",
"baralararararararaarararararaa");
+
+ Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1),
strings));
+ Assert.assertEquals(strings, readFromCache(makeKey(1)));
+ Assert.assertEquals(strings2, wrapAndReturn(populator, makeKey(2),
strings2));
+ Assert.assertNull(readFromCache(makeKey(2)));
+
+ Assert.assertEquals(1, stats.snapshot().getNumOk());
+ Assert.assertEquals(0, stats.snapshot().getNumError());
+ Assert.assertEquals(1, stats.snapshot().getNumOversized());
+ }
+
+ @Test(timeout = 60000L)
+ public void testBackgroundPopulator() throws InterruptedException
+ {
+ final CachePopulator populator = new BackgroundCachePopulator(exec,
objectMapper, stats, -1);
+ final List<String> strings = ImmutableList.of("foo", "bar");
+
+ Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1),
strings));
+
+ // Wait for background updates to happen.
+ while (cache.getStats().getNumEntries() < 1) {
+ Thread.sleep(100);
+ }
+
+ Assert.assertEquals(strings, readFromCache(makeKey(1)));
+ Assert.assertEquals(1, stats.snapshot().getNumOk());
+ Assert.assertEquals(0, stats.snapshot().getNumError());
+ Assert.assertEquals(0, stats.snapshot().getNumOversized());
+ }
+
+ @Test(timeout = 60000L)
+ public void testBackgroundPopulatorMaxEntrySize() throws InterruptedException
+ {
+ final CachePopulator populator = new BackgroundCachePopulator(exec,
objectMapper, stats, 30);
+ final List<String> strings = ImmutableList.of("foo", "bar");
+ final List<String> strings2 = ImmutableList.of("foo",
"baralararararararaarararararaa");
+
+ Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1),
strings));
+ Assert.assertEquals(strings2, wrapAndReturn(populator, makeKey(2),
strings2));
+
+ // Wait for background updates to happen.
+ while (cache.getStats().getNumEntries() < 1 ||
stats.snapshot().getNumOversized() < 1) {
+ Thread.sleep(100);
+ }
+
+ Assert.assertEquals(strings, readFromCache(makeKey(1)));
+ Assert.assertNull(readFromCache(makeKey(2)));
+ Assert.assertEquals(1, stats.snapshot().getNumOk());
+ Assert.assertEquals(0, stats.snapshot().getNumError());
+ Assert.assertEquals(1, stats.snapshot().getNumOversized());
+ }
+
+ private static Cache.NamedKey makeKey(final int n)
+ {
+ return new Cache.NamedKey("test", Ints.toByteArray(n));
+ }
+
+ private List<String> wrapAndReturn(
+ final CachePopulator populator,
+ final Cache.NamedKey key,
+ final List<String> strings
+ )
+ {
+ return populator.wrap(Sequences.simple(strings), s -> ImmutableMap.of("s",
s), cache, key).toList();
+ }
+
+ private List<String> readFromCache(final Cache.NamedKey key)
+ {
+ final byte[] bytes = cache.get(key);
+ if (bytes == null) {
+ return null;
+ }
+
+ try (
+ final MappingIterator<Map<String, String>> iterator =
objectMapper.readValues(
+ objectMapper.getFactory().createParser(bytes),
+ JacksonUtils.TYPE_REFERENCE_MAP_STRING_STRING
+ )
+ ) {
+ final List<Map<String, String>> retVal = new ArrayList<>();
+ Iterators.addAll(retVal, iterator);
+
+ // Undo map-wrapping that was done in wrapAndReturn.
+ return retVal.stream().map(m -> m.get("s")).collect(Collectors.toList());
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
index 7b503aa..f88f5bf 100644
--- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
@@ -22,6 +22,7 @@ package io.druid.segment.realtime;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.MapCache;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
@@ -115,6 +116,7 @@ public class FireDepartmentTest
TestHelper.getTestIndexIO(OffHeapMemorySegmentWriteOutMediumFactory.instance()),
MapCache.create(0),
NO_CACHE_CONFIG,
+ new CachePopulatorStats(),
TestHelper.makeJsonMapper()
),
diff --git
a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
index 3c675c6..c5522d9 100644
---
a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
+++
b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -22,6 +22,7 @@ package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.MapCache;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
@@ -273,7 +274,8 @@ public class AppenderatorTester implements AutoCloseable
emitter,
queryExecutor,
MapCache.create(2048),
- new CacheConfig()
+ new CacheConfig(),
+ new CachePopulatorStats()
);
}
diff --git
a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index a5ad8a4..ef96417 100644
---
a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++
b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.MapCache;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
@@ -223,6 +224,7 @@ public class RealtimePlumberSchoolTest
TestHelper.getTestIndexIO(segmentWriteOutMediumFactory),
MapCache.create(0),
FireDepartmentTest.NO_CACHE_CONFIG,
+ new CachePopulatorStats(),
TestHelper.makeJsonMapper()
);
diff --git
a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
index 1287748..df4aebf 100644
--- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
+++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
@@ -26,8 +26,9 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
+import io.druid.client.cache.ForegroundCachePopulator;
import io.druid.client.cache.LocalCacheProvider;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.IAE;
@@ -153,7 +154,7 @@ public class ServerManagerTest
},
new NoopServiceEmitter(),
serverManagerExec,
- MoreExecutors.sameThreadExecutor(),
+ new ForegroundCachePopulator(new DefaultObjectMapper(), new
CachePopulatorStats(), -1),
new DefaultObjectMapper(),
new LocalCacheProvider().get(),
new CacheConfig(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]