This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3525d40  Cache: Add maxEntrySize config, make groupBy cacheable by 
default. (#5108)
3525d40 is described below

commit 3525d4059e230b4995b658282fd13f41e8125af5
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Aug 7 10:23:15 2018 -0700

    Cache: Add maxEntrySize config, make groupBy cacheable by default. (#5108)
    
    * Cache: Add maxEntrySize config.
    
    The idea is this makes it more feasible to cache query types that
    can potentially generate large result sets, like groupBy and select,
    without fear of writing too much to the cache per query.
    
    Includes a refactor of cache population code in CachingQueryRunner and
    CachingClusteredClient, such that they now use the same CachePopulator
    interface with two implementations: one for foreground and one for
    background.
    
    The main reason for splitting the foreground / background impls is
    that the foreground impl can have a more effective implementation of
    maxEntrySize. It can stop retaining subvalues for the cache early.
    
    * Add CachePopulatorStats.
    
    * Fix whitespace.
    
    * Fix docs.
    
    * Fix various tests.
    
    * Add tests.
    
    * Fix tests.
    
    * Better tests
    
    * Remove conflict markers.
    
    * Fix licenses.
---
 docs/content/configuration/broker.md               |   3 +-
 docs/content/configuration/historical.md           |   3 +-
 docs/content/configuration/index.md                |  23 ++-
 docs/content/configuration/realtime.md             |   3 +-
 docs/content/operations/metrics.md                 |   3 +
 .../io/druid/indexing/kafka/KafkaIndexTask.java    |   3 +-
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |   2 +
 .../java/io/druid/indexing/common/TaskToolbox.java |   9 ++
 .../druid/indexing/common/TaskToolboxFactory.java  |   9 +-
 .../task/AppenderatorDriverRealtimeIndexTask.java  |   3 +-
 .../indexing/common/task/RealtimeIndexTask.java    |   1 +
 .../io/druid/indexing/common/TaskToolboxTest.java  |   2 +
 .../AppenderatorDriverRealtimeIndexTaskTest.java   |   2 +
 .../indexing/common/task/CompactionTaskTest.java   |   1 +
 .../druid/indexing/common/task/IndexTaskTest.java  |   1 +
 .../common/task/RealtimeIndexTaskTest.java         |   2 +
 .../common/task/SameIntervalMergeTaskTest.java     |   1 +
 .../firehose/IngestSegmentFirehoseFactoryTest.java |   1 +
 .../IngestSegmentFirehoseFactoryTimelineTest.java  |   1 +
 .../overlord/SingleTaskBackgroundRunnerTest.java   |   1 +
 .../druid/indexing/overlord/TaskLifecycleTest.java |   2 +
 .../indexing/worker/WorkerTaskManagerTest.java     |   1 +
 .../indexing/worker/WorkerTaskMonitorTest.java     |   1 +
 .../src/main/java/io/druid/client/CacheUtil.java   |  22 ---
 .../io/druid/client/CachingClusteredClient.java    | 100 +++----------
 .../java/io/druid/client/CachingQueryRunner.java   |  66 +--------
 .../client/cache/BackgroundCachePopulator.java     | 141 ++++++++++++++++++
 .../java/io/druid/client/cache/CacheConfig.java    |  14 +-
 .../java/io/druid/client/cache/CacheMonitor.java   |  40 +++--
 .../java/io/druid/client/cache/CachePopulator.java |  23 ++-
 .../io/druid/client/cache/CachePopulatorStats.java |  97 ++++++++++++
 .../client/cache/ForegroundCachePopulator.java     | 127 ++++++++++++++++
 .../java/io/druid/client/cache/HybridCache.java    |   2 +-
 .../src/main/java/io/druid/guice/CacheModule.java  |   2 +
 .../java/io/druid/guice/DruidProcessingModule.java |  19 ++-
 .../io/druid/guice/RouterProcessingModule.java     |  18 +--
 .../realtime/appenderator/AppenderatorImpl.java    |   7 +-
 .../realtime/appenderator/Appenderators.java       |   8 +-
 .../DefaultRealtimeAppenderatorFactory.java        |  12 +-
 .../appenderator/SinkQuerySegmentWalker.java       |  18 ++-
 .../segment/realtime/plumber/FlushingPlumber.java  |   3 +
 .../realtime/plumber/FlushingPlumberSchool.java    |   8 +-
 .../segment/realtime/plumber/RealtimePlumber.java  |   5 +-
 .../realtime/plumber/RealtimePlumberSchool.java    |   7 +-
 .../druid/server/coordination/ServerManager.java   |  10 +-
 .../CachingClusteredClientFunctionalityTest.java   |  17 ++-
 .../druid/client/CachingClusteredClientTest.java   |  27 +++-
 .../io/druid/client/CachingQueryRunnerTest.java    |  52 +++++--
 .../io/druid/client/cache/CachePopulatorTest.java  | 163 +++++++++++++++++++++
 .../druid/segment/realtime/FireDepartmentTest.java |   2 +
 .../realtime/appenderator/AppenderatorTester.java  |   4 +-
 .../plumber/RealtimePlumberSchoolTest.java         |   2 +
 .../server/coordination/ServerManagerTest.java     |   5 +-
 53 files changed, 826 insertions(+), 273 deletions(-)

diff --git a/docs/content/configuration/broker.md 
b/docs/content/configuration/broker.md
index 2d442bf..3695396 100644
--- a/docs/content/configuration/broker.md
+++ b/docs/content/configuration/broker.md
@@ -113,8 +113,9 @@ You can optionally only configure caching to be enabled on 
the broker by setting
 |`druid.broker.cache.useResultLevelCache`|true, false|Enable result level 
caching on the broker.|false|
 |`druid.broker.cache.populateResultLevelCache`|true, false|Populate the result 
level cache on the broker.|false|
 |`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size of 
query response that can be cached.|`Integer.MAX_VALUE`|
-|`druid.broker.cache.unCacheable`|All druid query types|All query types to not 
cache.|`["groupBy", "select"]`|
+|`druid.broker.cache.unCacheable`|All druid query types|All query types to not 
cache.|`["select"]`|
 |`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with 
more segments than this number will not attempt to fetch from cache at the 
broker level, leaving potential caching fetches (and cache result merging) to 
the historicals|`Integer.MAX_VALUE`|
+|`druid.broker.cache.maxEntrySize`|positive integer or -1|Maximum size of an 
individual cache entry (processed results for one segment), in bytes, or -1 for 
unlimited.|`1000000` (1MB)|
 
 See [cache configuration](caching.html) for how to configure cache settings.
 
diff --git a/docs/content/configuration/historical.md 
b/docs/content/configuration/historical.md
index 02dddb9..ae770ea 100644
--- a/docs/content/configuration/historical.md
+++ b/docs/content/configuration/historical.md
@@ -97,7 +97,8 @@ You can optionally only configure caching to be enabled on 
the historical by set
 |--------|---------------|-----------|-------|
 |`druid.historical.cache.useCache`|true, false|Enable the cache on the 
historical.|false|
 |`druid.historical.cache.populateCache`|true, false|Populate the cache on the 
historical.|false|
-|`druid.historical.cache.unCacheable`|All druid query types|All query types to 
not cache.|["groupBy", "select"]|
+|`druid.historical.cache.unCacheable`|All druid query types|All query types to 
not cache.|["select"]|
+|`druid.historical.cache.maxEntrySize`|positive integer or -1|Maximum size of 
an individual cache entry (processed results for one segment), in bytes, or -1 
for unlimited.|`1000000` (1MB)|
 
 
 See [cache configuration](caching.html) for how to configure cache settings.
diff --git a/docs/content/configuration/index.md 
b/docs/content/configuration/index.md
index d4ca2dc..3d9f262 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -353,16 +353,21 @@ You can enable caching of results at the broker, 
historical, or realtime level u
 
|<code>druid.(broker&#124;historical&#124;realtime).cache.unCacheable</code>|All
 druid query types|All query types to not cache.|["groupBy", "select"]|
 
|<code>druid.(broker&#124;historical&#124;realtime).cache.useCache</code>|true, 
false|Whether to use cache for getting query results.|false|
 
|<code>druid.(broker&#124;historical&#124;realtime).cache.populateCache</code>|true,
 false|Whether to populate cache.|false|
+|<code>druid.(broker&#124;historical&#124;realtime).cache.maxEntrySize</code>|positive
 integer or -1|Maximum size of an individual cache entry (processed results for 
one segment), in bytes, or -1 for unlimited.|`1000000` (1MB)|
 
 #### Local Cache
 
+<div class="note caution">
+DEPRECATED: Use caffeine instead
+</div>
+
 |Property|Description|Default|
 |--------|-----------|-------|
 |`druid.cache.sizeInBytes`|Maximum cache size in bytes. You must set this if 
you enabled populateCache/useCache, or else cache size of zero wouldn't really 
cache anything.|0|
 |`druid.cache.initialSize`|Initial size of the hashtable backing the 
cache.|500000|
 |`druid.cache.logEvictionCount`|If non-zero, log cache eviction every 
`logEvictionCount` items.|0|
 
-#### Memcache
+#### Memcached
 
 |Property|Description|Default|
 |--------|-----------|-------|
@@ -372,6 +377,22 @@ You can enable caching of results at the broker, 
historical, or realtime level u
 |`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached 
object.|52428800 (50 MB)|
 |`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
 
+#### Caffeine Cache
+
+A highly performant local cache implementation for Druid based on 
[Caffeine](https://github.com/ben-manes/caffeine). Requires JRE 8u60 or higher 
if using `COMMON_FJP`.
+
+Below are the configuration options known to this module:
+
+|`runtime.properties`|Description|Default|
+|--------------------|-----------|-------|
+|`druid.cache.type`| Set this to `caffeine`|`local`|
+|`druid.cache.sizeInBytes`|The maximum size of the cache in bytes on 
heap.|None (unlimited)|
+|`druid.cache.expireAfter`|The time (in ms) after an access for which a cache 
entry may be expired|None (no time limit)|
+|`druid.cache.cacheExecutorFactory`|The executor factory to use for Caffeine 
maintenance. One of `COMMON_FJP`, `SINGLE_THREAD`, or 
`SAME_THREAD`|ForkJoinPool common pool (`COMMON_FJP`)|
+|`druid.cache.evictOnClose`|Whether closing a namespace (e.g. removing a segment 
from a node) should eagerly evict the associated cache values.|`false`|
+
+See the [Caching documentation](caching.html) for more detail.
+
 ### Indexing Service Discovery
 
 This config is used to find the [Indexing 
Service](../design/indexing-service.html) using Curator service discovery. Only 
required if you are actually running an indexing service.
diff --git a/docs/content/configuration/realtime.md 
b/docs/content/configuration/realtime.md
index 80fe955..b75fca5 100644
--- a/docs/content/configuration/realtime.md
+++ b/docs/content/configuration/realtime.md
@@ -72,6 +72,7 @@ You can optionally configure caching to be enabled on the 
realtime node by setti
 |--------|---------------|-----------|-------|
 |`druid.realtime.cache.useCache`|true, false|Enable the cache on the 
realtime.|false|
 |`druid.realtime.cache.populateCache`|true, false|Populate the cache on the 
realtime.|false|
-|`druid.realtime.cache.unCacheable`|All druid query types|All query types to 
not cache.|`["groupBy", "select"]`|
+|`druid.realtime.cache.unCacheable`|All druid query types|All query types to 
not cache.|`["select"]`|
+|`druid.realtime.cache.maxEntrySize`|positive integer or -1|Maximum size of an 
individual cache entry (processed results for one segment), in bytes, or -1 for 
unlimited.|`1000000` (1MB)|
 
 See [cache configuration](caching.html) for how to configure cache settings.
diff --git a/docs/content/operations/metrics.md 
b/docs/content/operations/metrics.md
index d2c802d..6dc3e0f 100644
--- a/docs/content/operations/metrics.md
+++ b/docs/content/operations/metrics.md
@@ -88,6 +88,9 @@ Available Metrics
 |`*/averageByte`|Average cache entry byte size.||Varies.|
 |`*/timeouts`|Number of cache timeouts.||0|
 |`*/errors`|Number of cache errors.||0|
+|`*/put/ok`|Number of new cache entries successfully cached.||Varies, but more 
than zero.|
+|`*/put/error`|Number of new cache entries that could not be cached due to 
errors.||0|
+|`*/put/oversized`|Number of potential new cache entries that were skipped due 
to being too large (based on 
`druid.{broker,historical,realtime}.cache.maxEntrySize` properties).||Varies.|
 
 #### Memcached only metrics
 
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
 
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index a9d5c98..8cb6323 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -247,7 +247,8 @@ public class KafkaIndexTask extends AbstractTask implements 
ChatHandler
         toolbox.getEmitter(),
         toolbox.getQueryExecutorService(),
         toolbox.getCache(),
-        toolbox.getCacheConfig()
+        toolbox.getCacheConfig(),
+        toolbox.getCachePopulatorStats()
     );
   }
 
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 1e30a95..1929bb4 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.FloatDimensionSchema;
@@ -2147,6 +2148,7 @@ public class KafkaIndexTaskTest
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
         new CacheConfig(),
+        new CachePopulatorStats(),
         testUtils.getTestIndexMergerV9(),
         EasyMock.createNiceMock(DruidNodeAnnouncer.class),
         EasyMock.createNiceMock(DruidNode.class),
diff --git 
a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java 
b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
index d63c2a1..e31969e 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.inject.Provider;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.java.util.metrics.MonitorScheduler;
 import io.druid.client.cache.Cache;
@@ -89,6 +90,7 @@ public class TaskToolbox
   private final IndexIO indexIO;
   private final Cache cache;
   private final CacheConfig cacheConfig;
+  private final CachePopulatorStats cachePopulatorStats;
   private final IndexMergerV9 indexMergerV9;
   private final TaskReportFileWriter taskReportFileWriter;
 
@@ -117,6 +119,7 @@ public class TaskToolbox
       IndexIO indexIO,
       Cache cache,
       CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats,
       IndexMergerV9 indexMergerV9,
       DruidNodeAnnouncer druidNodeAnnouncer,
       DruidNode druidNode,
@@ -144,6 +147,7 @@ public class TaskToolbox
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
     this.cache = cache;
     this.cacheConfig = cacheConfig;
+    this.cachePopulatorStats = cachePopulatorStats;
     this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null 
IndexMergerV9");
     this.druidNodeAnnouncer = druidNodeAnnouncer;
     this.druidNode = druidNode;
@@ -268,6 +272,11 @@ public class TaskToolbox
     return cacheConfig;
   }
 
+  public CachePopulatorStats getCachePopulatorStats()
+  {
+    return cachePopulatorStats;
+  }
+
   public IndexMergerV9 getIndexMergerV9()
   {
     return indexMergerV9;
diff --git 
a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
 
b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
index 95f1b32..f858761 100644
--- 
a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
+++ 
b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java
@@ -23,10 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-import io.druid.java.util.metrics.MonitorScheduler;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.discovery.DataNodeService;
 import io.druid.discovery.DruidNodeAnnouncer;
 import io.druid.discovery.LookupNodeService;
@@ -35,6 +34,8 @@ import io.druid.guice.annotations.RemoteChatHandler;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.task.Task;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.java.util.metrics.MonitorScheduler;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
@@ -73,6 +74,7 @@ public class TaskToolboxFactory
   private final IndexIO indexIO;
   private final Cache cache;
   private final CacheConfig cacheConfig;
+  private final CachePopulatorStats cachePopulatorStats;
   private final IndexMergerV9 indexMergerV9;
   private final DruidNodeAnnouncer druidNodeAnnouncer;
   private final DruidNode druidNode;
@@ -100,6 +102,7 @@ public class TaskToolboxFactory
       IndexIO indexIO,
       Cache cache,
       CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats,
       IndexMergerV9 indexMergerV9,
       DruidNodeAnnouncer druidNodeAnnouncer,
       @RemoteChatHandler DruidNode druidNode,
@@ -126,6 +129,7 @@ public class TaskToolboxFactory
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
     this.cache = cache;
     this.cacheConfig = cacheConfig;
+    this.cachePopulatorStats = cachePopulatorStats;
     this.indexMergerV9 = indexMergerV9;
     this.druidNodeAnnouncer = druidNodeAnnouncer;
     this.druidNode = druidNode;
@@ -157,6 +161,7 @@ public class TaskToolboxFactory
         indexIO,
         cache,
         cacheConfig,
+        cachePopulatorStats,
         indexMergerV9,
         druidNodeAnnouncer,
         druidNode,
diff --git 
a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
 
b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 6c367fd..c1ee50b 100644
--- 
a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ 
b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -706,7 +706,8 @@ public class AppenderatorDriverRealtimeIndexTask extends 
AbstractTask implements
         toolbox.getEmitter(),
         toolbox.getQueryExecutorService(),
         toolbox.getCache(),
-        toolbox.getCacheConfig()
+        toolbox.getCacheConfig(),
+        toolbox.getCachePopulatorStats()
     );
   }
 
diff --git 
a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
 
b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 37a7ccf..a496182 100644
--- 
a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ 
b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -341,6 +341,7 @@ public class RealtimeIndexTask extends AbstractTask
         toolbox.getIndexIO(),
         toolbox.getCache(),
         toolbox.getCacheConfig(),
+        toolbox.getCachePopulatorStats(),
         toolbox.getObjectMapper()
     );
 
diff --git 
a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java 
b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java
index d557e8c..abd088f 100644
--- 
a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java
+++ 
b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.task.NoopTestTaskFileWriter;
@@ -111,6 +112,7 @@ public class TaskToolboxTest
         mockIndexIO,
         mockCache,
         mockCacheConfig,
+        new CachePopulatorStats(),
         mockIndexMergerV9,
         null,
         null,
diff --git 
a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
 
b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 9f4073b..9166265 100644
--- 
a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ 
b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.MapCache;
 import io.druid.common.config.NullHandling;
 import io.druid.data.input.Firehose;
@@ -1500,6 +1501,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
         new CacheConfig(),
+        new CachePopulatorStats(),
         testUtils.getTestIndexMergerV9(),
         EasyMock.createNiceMock(DruidNodeAnnouncer.class),
         EasyMock.createNiceMock(DruidNode.class),
diff --git 
a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
 
b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
index 99adee0..2288393 100644
--- 
a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
+++ 
b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
@@ -586,6 +586,7 @@ public class CompactionTaskTest
           indexIO,
           null,
           null,
+          null,
           new IndexMergerV9(objectMapper, indexIO, 
OffHeapMemorySegmentWriteOutMediumFactory.instance()),
           null,
           null,
diff --git 
a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
 
b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
index c05d2dc..b15261c 100644
--- 
a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
+++ 
b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
@@ -1514,6 +1514,7 @@ public class IndexTaskTest
         indexIO,
         null,
         null,
+        null,
         indexMergerV9,
         null,
         null,
diff --git 
a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
 
b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
index d3edfef..36fe3a3 100644
--- 
a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ 
b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.MapCache;
 import io.druid.common.config.NullHandling;
 import io.druid.data.input.Firehose;
@@ -1080,6 +1081,7 @@ public class RealtimeIndexTaskTest
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
         new CacheConfig(),
+        new CachePopulatorStats(),
         testUtils.getTestIndexMergerV9(),
         EasyMock.createNiceMock(DruidNodeAnnouncer.class),
         EasyMock.createNiceMock(DruidNode.class),
diff --git 
a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
 
b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
index 2e816f5..900f48e 100644
--- 
a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
+++ 
b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
@@ -252,6 +252,7 @@ public class SameIntervalMergeTaskTest
             indexIO,
             null,
             null,
+            null,
             EasyMock.createMock(IndexMergerV9.class),
             null,
             null,
diff --git 
a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
 
b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index f2085ea..60ef693 100644
--- 
a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ 
b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -316,6 +316,7 @@ public class IngestSegmentFirehoseFactoryTest
         INDEX_IO,
         null,
         null,
+        null,
         INDEX_MERGER_V9,
         null,
         null,
diff --git 
a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
 
b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
index c8149e3..469d4a8 100644
--- 
a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
+++ 
b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
@@ -340,6 +340,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
           INDEX_IO,
           null,
           null,
+          null,
           INDEX_MERGER_V9,
           null,
           null,
diff --git 
a/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
 
b/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 9a95b87..a5cd125 100644
--- 
a/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ 
b/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -99,6 +99,7 @@ public class SingleTaskBackgroundRunnerTest
         utils.getTestIndexIO(),
         null,
         null,
+        null,
         utils.getTestIndexMergerV9(),
         null,
         node,
diff --git 
a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
 
b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
index 615fe39..dc41b86 100644
--- 
a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
+++ 
b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
@@ -34,6 +34,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.FirehoseFactory;
@@ -616,6 +617,7 @@ public class TaskLifecycleTest
         INDEX_IO,
         MapCache.create(0),
         FireDepartmentTest.NO_CACHE_CONFIG,
+        new CachePopulatorStats(),
         INDEX_MERGER_V9,
         EasyMock.createNiceMock(DruidNodeAnnouncer.class),
         EasyMock.createNiceMock(DruidNode.class),
diff --git 
a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java
 
b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java
index cd7e85e..ff528ea 100644
--- 
a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java
+++ 
b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -125,6 +125,7 @@ public class WorkerTaskManagerTest
                 indexIO,
                 null,
                 null,
+                null,
                 indexMergerV9,
                 null,
                 null,
diff --git 
a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
 
b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
index 2b5374b..1af92ad 100644
--- 
a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ 
b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -187,6 +187,7 @@ public class WorkerTaskMonitorTest
                 indexIO,
                 null,
                 null,
+                null,
                 indexMergerV9,
                 null,
                 null,
diff --git a/server/src/main/java/io/druid/client/CacheUtil.java 
b/server/src/main/java/io/druid/client/CacheUtil.java
index 78ba5e8..622ec0c 100644
--- a/server/src/main/java/io/druid/client/CacheUtil.java
+++ b/server/src/main/java/io/druid/client/CacheUtil.java
@@ -19,9 +19,6 @@
 
 package io.druid.client;
 
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Throwables;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
 import io.druid.java.util.common.StringUtils;
@@ -31,8 +28,6 @@ import io.druid.query.QueryContexts;
 import io.druid.query.SegmentDescriptor;
 import org.joda.time.Interval;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class CacheUtil
@@ -57,23 +52,6 @@ public class CacheUtil
     );
   }
 
-  public static void populate(Cache cache, ObjectMapper mapper, Cache.NamedKey 
key, Iterable<Object> results)
-  {
-    try {
-      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-      try (JsonGenerator gen = mapper.getFactory().createGenerator(bytes)) {
-        for (Object result : results) {
-          gen.writeObject(result);
-        }
-      }
-
-      cache.put(key, bytes.toByteArray());
-    }
-    catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
   public static <T> boolean useCacheOnBrokers(
       Query<T> query,
       CacheStrategy<T, Object, Query<T>> strategy,
diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java 
b/server/src/main/java/io/druid/client/CachingClusteredClient.java
index 50f6389..d227dd4 100644
--- a/server/src/main/java/io/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java
@@ -32,17 +32,12 @@ import com.google.common.collect.RangeSet;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
 import io.druid.client.selector.QueryableDruidServer;
 import io.druid.client.selector.ServerSelector;
-import io.druid.guice.annotations.BackgroundCaching;
 import io.druid.guice.annotations.Smile;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.Pair;
@@ -88,8 +83,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
@@ -102,8 +95,8 @@ public class CachingClusteredClient implements 
QuerySegmentWalker
   private final TimelineServerView serverView;
   private final Cache cache;
   private final ObjectMapper objectMapper;
+  private final CachePopulator cachePopulator;
   private final CacheConfig cacheConfig;
-  private final ListeningExecutorService backgroundExecutorService;
 
   @Inject
   public CachingClusteredClient(
@@ -111,7 +104,7 @@ public class CachingClusteredClient implements 
QuerySegmentWalker
       TimelineServerView serverView,
       Cache cache,
       @Smile ObjectMapper objectMapper,
-      @BackgroundCaching ExecutorService backgroundExecutorService,
+      CachePopulator cachePopulator,
       CacheConfig cacheConfig
   )
   {
@@ -119,13 +112,13 @@ public class CachingClusteredClient implements 
QuerySegmentWalker
     this.serverView = serverView;
     this.cache = cache;
     this.objectMapper = objectMapper;
+    this.cachePopulator = cachePopulator;
     this.cacheConfig = cacheConfig;
-    this.backgroundExecutorService = 
MoreExecutors.listeningDecorator(backgroundExecutorService);
 
-    if (cacheConfig.isQueryCacheable(Query.GROUP_BY)) {
+    if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && 
(cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) {
       log.warn(
-          "Even though groupBy caching is enabled, v2 groupBys will not be 
cached. "
-          + "Consider disabling cache on your broker and enabling it on your 
data nodes to enable v2 groupBy caching."
+          "Even though groupBy caching is enabled in your configuration, v2 
groupBys will not be cached on the broker. "
+          + "Consider enabling caching on your data nodes if it is not already 
enabled."
       );
     }
 
@@ -218,7 +211,7 @@ public class CachingClusteredClient implements 
QuerySegmentWalker
     private final boolean isBySegment;
     private final int uncoveredIntervalsLimit;
     private final Query<T> downstreamQuery;
-    private final Map<String, CachePopulator> cachePopulatorMap = 
Maps.newHashMap();
+    private final Map<String, Cache.NamedKey> cachePopulatorKeyMap = 
Maps.newHashMap();
 
     SpecificQueryRunnable(final QueryPlus<T> queryPlus, final Map<String, 
Object> responseContext)
     {
@@ -420,7 +413,7 @@ public class CachingClusteredClient implements 
QuerySegmentWalker
         } else if (populateCache) {
           // otherwise, if populating cache, add segment to list of segments 
to cache
           final String segmentIdentifier = 
segment.getServer().getSegment().getIdentifier();
-          addCachePopulator(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
+          addCachePopulatorKey(segmentCacheKey, segmentIdentifier, 
segmentQueryInterval);
         }
       });
       return alreadyCachedResults;
@@ -453,22 +446,22 @@ public class CachingClusteredClient implements 
QuerySegmentWalker
       }
     }
 
-    private void addCachePopulator(
+    private void addCachePopulatorKey(
         Cache.NamedKey segmentCacheKey,
         String segmentIdentifier,
         Interval segmentQueryInterval
     )
     {
-      cachePopulatorMap.put(
+      cachePopulatorKeyMap.put(
           StringUtils.format("%s_%s", segmentIdentifier, segmentQueryInterval),
-          new CachePopulator(cache, objectMapper, segmentCacheKey)
+          segmentCacheKey
       );
     }
 
     @Nullable
-    private CachePopulator getCachePopulator(String segmentId, Interval 
segmentInterval)
+    private Cache.NamedKey getCachePopulatorKey(String segmentId, Interval 
segmentInterval)
     {
-      return cachePopulatorMap.get(StringUtils.format("%s_%s", segmentId, 
segmentInterval));
+      return cachePopulatorKeyMap.get(StringUtils.format("%s_%s", segmentId, 
segmentInterval));
     }
 
     private SortedMap<DruidServer, List<SegmentDescriptor>> 
groupSegmentsByServer(Set<ServerToSegment> segments)
@@ -601,27 +594,19 @@ public class CachingClusteredClient implements 
QuerySegmentWalker
           responseContext
       );
       final Function<T, Object> cacheFn = 
strategy.prepareForSegmentLevelCache();
+
       return resultsBySegments
           .map(result -> {
             final BySegmentResultValueClass<T> resultsOfSegment = 
result.getValue();
-            final CachePopulator cachePopulator =
-                getCachePopulator(resultsOfSegment.getSegmentId(), 
resultsOfSegment.getInterval());
-            Sequence<T> res = Sequences
-                .simple(resultsOfSegment.getResults())
-                .map(r -> {
-                  if (cachePopulator != null) {
-                    // only compute cache data if populating cache
-                    
cachePopulator.cacheFutures.add(backgroundExecutorService.submit(() -> 
cacheFn.apply(r)));
-                  }
-                  return r;
-                })
-                .map(
-                    toolChest.makePreComputeManipulatorFn(downstreamQuery, 
MetricManipulatorFns.deserializing())::apply
-                );
-            if (cachePopulator != null) {
-              res = res.withEffect(cachePopulator::populate, 
MoreExecutors.sameThreadExecutor());
+            final Cache.NamedKey cachePopulatorKey =
+                getCachePopulatorKey(resultsOfSegment.getSegmentId(), 
resultsOfSegment.getInterval());
+            Sequence<T> res = Sequences.simple(resultsOfSegment.getResults());
+            if (cachePopulatorKey != null) {
+              res = cachePopulator.wrap(res, cacheFn::apply, cache, 
cachePopulatorKey);
             }
-            return res;
+            return res.map(
+                toolChest.makePreComputeManipulatorFn(downstreamQuery, 
MetricManipulatorFns.deserializing())::apply
+            );
           })
           .flatMerge(seq -> seq, query.getResultOrdering());
     }
@@ -644,43 +629,4 @@ public class CachingClusteredClient implements 
QuerySegmentWalker
       return rhs;
     }
   }
-
-  private class CachePopulator
-  {
-    private final Cache cache;
-    private final ObjectMapper mapper;
-    private final Cache.NamedKey key;
-    private final ConcurrentLinkedQueue<ListenableFuture<Object>> cacheFutures 
= new ConcurrentLinkedQueue<>();
-
-    CachePopulator(Cache cache, ObjectMapper mapper, Cache.NamedKey key)
-    {
-      this.cache = cache;
-      this.mapper = mapper;
-      this.key = key;
-    }
-
-    public void populate()
-    {
-      Futures.addCallback(
-          Futures.allAsList(cacheFutures),
-          new FutureCallback<List<Object>>()
-          {
-            @Override
-            public void onSuccess(List<Object> cacheData)
-            {
-              CacheUtil.populate(cache, mapper, key, cacheData);
-              // Help out GC by making sure all references are gone
-              cacheFutures.clear();
-            }
-
-            @Override
-            public void onFailure(Throwable throwable)
-            {
-              log.error(throwable, "Background caching failed");
-            }
-          },
-          backgroundExecutorService
-      );
-    }
-  }
 }
diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java 
b/server/src/main/java/io/druid/client/CachingQueryRunner.java
index df9e885..422ea65 100644
--- a/server/src/main/java/io/druid/client/CachingQueryRunner.java
+++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java
@@ -23,18 +23,13 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
 import io.druid.java.util.common.guava.BaseSequence;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
-import io.druid.java.util.common.logger.Logger;
 import io.druid.query.CacheStrategy;
 import io.druid.query.Query;
 import io.druid.query.QueryPlus;
@@ -46,20 +41,19 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 
 public class CachingQueryRunner<T> implements QueryRunner<T>
 {
-  private static final Logger log = new Logger(CachingQueryRunner.class);
   private final String segmentIdentifier;
   private final SegmentDescriptor segmentDescriptor;
   private final QueryRunner<T> base;
   private final QueryToolChest toolChest;
   private final Cache cache;
   private final ObjectMapper mapper;
+  private final CachePopulator cachePopulator;
   private final CacheConfig cacheConfig;
-  private final ListeningExecutorService backgroundExecutorService;
 
   public CachingQueryRunner(
       String segmentIdentifier,
@@ -68,7 +62,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
       Cache cache,
       QueryToolChest toolchest,
       QueryRunner<T> base,
-      ExecutorService backgroundExecutorService,
+      CachePopulator cachePopulator,
       CacheConfig cacheConfig
   )
   {
@@ -78,7 +72,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
     this.toolChest = toolchest;
     this.cache = cache;
     this.mapper = mapper;
-    this.backgroundExecutorService = 
MoreExecutors.listeningDecorator(backgroundExecutorService);
+    this.cachePopulator = cachePopulator;
     this.cacheConfig = cacheConfig;
   }
 
@@ -140,56 +134,10 @@ public class CachingQueryRunner<T> implements 
QueryRunner<T>
       }
     }
 
-    final Collection<ListenableFuture<?>> cacheFutures = 
Collections.synchronizedList(Lists.newLinkedList());
+    final Collection<ListenableFuture<?>> cacheFutures = 
Collections.synchronizedList(new LinkedList<>());
     if (populateCache) {
       final Function cacheFn = strategy.prepareForSegmentLevelCache();
-
-      return Sequences.withEffect(
-          Sequences.map(
-              base.run(queryPlus, responseContext),
-              new Function<T, T>()
-              {
-                @Override
-                public T apply(final T input)
-                {
-                  final SettableFuture<Object> future = 
SettableFuture.create();
-                  cacheFutures.add(future);
-                  backgroundExecutorService.submit(
-                      new Runnable()
-                      {
-                        @Override
-                        public void run()
-                        {
-                          try {
-                            future.set(cacheFn.apply(input));
-                          }
-                          catch (Exception e) {
-                            // if there is exception, should setException to 
quit the caching processing
-                            future.setException(e);
-                          }
-                        }
-                      }
-                  );
-                  return input;
-                }
-              }
-          ),
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              try {
-                CacheUtil.populate(cache, mapper, key, 
Futures.allAsList(cacheFutures).get());
-              }
-              catch (Exception e) {
-                log.error(e, "Error while getting future for cache task");
-                throw Throwables.propagate(e);
-              }
-            }
-          },
-          backgroundExecutorService
-      );
+      return cachePopulator.wrap(base.run(queryPlus, responseContext), value 
-> cacheFn.apply(value), cache, key);
     } else {
       return base.run(queryPlus, responseContext);
     }
diff --git 
a/server/src/main/java/io/druid/client/cache/BackgroundCachePopulator.java 
b/server/src/main/java/io/druid/client/cache/BackgroundCachePopulator.java
new file mode 100644
index 0000000..d20fcf4
--- /dev/null
+++ b/server/src/main/java/io/druid/client/cache/BackgroundCachePopulator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.java.util.common.guava.Sequence;
+import io.druid.java.util.common.guava.Sequences;
+import io.druid.java.util.common.logger.Logger;
+
+import java.io.ByteArrayOutputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+
+/**
+ * {@link CachePopulator} that hands each element's cache-value computation to an executor and, via an effect
+ * attached to the wrapped sequence, gathers the computed values into one serialized cache entry.
+ */
+public class BackgroundCachePopulator implements CachePopulator
+{
+  private static final Logger log = new Logger(BackgroundCachePopulator.class);
+
+  private final ListeningExecutorService exec;
+  private final ObjectMapper objectMapper;
+  private final CachePopulatorStats cachePopulatorStats;
+  private final long maxEntrySize;
+
+  /**
+   * @param exec                executor used for cache-value computation and for the final cache write
+   * @param objectMapper        mapper whose factory serializes the cache values; must not be null
+   * @param cachePopulatorStats receives ok / error / oversized counts; must not be null
+   * @param maxEntrySize        largest serialized entry size, in bytes, to write; zero or negative disables the check
+   */
+  public BackgroundCachePopulator(
+      final ExecutorService exec,
+      final ObjectMapper objectMapper,
+      final CachePopulatorStats cachePopulatorStats,
+      final long maxEntrySize
+  )
+  {
+    this.exec = MoreExecutors.listeningDecorator(exec);
+    this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
+    this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
+    this.maxEntrySize = maxEntrySize;
+  }
+
+  /**
+   * Returns a sequence that yields the same elements as {@code sequence}. As elements flow through, {@code cacheFn}
+   * is scheduled for each one on the executor; an effect attached via {@code Sequences.withEffect} then collects
+   * the results and stores them in {@code cache} under {@code cacheKey}.
+   */
+  @Override
+  public <T, CacheType> Sequence<T> wrap(
+      final Sequence<T> sequence,
+      final Function<T, CacheType> cacheFn,
+      final Cache cache,
+      final Cache.NamedKey cacheKey
+  )
+  {
+    final List<ListenableFuture<CacheType>> pending = new LinkedList<>();
+
+    // Elements pass through unchanged; each one also schedules its cache-value computation on the executor.
+    final Sequence<T> tapped = Sequences.map(
+        sequence,
+        element -> {
+          final ListenableFuture<CacheType> future = exec.submit(() -> cacheFn.apply(element));
+          pending.add(future);
+          return element;
+        }
+    );
+
+    return Sequences.withEffect(
+        tapped,
+        () -> Futures.addCallback(Futures.allAsList(pending), newCacheWriter(cache, cacheKey, pending), exec),
+        MoreExecutors.sameThreadExecutor()
+    );
+  }
+
+  /**
+   * Builds the callback that receives the computed cache values: on success they are serialized and stored, on
+   * failure the error is only logged.
+   */
+  private <CacheType> FutureCallback<List<CacheType>> newCacheWriter(
+      final Cache cache,
+      final Cache.NamedKey cacheKey,
+      final List<ListenableFuture<CacheType>> pending
+  )
+  {
+    return new FutureCallback<List<CacheType>>()
+    {
+      @Override
+      public void onSuccess(List<CacheType> results)
+      {
+        populateCache(cache, cacheKey, results);
+        // Help out GC by making sure all references are gone
+        pending.clear();
+      }
+
+      @Override
+      public void onFailure(Throwable t)
+      {
+        log.error(t, "Background caching failed");
+      }
+    };
+  }
+
+  /**
+   * Serializes {@code results} back to back and stores the bytes under {@code cacheKey}, unless the serialized form
+   * exceeds {@code maxEntrySize}. The outcome is recorded in {@code cachePopulatorStats}; exceptions are logged and
+   * counted rather than propagated.
+   */
+  private <CacheType> void populateCache(
+      final Cache cache,
+      final Cache.NamedKey cacheKey,
+      final List<CacheType> results
+  )
+  {
+    try {
+      final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+      try (JsonGenerator generator = objectMapper.getFactory().createGenerator(buffer)) {
+        for (CacheType result : results) {
+          generator.writeObject(result);
+
+          if (isOversized(buffer)) {
+            cachePopulatorStats.incrementOversized();
+            return;
+          }
+        }
+      }
+
+      // Checked once more now that the generator is closed.
+      if (isOversized(buffer)) {
+        cachePopulatorStats.incrementOversized();
+        return;
+      }
+
+      cache.put(cacheKey, buffer.toByteArray());
+      cachePopulatorStats.incrementOk();
+    }
+    catch (Exception e) {
+      log.warn(e, "Could not populate cache");
+      cachePopulatorStats.incrementError();
+    }
+  }
+
+  /** True when a size limit is configured (positive {@code maxEntrySize}) and {@code buffer} has grown past it. */
+  private boolean isOversized(final ByteArrayOutputStream buffer)
+  {
+    return maxEntrySize > 0 && buffer.size() > maxEntrySize;
+  }
+}
diff --git a/server/src/main/java/io/druid/client/cache/CacheConfig.java 
b/server/src/main/java/io/druid/client/cache/CacheConfig.java
index 4b9f020..7d8e35f 100644
--- a/server/src/main/java/io/druid/client/cache/CacheConfig.java
+++ b/server/src/main/java/io/druid/client/cache/CacheConfig.java
@@ -20,10 +20,10 @@
 package io.druid.client.cache;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
 import io.druid.query.Query;
 
 import javax.validation.constraints.Min;
-import java.util.Arrays;
 import java.util.List;
 
 public class CacheConfig
@@ -52,10 +52,13 @@ public class CacheConfig
   private int cacheBulkMergeLimit = Integer.MAX_VALUE;
 
   @JsonProperty
-  private int resultLevelCacheLimit = Integer.MAX_VALUE;
+  private int maxEntrySize = 1_000_000;
+
+  @JsonProperty
+  private List<String> unCacheable = ImmutableList.of(Query.SELECT);
 
   @JsonProperty
-  private List<String> unCacheable = Arrays.asList(Query.GROUP_BY, 
Query.SELECT);
+  private int resultLevelCacheLimit = Integer.MAX_VALUE;
 
   public boolean isPopulateCache()
   {
@@ -87,6 +90,11 @@ public class CacheConfig
     return cacheBulkMergeLimit;
   }
 
+  public int getMaxEntrySize()
+  {
+    return maxEntrySize;
+  }
+
   public int getResultLevelCacheLimit()
   {
     return resultLevelCacheLimit;
diff --git a/server/src/main/java/io/druid/client/cache/CacheMonitor.java 
b/server/src/main/java/io/druid/client/cache/CacheMonitor.java
index 515555a..576f9db 100644
--- a/server/src/main/java/io/druid/client/cache/CacheMonitor.java
+++ b/server/src/main/java/io/druid/client/cache/CacheMonitor.java
@@ -20,27 +20,24 @@
 package io.druid.client.cache;
 
 import com.google.inject.Inject;
+import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.java.util.emitter.service.ServiceMetricEvent;
 import io.druid.java.util.metrics.AbstractMonitor;
-import io.druid.java.util.common.StringUtils;
 
 public class CacheMonitor extends AbstractMonitor
 {
   // package private for tests
   volatile Cache cache;
 
+  private final CachePopulatorStats cachePopulatorStats;
   private volatile CacheStats prevCacheStats = null;
+  private volatile CachePopulatorStats.Snapshot prevCachePopulatorStats = null;
 
-  public CacheMonitor()
+  @Inject
+  public CacheMonitor(final CachePopulatorStats cachePopulatorStats)
   {
-  }
-
-  public CacheMonitor(
-      Cache cache
-  )
-  {
-    this.cache = cache;
+    this.cachePopulatorStats = cachePopulatorStats;
   }
 
   // make it possible to enable CacheMonitor even if cache is not bound
@@ -58,10 +55,16 @@ public class CacheMonitor extends AbstractMonitor
       final CacheStats currCacheStats = cache.getStats();
       final CacheStats deltaCacheStats = currCacheStats.delta(prevCacheStats);
 
+      final CachePopulatorStats.Snapshot currCachePopulatorStats = 
cachePopulatorStats.snapshot();
+      final CachePopulatorStats.Snapshot deltaCachePopulatorStats = 
currCachePopulatorStats.delta(
+          prevCachePopulatorStats
+      );
+
       final ServiceMetricEvent.Builder builder = new 
ServiceMetricEvent.Builder();
-      emitStats(emitter, "query/cache/delta", deltaCacheStats, builder);
-      emitStats(emitter, "query/cache/total", currCacheStats, builder);
+      emitStats(emitter, "query/cache/delta", deltaCachePopulatorStats, 
deltaCacheStats, builder);
+      emitStats(emitter, "query/cache/total", currCachePopulatorStats, 
currCacheStats, builder);
 
+      prevCachePopulatorStats = currCachePopulatorStats;
       prevCacheStats = currCacheStats;
 
       // Any custom cache statistics that need monitoring
@@ -71,13 +74,15 @@ public class CacheMonitor extends AbstractMonitor
   }
 
   private void emitStats(
-      ServiceEmitter emitter,
+      final ServiceEmitter emitter,
       final String metricPrefix,
-      CacheStats cacheStats,
-      ServiceMetricEvent.Builder builder
+      final CachePopulatorStats.Snapshot cachePopulatorStats,
+      final CacheStats cacheStats,
+      final ServiceMetricEvent.Builder builder
   )
   {
     if (cache != null) {
+      // Cache stats.
       emitter.emit(builder.build(StringUtils.format("%s/numEntries", 
metricPrefix), cacheStats.getNumEntries()));
       emitter.emit(builder.build(StringUtils.format("%s/sizeBytes", 
metricPrefix), cacheStats.getSizeInBytes()));
       emitter.emit(builder.build(StringUtils.format("%s/hits", metricPrefix), 
cacheStats.getNumHits()));
@@ -87,6 +92,13 @@ public class CacheMonitor extends AbstractMonitor
       emitter.emit(builder.build(StringUtils.format("%s/averageBytes", 
metricPrefix), cacheStats.averageBytes()));
       emitter.emit(builder.build(StringUtils.format("%s/timeouts", 
metricPrefix), cacheStats.getNumTimeouts()));
       emitter.emit(builder.build(StringUtils.format("%s/errors", 
metricPrefix), cacheStats.getNumErrors()));
+
+      // Cache populator stats.
+      emitter.emit(builder.build(StringUtils.format("%s/put/ok", 
metricPrefix), cachePopulatorStats.getNumOk()));
+      emitter.emit(builder.build(StringUtils.format("%s/put/error", 
metricPrefix), cachePopulatorStats.getNumError()));
+      emitter.emit(
+          builder.build(StringUtils.format("%s/put/oversized", metricPrefix), 
cachePopulatorStats.getNumOversized())
+      );
     }
   }
 }
diff --git 
a/processing/src/main/java/io/druid/guice/annotations/BackgroundCaching.java 
b/server/src/main/java/io/druid/client/cache/CachePopulator.java
similarity index 66%
rename from 
processing/src/main/java/io/druid/guice/annotations/BackgroundCaching.java
rename to server/src/main/java/io/druid/client/cache/CachePopulator.java
index abd7514..5c8fc88 100644
--- a/processing/src/main/java/io/druid/guice/annotations/BackgroundCaching.java
+++ b/server/src/main/java/io/druid/client/cache/CachePopulator.java
@@ -17,21 +17,18 @@
  * under the License.
  */
 
-package io.druid.guice.annotations;
+package io.druid.client.cache;
 
-import com.google.inject.BindingAnnotation;
+import io.druid.java.util.common.guava.Sequence;
 
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+import java.util.function.Function;
 
-/**
- *
- */
-@BindingAnnotation
-@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
-@Retention(RetentionPolicy.RUNTIME)
-public @interface BackgroundCaching
+public interface CachePopulator
 {
+  <T, CacheType> Sequence<T> wrap(
+      Sequence<T> sequence,
+      Function<T, CacheType> cacheFn,
+      Cache cache,
+      Cache.NamedKey cacheKey
+  );
 }
diff --git 
a/server/src/main/java/io/druid/client/cache/CachePopulatorStats.java 
b/server/src/main/java/io/druid/client/cache/CachePopulatorStats.java
new file mode 100644
index 0000000..f1bcd30
--- /dev/null
+++ b/server/src/main/java/io/druid/client/cache/CachePopulatorStats.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Monotonically increasing tallies of cache population outcomes: entries written, entries that failed, and
+ * entries skipped for being oversized. Point-in-time values are read through {@link #snapshot()}.
+ */
+public class CachePopulatorStats
+{
+  private final AtomicLong okCounter = new AtomicLong();
+  private final AtomicLong errorCounter = new AtomicLong();
+  private final AtomicLong oversizedCounter = new AtomicLong();
+
+  /** Adds one to the "ok" tally. */
+  public void incrementOk()
+  {
+    okCounter.getAndIncrement();
+  }
+
+  /** Adds one to the "error" tally. */
+  public void incrementError()
+  {
+    errorCounter.getAndIncrement();
+  }
+
+  /** Adds one to the "oversized" tally. */
+  public void incrementOversized()
+  {
+    oversizedCounter.getAndIncrement();
+  }
+
+  /**
+   * Captures the current value of every counter. Each counter is read separately, so the three values are not
+   * taken atomically with respect to one another.
+   */
+  public Snapshot snapshot()
+  {
+    final long ok = okCounter.get();
+    final long error = errorCounter.get();
+    final long oversized = oversizedCounter.get();
+    return new Snapshot(ok, error, oversized);
+  }
+
+  /**
+   * Immutable set of counter values, either absolute or (when produced by {@link #delta(Snapshot)}) relative to
+   * an earlier snapshot.
+   */
+  public static class Snapshot
+  {
+    private final long numOk;
+    private final long numError;
+    private final long numOversized;
+
+    Snapshot(final long numOk, final long numError, final long numOversized)
+    {
+      this.numOk = numOk;
+      this.numError = numError;
+      this.numOversized = numOversized;
+    }
+
+    public long getNumOk()
+    {
+      return numOk;
+    }
+
+    public long getNumError()
+    {
+      return numError;
+    }
+
+    public long getNumOversized()
+    {
+      return numOversized;
+    }
+
+    /**
+     * Returns the change in each counter since {@code oldSnapshot}, or this snapshot itself when
+     * {@code oldSnapshot} is null.
+     */
+    public Snapshot delta(Snapshot oldSnapshot)
+    {
+      if (oldSnapshot == null) {
+        return this;
+      }
+
+      final long okDelta = numOk - oldSnapshot.numOk;
+      final long errorDelta = numError - oldSnapshot.numError;
+      final long oversizedDelta = numOversized - oldSnapshot.numOversized;
+      return new Snapshot(okDelta, errorDelta, oversizedDelta);
+    }
+  }
+}
diff --git 
a/server/src/main/java/io/druid/client/cache/ForegroundCachePopulator.java 
b/server/src/main/java/io/druid/client/cache/ForegroundCachePopulator.java
new file mode 100644
index 0000000..7e7fd5e
--- /dev/null
+++ b/server/src/main/java/io/druid/client/cache/ForegroundCachePopulator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import io.druid.java.util.common.guava.Sequence;
+import io.druid.java.util.common.guava.SequenceWrapper;
+import io.druid.java.util.common.guava.Sequences;
+import io.druid.java.util.common.logger.Logger;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/**
+ * {@link CachePopulator} that serializes cache values inside the mapping function applied to the wrapped sequence,
+ * so it can stop serializing as soon as the entry is known to exceed {@code maxEntrySize}.
+ */
+public class ForegroundCachePopulator implements CachePopulator
+{
+  private static final Logger log = new Logger(ForegroundCachePopulator.class);
+
+  private final ObjectMapper objectMapper;
+  private final CachePopulatorStats cachePopulatorStats;
+  private final long maxEntrySize;
+
+  /**
+   * @param objectMapper        mapper whose factory serializes the cache values; must not be null
+   * @param cachePopulatorStats receives ok / error / oversized counts; must not be null
+   * @param maxEntrySize        largest serialized entry size, in bytes, to write; zero or negative disables the check
+   */
+  public ForegroundCachePopulator(
+      final ObjectMapper objectMapper,
+      final CachePopulatorStats cachePopulatorStats,
+      final long maxEntrySize
+  )
+  {
+    this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
+    this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
+    this.maxEntrySize = maxEntrySize;
+  }
+
+  /**
+   * Returns {@code sequence} with each element additionally run through {@code cacheFn} and serialized into an
+   * in-memory buffer. When the wrapper is notified that the sequence is done, the buffer is written to {@code cache}
+   * under {@code cacheKey}, unless it exceeded {@code maxEntrySize}, in which case it is counted as oversized.
+   * A serialization failure surfaces as a {@link RuntimeException} from the returned sequence.
+   *
+   * @throws RuntimeException if the JSON generator cannot be created
+   */
+  @Override
+  public <T, CacheType> Sequence<T> wrap(
+      final Sequence<T> sequence,
+      final Function<T, CacheType> cacheFn,
+      final Cache cache,
+      final Cache.NamedKey cacheKey
+  )
+  {
+    // Guards bytes and jsonGenerator. Both are created per call, so the lock is too: a single lock shared by the
+    // whole populator would make unrelated sequences wait on each other for every element they serialize.
+    final Object lock = new Object();
+    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    final AtomicBoolean tooBig = new AtomicBoolean(false);
+    final JsonGenerator jsonGenerator;
+
+    try {
+      jsonGenerator = objectMapper.getFactory().createGenerator(bytes);
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return Sequences.wrap(
+        Sequences.map(
+            sequence,
+            input -> {
+              if (!tooBig.get()) {
+                synchronized (lock) {
+                  try {
+                    jsonGenerator.writeObject(cacheFn.apply(input));
+
+                    // Not flushing jsonGenerator before checking this, but should be ok since Jackson buffers are
+                    // typically just a few KB, and we don't want to waste cycles flushing.
+                    if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
+                      tooBig.set(true);
+                    }
+                  }
+                  catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                }
+              }
+
+              return input;
+            }
+        ),
+        new SequenceWrapper()
+        {
+          @Override
+          public void after(final boolean isDone, final Throwable thrown) throws Exception
+          {
+            synchronized (lock) {
+              jsonGenerator.close();
+
+              if (isDone) {
+                // Check tooBig, then check maxEntrySize one more time, after closing/flushing jsonGenerator.
+                if (tooBig.get() || (maxEntrySize > 0 && bytes.size() > maxEntrySize)) {
+                  cachePopulatorStats.incrementOversized();
+                  return;
+                }
+
+                try {
+                  cache.put(cacheKey, bytes.toByteArray());
+                  cachePopulatorStats.incrementOk();
+                }
+                catch (Exception e) {
+                  log.warn(e, "Unable to write to cache");
+                  cachePopulatorStats.incrementError();
+                }
+              }
+            }
+          }
+        }
+    );
+  }
+}
diff --git a/server/src/main/java/io/druid/client/cache/HybridCache.java 
b/server/src/main/java/io/druid/client/cache/HybridCache.java
index f732bee..c25beff 100644
--- a/server/src/main/java/io/druid/client/cache/HybridCache.java
+++ b/server/src/main/java/io/druid/client/cache/HybridCache.java
@@ -21,8 +21,8 @@ package io.druid.client.cache;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
diff --git a/server/src/main/java/io/druid/guice/CacheModule.java 
b/server/src/main/java/io/druid/guice/CacheModule.java
index 19d6e57..488e52c 100644
--- a/server/src/main/java/io/druid/guice/CacheModule.java
+++ b/server/src/main/java/io/druid/guice/CacheModule.java
@@ -24,6 +24,7 @@ import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.name.Names;
 import io.druid.client.cache.Cache;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.CacheProvider;
 import io.druid.guice.annotations.Global;
 
@@ -48,6 +49,7 @@ public class CacheModule implements Module
   public void configure(Binder binder)
   {
     binder.bind(Cache.class).toProvider(Key.get(CacheProvider.class, 
Global.class)).in(ManageLifecycle.class);
+    binder.bind(CachePopulatorStats.class).in(LazySingleton.class);
     JsonConfigProvider.bind(binder, prefix, CacheProvider.class, Global.class);
 
     binder.install(new HybridCacheModule(prefix));
diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java 
b/server/src/main/java/io/druid/guice/DruidProcessingModule.java
index c0c3287..ff9859d 100644
--- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java
@@ -19,22 +19,26 @@
 
 package io.druid.guice;
 
-import com.google.common.util.concurrent.MoreExecutors;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Binder;
 import com.google.inject.Module;
 import com.google.inject.Provides;
 import com.google.inject.ProvisionException;
+import io.druid.client.cache.BackgroundCachePopulator;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
+import io.druid.client.cache.CachePopulatorStats;
+import io.druid.client.cache.ForegroundCachePopulator;
 import io.druid.collections.BlockingPool;
 import io.druid.collections.DefaultBlockingPool;
 import io.druid.collections.NonBlockingPool;
 import io.druid.collections.StupidPool;
 import io.druid.common.utils.VMUtils;
-import io.druid.guice.annotations.BackgroundCaching;
 import io.druid.guice.annotations.Global;
 import io.druid.guice.annotations.Merging;
 import io.druid.guice.annotations.Processing;
+import io.druid.guice.annotations.Smile;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.concurrent.ExecutorServiceConfig;
 import io.druid.java.util.common.lifecycle.Lifecycle;
@@ -64,14 +68,15 @@ public class DruidProcessingModule implements Module
   }
 
   @Provides
-  @BackgroundCaching
   @LazySingleton
-  public ExecutorService getBackgroundExecutorService(
+  public CachePopulator getCachePopulator(
+      @Smile ObjectMapper smileMapper,
+      CachePopulatorStats cachePopulatorStats,
       CacheConfig cacheConfig
   )
   {
     if (cacheConfig.getNumBackgroundThreads() > 0) {
-      return Executors.newFixedThreadPool(
+      final ExecutorService exec = Executors.newFixedThreadPool(
           cacheConfig.getNumBackgroundThreads(),
           new ThreadFactoryBuilder()
               .setNameFormat("background-cacher-%d")
@@ -79,8 +84,10 @@ public class DruidProcessingModule implements Module
               .setPriority(Thread.MIN_PRIORITY)
               .build()
       );
+
+      return new BackgroundCachePopulator(exec, smileMapper, 
cachePopulatorStats, cacheConfig.getMaxEntrySize());
     } else {
-      return MoreExecutors.sameThreadExecutor();
+      return new ForegroundCachePopulator(smileMapper, cachePopulatorStats, 
cacheConfig.getMaxEntrySize());
     }
   }
 
diff --git a/server/src/main/java/io/druid/guice/RouterProcessingModule.java 
b/server/src/main/java/io/druid/guice/RouterProcessingModule.java
index 17d1326..4b2bbc1 100644
--- a/server/src/main/java/io/druid/guice/RouterProcessingModule.java
+++ b/server/src/main/java/io/druid/guice/RouterProcessingModule.java
@@ -22,16 +22,14 @@ package io.druid.guice;
 import com.google.inject.Binder;
 import com.google.inject.Module;
 import com.google.inject.Provides;
-import io.druid.client.cache.CacheConfig;
 import io.druid.collections.BlockingPool;
 import io.druid.collections.DummyBlockingPool;
 import io.druid.collections.DummyNonBlockingPool;
 import io.druid.collections.NonBlockingPool;
-import io.druid.java.util.common.concurrent.Execs;
-import io.druid.guice.annotations.BackgroundCaching;
 import io.druid.guice.annotations.Global;
 import io.druid.guice.annotations.Merging;
 import io.druid.guice.annotations.Processing;
+import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.concurrent.ExecutorServiceConfig;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.query.DruidProcessingConfig;
@@ -59,20 +57,6 @@ public class RouterProcessingModule implements Module
   }
 
   @Provides
-  @BackgroundCaching
-  @LazySingleton
-  public ExecutorService getBackgroundExecutorService(CacheConfig cacheConfig)
-  {
-    if (cacheConfig.getNumBackgroundThreads() > 0) {
-      log.error(
-          "numBackgroundThreads[%d] configured, that is ignored on Router",
-          cacheConfig.getNumBackgroundThreads()
-      );
-    }
-    return Execs.dummy();
-  }
-
-  @Provides
   @Processing
   @ManageLifecycle
   public ExecutorService getProcessingExecutorService(DruidProcessingConfig 
config)
diff --git 
a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
 
b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index c0169e7..5b79626 100644
--- 
a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ 
b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -40,6 +40,7 @@ import 
com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.common.guava.ThreadRenamingCallable;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
@@ -166,7 +167,8 @@ public class AppenderatorImpl implements Appenderator
       IndexIO indexIO,
       IndexMerger indexMerger,
       Cache cache,
-      CacheConfig cacheConfig
+      CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats
   )
   {
     this.schema = Preconditions.checkNotNull(schema, "schema");
@@ -186,7 +188,8 @@ public class AppenderatorImpl implements Appenderator
         conglomerate,
         queryExecutorService,
         Preconditions.checkNotNull(cache, "cache"),
-        cacheConfig
+        cacheConfig,
+        cachePopulatorStats
     );
     maxBytesTuningConfig = 
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory());
     log.info("Created Appenderator for dataSource[%s].", 
schema.getDataSource());
diff --git 
a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java
 
b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java
index 7651c40..1e22cdd 100644
--- 
a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java
+++ 
b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java
@@ -22,6 +22,7 @@ package io.druid.segment.realtime.appenderator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.query.QueryRunnerFactoryConglomerate;
@@ -52,7 +53,8 @@ public class Appenderators
       ServiceEmitter emitter,
       ExecutorService queryExecutorService,
       Cache cache,
-      CacheConfig cacheConfig
+      CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats
   )
   {
     return new AppenderatorImpl(
@@ -68,7 +70,8 @@ public class Appenderators
         indexIO,
         indexMerger,
         cache,
-        cacheConfig
+        cacheConfig,
+        cachePopulatorStats
     );
   }
 
@@ -120,6 +123,7 @@ public class Appenderators
         indexIO,
         indexMerger,
         null,
+        null,
         null
     );
   }
diff --git 
a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
 
b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
index b6b188e..7d027d6 100644
--- 
a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
+++ 
b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
@@ -21,11 +21,11 @@ package io.druid.segment.realtime.appenderator;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.guice.annotations.Processing;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMerger;
@@ -51,6 +51,7 @@ public class DefaultRealtimeAppenderatorFactory implements 
AppenderatorFactory
   private final IndexMerger indexMerger;
   private final Cache cache;
   private final CacheConfig cacheConfig;
+  private final CachePopulatorStats cachePopulatorStats;
 
   public DefaultRealtimeAppenderatorFactory(
       @JacksonInject ServiceEmitter emitter,
@@ -62,7 +63,8 @@ public class DefaultRealtimeAppenderatorFactory implements 
AppenderatorFactory
       @JacksonInject IndexIO indexIO,
       @JacksonInject IndexMerger indexMerger,
       @JacksonInject Cache cache,
-      @JacksonInject CacheConfig cacheConfig
+      @JacksonInject CacheConfig cacheConfig,
+      @JacksonInject CachePopulatorStats cachePopulatorStats
   )
   {
     this.emitter = emitter;
@@ -75,6 +77,7 @@ public class DefaultRealtimeAppenderatorFactory implements 
AppenderatorFactory
     this.indexMerger = indexMerger;
     this.cache = cache;
     this.cacheConfig = cacheConfig;
+    this.cachePopulatorStats = cachePopulatorStats;
   }
 
   @Override
@@ -103,7 +106,8 @@ public class DefaultRealtimeAppenderatorFactory implements 
AppenderatorFactory
         emitter,
         queryExecutorService,
         cache,
-        cacheConfig
+        cacheConfig,
+        cachePopulatorStats
     );
   }
 
diff --git 
a/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
 
b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 6901ace..ef18be2 100644
--- 
a/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ 
b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -24,15 +24,17 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.client.CachingQueryRunner;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
+import io.druid.client.cache.ForegroundCachePopulator;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.guava.CloseQuietly;
 import io.druid.java.util.common.guava.FunctionalIterable;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.query.BySegmentQueryRunner;
 import io.druid.query.CPUTimeMetricQueryRunner;
 import io.druid.query.MetricsEmittingQueryRunner;
@@ -76,6 +78,7 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
   private final ExecutorService queryExecutorService;
   private final Cache cache;
   private final CacheConfig cacheConfig;
+  private final CachePopulatorStats cachePopulatorStats;
 
   public SinkQuerySegmentWalker(
       String dataSource,
@@ -85,7 +88,8 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
       QueryRunnerFactoryConglomerate conglomerate,
       ExecutorService queryExecutorService,
       Cache cache,
-      CacheConfig cacheConfig
+      CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats
   )
   {
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
@@ -96,6 +100,7 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
     this.queryExecutorService = 
Preconditions.checkNotNull(queryExecutorService, "queryExecutorService");
     this.cache = Preconditions.checkNotNull(cache, "cache");
     this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
+    this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, 
"cachePopulatorStats");
 
     if (!cache.isLocal()) {
       log.warn("Configured cache[%s] is not local, caching will not be 
enabled.", cache.getClass().getName());
@@ -235,7 +240,12 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
                                                             cache,
                                                             toolChest,
                                                             baseRunner,
-                                                            
MoreExecutors.sameThreadExecutor(),
+                                                            // Always populate 
in foreground regardless of config
+                                                            new 
ForegroundCachePopulator(
+                                                                objectMapper,
+                                                                
cachePopulatorStats,
+                                                                
cacheConfig.getMaxEntrySize()
+                                                            ),
                                                             cacheConfig
                                                         );
                                                       } else {
diff --git 
a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java 
b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java
index aefbdf0..2bdf3a3 100644
--- 
a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java
+++ 
b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.common.guava.ThreadRenamingCallable;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.StringUtils;
@@ -73,6 +74,7 @@ public class FlushingPlumber extends RealtimePlumber
       IndexIO indexIO,
       Cache cache,
       CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats,
       ObjectMapper objectMapper
 
   )
@@ -92,6 +94,7 @@ public class FlushingPlumber extends RealtimePlumber
         indexIO,
         cache,
         cacheConfig,
+        cachePopulatorStats,
         objectMapper
     );
 
diff --git 
a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
 
b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
index ea5e85b..244b2f0 100644
--- 
a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
+++ 
b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
@@ -24,10 +24,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.guice.annotations.Processing;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
@@ -57,6 +58,7 @@ public class FlushingPlumberSchool extends 
RealtimePlumberSchool
   private final IndexIO indexIO;
   private final Cache cache;
   private final CacheConfig cacheConfig;
+  private final CachePopulatorStats cachePopulatorStats;
   private final ObjectMapper objectMapper;
 
   @JsonCreator
@@ -70,6 +72,7 @@ public class FlushingPlumberSchool extends 
RealtimePlumberSchool
       @JacksonInject IndexIO indexIO,
       @JacksonInject Cache cache,
       @JacksonInject CacheConfig cacheConfig,
+      @JacksonInject CachePopulatorStats cachePopulatorStats,
       @JacksonInject ObjectMapper objectMapper
   )
   {
@@ -85,6 +88,7 @@ public class FlushingPlumberSchool extends 
RealtimePlumberSchool
         indexIO,
         cache,
         cacheConfig,
+        cachePopulatorStats,
         objectMapper
     );
 
@@ -97,6 +101,7 @@ public class FlushingPlumberSchool extends 
RealtimePlumberSchool
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
     this.cache = cache;
     this.cacheConfig = cacheConfig;
+    this.cachePopulatorStats = cachePopulatorStats;
     this.objectMapper = objectMapper;
   }
 
@@ -122,6 +127,7 @@ public class FlushingPlumberSchool extends 
RealtimePlumberSchool
         indexIO,
         cache,
         cacheConfig,
+        cachePopulatorStats,
         objectMapper
     );
   }
diff --git 
a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java 
b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
index 1e7a5c8..bfef2c6 100644
--- 
a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
+++ 
b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -32,6 +32,7 @@ import com.google.common.collect.Lists;
 import com.google.common.primitives.Ints;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.common.guava.ThreadRenamingCallable;
 import io.druid.common.guava.ThreadRenamingRunnable;
 import io.druid.common.utils.VMUtils;
@@ -146,6 +147,7 @@ public class RealtimePlumber implements Plumber
       IndexIO indexIO,
       Cache cache,
       CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats,
       ObjectMapper objectMapper
   )
   {
@@ -168,7 +170,8 @@ public class RealtimePlumber implements Plumber
         conglomerate,
         queryExecutorService,
         cache,
-        cacheConfig
+        cacheConfig,
+        cachePopulatorStats
     );
 
     log.info("Creating plumber using rejectionPolicy[%s]", 
getRejectionPolicy());
diff --git 
a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
 
b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
index ec3ed40..6ad7aca 100644
--- 
a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
+++ 
b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
@@ -23,10 +23,11 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.guice.annotations.Processing;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
@@ -54,6 +55,7 @@ public class RealtimePlumberSchool implements PlumberSchool
   private final IndexIO indexIO;
   private final Cache cache;
   private final CacheConfig cacheConfig;
+  private final CachePopulatorStats cachePopulatorStats;
   private final ObjectMapper objectMapper;
 
   @JsonCreator
@@ -69,6 +71,7 @@ public class RealtimePlumberSchool implements PlumberSchool
       @JacksonInject IndexIO indexIO,
       @JacksonInject Cache cache,
       @JacksonInject CacheConfig cacheConfig,
+      @JacksonInject CachePopulatorStats cachePopulatorStats,
       @JacksonInject ObjectMapper objectMapper
   )
   {
@@ -84,6 +87,7 @@ public class RealtimePlumberSchool implements PlumberSchool
 
     this.cache = cache;
     this.cacheConfig = cacheConfig;
+    this.cachePopulatorStats = cachePopulatorStats;
     this.objectMapper = objectMapper;
   }
 
@@ -111,6 +115,7 @@ public class RealtimePlumberSchool implements PlumberSchool
         indexIO,
         cache,
         cacheConfig,
+        cachePopulatorStats,
         objectMapper
     );
   }
diff --git 
a/server/src/main/java/io/druid/server/coordination/ServerManager.java 
b/server/src/main/java/io/druid/server/coordination/ServerManager.java
index aac2e86..4a134d7 100644
--- a/server/src/main/java/io/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java
@@ -26,7 +26,7 @@ import com.google.inject.Inject;
 import io.druid.client.CachingQueryRunner;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
-import io.druid.guice.annotations.BackgroundCaching;
+import io.druid.client.cache.CachePopulator;
 import io.druid.guice.annotations.Processing;
 import io.druid.guice.annotations.Smile;
 import io.druid.java.util.common.ISE;
@@ -77,7 +77,7 @@ public class ServerManager implements QuerySegmentWalker
   private final QueryRunnerFactoryConglomerate conglomerate;
   private final ServiceEmitter emitter;
   private final ExecutorService exec;
-  private final ExecutorService cachingExec;
+  private final CachePopulator cachePopulator;
   private final Cache cache;
   private final ObjectMapper objectMapper;
   private final CacheConfig cacheConfig;
@@ -89,7 +89,7 @@ public class ServerManager implements QuerySegmentWalker
       QueryRunnerFactoryConglomerate conglomerate,
       ServiceEmitter emitter,
       @Processing ExecutorService exec,
-      @BackgroundCaching ExecutorService cachingExec,
+      CachePopulator cachePopulator,
       @Smile ObjectMapper objectMapper,
       Cache cache,
       CacheConfig cacheConfig,
@@ -101,7 +101,7 @@ public class ServerManager implements QuerySegmentWalker
     this.emitter = emitter;
 
     this.exec = exec;
-    this.cachingExec = cachingExec;
+    this.cachePopulator = cachePopulator;
     this.cache = cache;
     this.objectMapper = objectMapper;
 
@@ -298,7 +298,7 @@ public class ServerManager implements QuerySegmentWalker
         cache,
         toolChest,
         metricsEmittingQueryRunnerInner,
-        cachingExec,
+        cachePopulator,
         cacheConfig
     );
 
diff --git 
a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
 
b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
index f50eea7..67ac01c 100644
--- 
a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
+++ 
b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -21,10 +21,11 @@ package io.druid.client;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import io.druid.client.cache.Cache;
+import io.druid.client.cache.CachePopulator;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
+import io.druid.client.cache.ForegroundCachePopulator;
 import io.druid.client.cache.MapCache;
 import io.druid.client.selector.QueryableDruidServer;
 import io.druid.client.selector.ServerSelector;
@@ -74,7 +75,9 @@ public class CachingClusteredClientFunctionalityTest
     timeline = new VersionedIntervalTimeline<>(Ordering.natural());
     serverView = EasyMock.createNiceMock(TimelineServerView.class);
     cache = MapCache.create(100000);
-    client = makeClient(MoreExecutors.sameThreadExecutor());
+    client = makeClient(
+        new ForegroundCachePopulator(CachingClusteredClientTest.jsonMapper, 
new CachePopulatorStats(), -1)
+    );
   }
 
   @Test
@@ -199,13 +202,13 @@ public class CachingClusteredClientFunctionalityTest
     ));
   }
 
-  protected CachingClusteredClient makeClient(final ListeningExecutorService 
backgroundExecutorService)
+  protected CachingClusteredClient makeClient(final CachePopulator 
cachePopulator)
   {
-    return makeClient(backgroundExecutorService, cache, 10);
+    return makeClient(cachePopulator, cache, 10);
   }
 
   protected CachingClusteredClient makeClient(
-      final ListeningExecutorService backgroundExecutorService,
+      final CachePopulator cachePopulator,
       final Cache cache,
       final int mergeLimit
   )
@@ -245,7 +248,7 @@ public class CachingClusteredClientFunctionalityTest
         },
         cache,
         CachingClusteredClientTest.jsonMapper,
-        backgroundExecutorService,
+        cachePopulator,
         new CacheConfig()
         {
           @Override
diff --git 
a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java 
b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index 31f68ff..6143dad 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -43,8 +43,12 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import io.druid.client.cache.BackgroundCachePopulator;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
+import io.druid.client.cache.CachePopulatorStats;
+import io.druid.client.cache.ForegroundCachePopulator;
 import io.druid.client.cache.MapCache;
 import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
 import io.druid.client.selector.QueryableDruidServer;
@@ -330,7 +334,7 @@ public class CachingClusteredClientTest
     timeline = new VersionedIntervalTimeline<>(Ordering.natural());
     serverView = EasyMock.createNiceMock(TimelineServerView.class);
     cache = MapCache.create(100000);
-    client = makeClient(MoreExecutors.sameThreadExecutor());
+    client = makeClient(new ForegroundCachePopulator(jsonMapper, new 
CachePopulatorStats(), -1));
 
     servers = new DruidServer[]{
         new DruidServer("test1", "test1", null, 10, ServerType.HISTORICAL, 
"bye", 0),
@@ -422,7 +426,14 @@ public class CachingClusteredClientTest
       }
     };
 
-    client = makeClient(randomizingExecutorService);
+    client = makeClient(
+        new BackgroundCachePopulator(
+            randomizingExecutorService,
+            jsonMapper,
+            new CachePopulatorStats(),
+            -1
+        )
+    );
 
     // callback to be run every time a query run is complete, to ensure all 
background
     // caching tasks are executed, and cache is populated before we move onto 
the next query
@@ -579,7 +590,7 @@ public class CachingClusteredClientTest
             .andReturn(ImmutableMap.of())
             .once();
     EasyMock.replay(cache);
-    client = makeClient(MoreExecutors.sameThreadExecutor(), cache, limit);
+    client = makeClient(new ForegroundCachePopulator(jsonMapper, new 
CachePopulatorStats(), -1), cache, limit);
     final DruidServer lastServer = servers[random.nextInt(servers.length)];
     final DataSegment dataSegment = EasyMock.createNiceMock(DataSegment.class);
     
EasyMock.expect(dataSegment.getIdentifier()).andReturn(DATA_SOURCE).anyTimes();
@@ -604,7 +615,7 @@ public class CachingClusteredClientTest
             .andReturn(ImmutableMap.of())
             .once();
     EasyMock.replay(cache);
-    client = makeClient(MoreExecutors.sameThreadExecutor(), cache, 0);
+    client = makeClient(new ForegroundCachePopulator(jsonMapper, new 
CachePopulatorStats(), -1), cache, 0);
     getDefaultQueryRunner().run(QueryPlus.wrap(query), context);
     EasyMock.verify(cache);
     EasyMock.verify(dataSegment);
@@ -2630,13 +2641,13 @@ public class CachingClusteredClientTest
     EasyMock.reset(mocks);
   }
 
-  protected CachingClusteredClient makeClient(final ListeningExecutorService 
backgroundExecutorService)
+  protected CachingClusteredClient makeClient(final CachePopulator 
cachePopulator)
   {
-    return makeClient(backgroundExecutorService, cache, 10);
+    return makeClient(cachePopulator, cache, 10);
   }
 
   protected CachingClusteredClient makeClient(
-      final ListeningExecutorService backgroundExecutorService,
+      final CachePopulator cachePopulator,
       final Cache cache,
       final int mergeLimit
   )
@@ -2676,7 +2687,7 @@ public class CachingClusteredClientTest
         },
         cache,
         jsonMapper,
-        backgroundExecutorService,
+        cachePopulator,
         new CacheConfig()
         {
           @Override
diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java 
b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
index ab2bf22..012fe2b 100644
--- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
+++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
@@ -19,20 +19,26 @@
 
 package io.druid.client;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.client.cache.BackgroundCachePopulator;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulator;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.CacheStats;
+import io.druid.client.cache.ForegroundCachePopulator;
 import io.druid.client.cache.MapCache;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.SequenceWrapper;
@@ -63,15 +69,15 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -98,14 +104,22 @@ public class CachingQueryRunnerTest
       DateTimes.of("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
   };
 
-  private ExecutorService backgroundExecutorService;
+  private ObjectMapper objectMapper;
+  private CachePopulator cachePopulator;
 
   public CachingQueryRunnerTest(int numBackgroundThreads)
   {
+    objectMapper = new DefaultObjectMapper();
+
     if (numBackgroundThreads > 0) {
-      backgroundExecutorService = Executors.newFixedThreadPool(numBackgroundThreads);
+      cachePopulator = new BackgroundCachePopulator(
+          Execs.multiThreaded(numBackgroundThreads, "CachingQueryRunnerTest-%d"),
+          objectMapper,
+          new CachePopulatorStats(),
+          -1
+      );
     } else {
-      backgroundExecutorService = MoreExecutors.sameThreadExecutor();
+      cachePopulator = new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), -1);
     }
   }
 
@@ -274,7 +288,7 @@ public class CachingQueryRunnerTest
             return resultSeq;
           }
         },
-        backgroundExecutorService,
+        cachePopulator,
         new CacheConfig()
         {
           @Override
@@ -331,7 +345,7 @@ public class CachingQueryRunnerTest
       List<Result> expectedResults,
       Query query,
       QueryToolChest toolchest
-  )
+  ) throws IOException
   {
     DefaultObjectMapper objectMapper = new DefaultObjectMapper();
     String segmentIdentifier = "segment";
@@ -345,12 +359,7 @@ public class CachingQueryRunnerTest
     );
 
     Cache cache = MapCache.create(1024 * 1024);
-    CacheUtil.populate(
-        cache,
-        objectMapper,
-        cacheKey,
-        Iterables.transform(expectedResults, cacheStrategy.prepareForSegmentLevelCache())
-    );
+    cache.put(cacheKey, toByteArray(Iterables.transform(expectedResults, cacheStrategy.prepareForSegmentLevelCache())));
 
     CachingQueryRunner runner = new CachingQueryRunner(
         segmentIdentifier,
@@ -367,7 +376,7 @@ public class CachingQueryRunnerTest
             return Sequences.empty();
           }
         },
-        backgroundExecutorService,
+        cachePopulator,
         new CacheConfig()
         {
           @Override
@@ -434,6 +443,19 @@ public class CachingQueryRunnerTest
     return retVal;
   }
 
+  private <T> byte[] toByteArray(final Iterable<T> results) throws IOException
+  {
+    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+
+    try (JsonGenerator gen = objectMapper.getFactory().createGenerator(bytes)) {
+      for (T result : results) {
+        gen.writeObject(result);
+      }
+    }
+
+    return bytes.toByteArray();
+  }
+
   private static class AssertingClosable implements Closeable
   {
 
diff --git a/server/src/test/java/io/druid/client/cache/CachePopulatorTest.java b/server/src/test/java/io/druid/client/cache/CachePopulatorTest.java
new file mode 100644
index 0000000..b855e20
--- /dev/null
+++ b/server/src/test/java/io/druid/client/cache/CachePopulatorTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.guava.Sequences;
+import io.druid.java.util.common.jackson.JacksonUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+public class CachePopulatorTest
+{
+  private final ExecutorService exec = Execs.multiThreaded(2, "cache-populator-test-%d");
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Cache cache = new MapCache(new ByteCountingLRUMap(Long.MAX_VALUE));
+  private final CachePopulatorStats stats = new CachePopulatorStats();
+
+  @After
+  public void tearDown()
+  {
+    exec.shutdownNow();
+  }
+
+  @Test
+  public void testForegroundPopulator()
+  {
+    final CachePopulator populator = new ForegroundCachePopulator(objectMapper, stats, -1);
+    final List<String> strings = ImmutableList.of("foo", "bar");
+
+    Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings));
+    Assert.assertEquals(strings, readFromCache(makeKey(1)));
+    Assert.assertEquals(1, stats.snapshot().getNumOk());
+    Assert.assertEquals(0, stats.snapshot().getNumError());
+    Assert.assertEquals(0, stats.snapshot().getNumOversized());
+  }
+
+  @Test
+  public void testForegroundPopulatorMaxEntrySize()
+  {
+    final CachePopulator populator = new ForegroundCachePopulator(objectMapper, stats, 30);
+    final List<String> strings = ImmutableList.of("foo", "bar");
+    final List<String> strings2 = ImmutableList.of("foo", "baralararararararaarararararaa");
+
+    Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings));
+    Assert.assertEquals(strings, readFromCache(makeKey(1)));
+    Assert.assertEquals(strings2, wrapAndReturn(populator, makeKey(2), strings2));
+    Assert.assertNull(readFromCache(makeKey(2)));
+
+    Assert.assertEquals(1, stats.snapshot().getNumOk());
+    Assert.assertEquals(0, stats.snapshot().getNumError());
+    Assert.assertEquals(1, stats.snapshot().getNumOversized());
+  }
+
+  @Test(timeout = 60000L)
+  public void testBackgroundPopulator() throws InterruptedException
+  {
+    final CachePopulator populator = new BackgroundCachePopulator(exec, objectMapper, stats, -1);
+    final List<String> strings = ImmutableList.of("foo", "bar");
+
+    Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings));
+
+    // Wait for background updates to happen.
+    while (cache.getStats().getNumEntries() < 1) {
+      Thread.sleep(100);
+    }
+
+    Assert.assertEquals(strings, readFromCache(makeKey(1)));
+    Assert.assertEquals(1, stats.snapshot().getNumOk());
+    Assert.assertEquals(0, stats.snapshot().getNumError());
+    Assert.assertEquals(0, stats.snapshot().getNumOversized());
+  }
+
+  @Test(timeout = 60000L)
+  public void testBackgroundPopulatorMaxEntrySize() throws InterruptedException
+  {
+    final CachePopulator populator = new BackgroundCachePopulator(exec, objectMapper, stats, 30);
+    final List<String> strings = ImmutableList.of("foo", "bar");
+    final List<String> strings2 = ImmutableList.of("foo", "baralararararararaarararararaa");
+
+    Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings));
+    Assert.assertEquals(strings2, wrapAndReturn(populator, makeKey(2), strings2));
+
+    // Wait for background updates to happen.
+    while (cache.getStats().getNumEntries() < 1 || stats.snapshot().getNumOversized() < 1) {
+      Thread.sleep(100);
+    }
+
+    Assert.assertEquals(strings, readFromCache(makeKey(1)));
+    Assert.assertNull(readFromCache(makeKey(2)));
+    Assert.assertEquals(1, stats.snapshot().getNumOk());
+    Assert.assertEquals(0, stats.snapshot().getNumError());
+    Assert.assertEquals(1, stats.snapshot().getNumOversized());
+  }
+
+  private static Cache.NamedKey makeKey(final int n)
+  {
+    return new Cache.NamedKey("test", Ints.toByteArray(n));
+  }
+
+  private List<String> wrapAndReturn(
+      final CachePopulator populator,
+      final Cache.NamedKey key,
+      final List<String> strings
+  )
+  {
+    return populator.wrap(Sequences.simple(strings), s -> ImmutableMap.of("s", s), cache, key).toList();
+  }
+
+  private List<String> readFromCache(final Cache.NamedKey key)
+  {
+    final byte[] bytes = cache.get(key);
+    if (bytes == null) {
+      return null;
+    }
+
+    try (
+        final MappingIterator<Map<String, String>> iterator = objectMapper.readValues(
+            objectMapper.getFactory().createParser(bytes),
+            JacksonUtils.TYPE_REFERENCE_MAP_STRING_STRING
+        )
+    ) {
+      final List<Map<String, String>> retVal = new ArrayList<>();
+      Iterators.addAll(retVal, iterator);
+
+      // Undo map-wrapping that was done in wrapAndReturn.
+      return retVal.stream().map(m -> m.get("s")).collect(Collectors.toList());
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
index 7b503aa..f88f5bf 100644
--- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
@@ -22,6 +22,7 @@ package io.druid.segment.realtime;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.JSONParseSpec;
@@ -115,6 +116,7 @@ public class FireDepartmentTest
                 TestHelper.getTestIndexIO(OffHeapMemorySegmentWriteOutMediumFactory.instance()),
                 MapCache.create(0),
                 NO_CACHE_CONFIG,
+                new CachePopulatorStats(),
                 TestHelper.makeJsonMapper()
 
             ),
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
index 3c675c6..c5522d9 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -22,6 +22,7 @@ package io.druid.segment.realtime.appenderator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.JSONParseSpec;
@@ -273,7 +274,8 @@ public class AppenderatorTester implements AutoCloseable
         emitter,
         queryExecutor,
         MapCache.create(2048),
-        new CacheConfig()
+        new CacheConfig(),
+        new CachePopulatorStats()
     );
   }
 
diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index a5ad8a4..ef96417 100644
--- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.client.cache.CachePopulatorStats;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
@@ -223,6 +224,7 @@ public class RealtimePlumberSchoolTest
         TestHelper.getTestIndexIO(segmentWriteOutMediumFactory),
         MapCache.create(0),
         FireDepartmentTest.NO_CACHE_CONFIG,
+        new CachePopulatorStats(),
         TestHelper.makeJsonMapper()
     );
 
diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
index 1287748..df4aebf 100644
--- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
+++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
@@ -26,8 +26,9 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.MoreExecutors;
 import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.CachePopulatorStats;
+import io.druid.client.cache.ForegroundCachePopulator;
 import io.druid.client.cache.LocalCacheProvider;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.IAE;
@@ -153,7 +154,7 @@ public class ServerManagerTest
         },
         new NoopServiceEmitter(),
         serverManagerExec,
-        MoreExecutors.sameThreadExecutor(),
+        new ForegroundCachePopulator(new DefaultObjectMapper(), new CachePopulatorStats(), -1),
         new DefaultObjectMapper(),
         new LocalCacheProvider().get(),
         new CacheConfig(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to