This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8351495  remove unnecessary lock in ForegroundCachePopulator leading 
to a lot of contention (#8116)
8351495 is described below

commit 83514958dbc873c56c91ef3d8e8668ac609a7e1c
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Jul 23 10:57:59 2019 -0700

    remove unnecessary lock in ForegroundCachePopulator leading to a lot of 
contention (#8116)
    
    * remove unnecessary lock in ForegroundCachePopulator leading to a lot of 
contention
    
    * MutableBoolean, javadocs, document some cache configs that were missing
    
    * more doc stuff
    
    * adjustments
    
    * remove background documentation
---
 docs/content/configuration/index.md                |  3 ++
 .../org/apache/druid/query/QueryRunnerFactory.java | 30 ++++++-----
 .../client/cache/BackgroundCachePopulator.java     |  4 ++
 .../apache/druid/client/cache/CachePopulator.java  | 19 +++++++
 .../druid/client/cache/CachePopulatorStats.java    |  5 ++
 .../client/cache/ForegroundCachePopulator.java     | 61 +++++++++++-----------
 6 files changed, 77 insertions(+), 45 deletions(-)

diff --git a/docs/content/configuration/index.md 
b/docs/content/configuration/index.md
index 28d4346..073b512 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -1176,6 +1176,7 @@ You can optionally configure caching to be enabled on the 
peons by setting cachi
 |`druid.realtime.cache.useCache`|true, false|Enable the cache on the 
realtime.|false|
 |`druid.realtime.cache.populateCache`|true, false|Populate the cache on the 
realtime.|false|
 |`druid.realtime.cache.unCacheable`|All druid query types|All query types to 
not cache.|`["groupBy", "select"]`|
+|`druid.realtime.cache.maxEntrySize`|positive integer|Maximum cache entry size in 
bytes.|1_000_000|
 
 See [cache configuration](#cache-configuration) for how to configure cache 
settings.
 
@@ -1319,6 +1320,7 @@ You can optionally only configure caching to be enabled 
on the Historical by set
 |`druid.historical.cache.useCache`|true, false|Enable the cache on the 
Historical.|false|
 |`druid.historical.cache.populateCache`|true, false|Populate the cache on the 
Historical.|false|
 |`druid.historical.cache.unCacheable`|All druid query types|All query types to 
not cache.|["groupBy", "select"]|
+|`druid.historical.cache.maxEntrySize`|positive integer|Maximum cache entry size in 
bytes.|1_000_000|
 
 See [cache configuration](#cache-configuration) for how to configure cache 
settings.
 
@@ -1452,6 +1454,7 @@ You can optionally only configure caching to be enabled 
on the Broker by setting
 |`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size of 
query response that can be cached.|`Integer.MAX_VALUE`|
 |`druid.broker.cache.unCacheable`|All druid query types|All query types to not 
cache.|`["groupBy", "select"]`|
 |`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with 
more segments than this number will not attempt to fetch from cache at the 
broker level, leaving potential caching fetches (and cache result merging) to 
the Historicals|`Integer.MAX_VALUE`|
+|`druid.broker.cache.maxEntrySize`|positive integer|Maximum cache entry size in 
bytes.|1_000_000|
 
 See [cache configuration](#cache-configuration) for how to configure cache 
settings.
 
diff --git 
a/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java 
b/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java
index fc4e5a6..0832fb1 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java
@@ -25,42 +25,44 @@ import org.apache.druid.segment.Segment;
 import java.util.concurrent.ExecutorService;
 
 /**
- * An interface that defines the nitty gritty implementation detauls of a 
Query on a Segment
+ * An interface that defines the nitty gritty implementation details of a 
Query on a Segment
  */
 @ExtensionPoint
 public interface QueryRunnerFactory<T, QueryType extends Query<T>>
 {
   /**
-   * Given a specific segment, this method will create a QueryRunner.
+   * Given a specific segment, this method will create a {@link QueryRunner}.
    *
-   * The QueryRunner, when asked, will generate a Sequence of results based on 
the given segment.  This
-   * is the meat of the query processing and is where the results are actually 
generated.  Everything else
-   * is just merging and reduction logic.
+   * The {@link QueryRunner}, when asked, will generate a {@link 
org.apache.druid.java.util.common.guava.Sequence} of
+   * results based on the given segment.  This is the meat of the {@link 
Query} processing and is where the results are
+   * actually generated.  Everything else is just merging and reduction logic.
    *
-   * @param segment The segment to process
-   * @return A QueryRunner that, when asked, will generate a Sequence of 
results based on the given segment
+   * @param   segment The segment to process
+   * @return  A {@link QueryRunner} that, when asked, will generate a
+   *          {@link org.apache.druid.java.util.common.guava.Sequence} of 
results based on the given segment
    */
   QueryRunner<T> createRunner(Segment segment);
 
   /**
    * Runners generated with createRunner() and combined into an Iterable in 
(time,shardId) order are passed
-   * along to this method with an ExecutorService.  The method should then 
return a QueryRunner that, when
-   * asked, will use the ExecutorService to run the base QueryRunners in some 
fashion.
+   * along to this method with an {@link ExecutorService}.  The method should 
then return a {@link QueryRunner} that,
+   * when asked, will use the {@link ExecutorService} to run the base 
QueryRunners in some fashion.
    *
-   * The vast majority of the time, this should be implemented with
+   * The vast majority of the time, this should be implemented with {@link 
ChainedExecutionQueryRunner}:
    *
    *     return new ChainedExecutionQueryRunner<>(queryExecutor, 
toolChest.getOrdering(), queryWatcher, queryRunners);
    *
    * Which will allow for parallel execution up to the maximum number of 
processing threads allowed.
    *
-   * @param queryExecutor ExecutorService to be used for parallel processing
-   * @param queryRunners Individual QueryRunner objects that produce some 
results
-   * @return a QueryRunner that, when asked, will use the ExecutorService to 
run the base QueryRunners
+   * @param queryExecutor   {@link ExecutorService} to be used for parallel 
processing
+   * @param queryRunners    Individual {@link QueryRunner} objects that 
produce some results
+   * @return                a {@link QueryRunner} that, when asked, will use 
the {@link ExecutorService} to run the base
+   *                        {@link QueryRunner} collection.
    */
   QueryRunner<T> mergeRunners(ExecutorService queryExecutor, 
Iterable<QueryRunner<T>> queryRunners);
 
   /**
-   * Provides access to the toolchest for this specific query type.
+   * Provides access to the {@link QueryToolChest} for this specific {@link 
Query} type.
    *
    * @return an instance of the toolchest for this specific query type.
    */
diff --git 
a/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java
 
b/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java
index 0e78e99..3b7f30d 100644
--- 
a/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java
+++ 
b/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java
@@ -38,6 +38,10 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 
+/**
+ * {@link CachePopulator} implementation that uses a {@link ExecutorService} 
thread pool to populate a cache in the
+ * background. Used if config "druid.*.cache.numBackgroundThreads" is greater 
than 0.
+ */
 public class BackgroundCachePopulator implements CachePopulator
 {
   private static final Logger log = new Logger(BackgroundCachePopulator.class);
diff --git 
a/server/src/main/java/org/apache/druid/client/cache/CachePopulator.java 
b/server/src/main/java/org/apache/druid/client/cache/CachePopulator.java
index 0d67cb9..dbbade1 100644
--- a/server/src/main/java/org/apache/druid/client/cache/CachePopulator.java
+++ b/server/src/main/java/org/apache/druid/client/cache/CachePopulator.java
@@ -23,6 +23,25 @@ import org.apache.druid.java.util.common.guava.Sequence;
 
 import java.util.function.Function;
 
+/**
+ * Abstraction of mechanism for populating a {@link Cache} by wrapping a 
{@link Sequence} and providing a function to
+ * extract the values from it. At runtime, the {@link CachePopulator} 
implementation is used as a singleton and
+ * injected where needed to share between all cacheables, which requires the 
{@link Cache} itself to be thread-safe.
+ *
+ * Consumers of the {@link Sequence} will either be a processing thread (in 
the case of a historical or task), or
+ * an http thread in the case of a broker. See:
+ *
+ *  historicals:    {@link org.apache.druid.server.coordination.ServerManager} 
and
+ *                  {@link org.apache.druid.client.CachingQueryRunner}
+ *
+ *  realtime tasks: {@link 
org.apache.druid.segment.realtime.appenderator.SinkQuerySegmentWalker} and
+ *                  {@link org.apache.druid.client.CachingQueryRunner}
+ *
+ *  brokers:        {@link org.apache.druid.server.ClientQuerySegmentWalker} 
and
+ *                  {@link org.apache.druid.client.CachingClusteredClient}
+ *
+ *  for additional details
+ */
 public interface CachePopulator
 {
   <T, CacheType> Sequence<T> wrap(
diff --git 
a/server/src/main/java/org/apache/druid/client/cache/CachePopulatorStats.java 
b/server/src/main/java/org/apache/druid/client/cache/CachePopulatorStats.java
index cfe4587..e3e768b 100644
--- 
a/server/src/main/java/org/apache/druid/client/cache/CachePopulatorStats.java
+++ 
b/server/src/main/java/org/apache/druid/client/cache/CachePopulatorStats.java
@@ -22,6 +22,11 @@ package org.apache.druid.client.cache;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
+ * Thread safe collector of {@link CachePopulator} statistics, utilized by {@link 
CacheMonitor} to emit cache metrics.
+ * Like the {@link CachePopulator}, this is used as a singleton.
+ *
+ * See {@link org.apache.druid.guice.DruidProcessingModule#getCachePopulator} 
which supplies either
+ * {@link ForegroundCachePopulator} or {@link BackgroundCachePopulator}, as 
configured, for more details.
  */
 public class CachePopulatorStats
 {
diff --git 
a/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java
 
b/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java
index 5ace591..0bdeb3a 100644
--- 
a/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java
+++ 
b/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java
@@ -22,6 +22,7 @@ package org.apache.druid.client.cache;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.SequenceWrapper;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -29,14 +30,16 @@ import org.apache.druid.java.util.common.logger.Logger;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
+/**
+ * {@link CachePopulator} implementation that populates a cache on the same 
thread that is processing the
+ * {@link Sequence}. Used if config "druid.*.cache.numBackgroundThreads" is 0 
(the default).
+ */
 public class ForegroundCachePopulator implements CachePopulator
 {
   private static final Logger log = new Logger(ForegroundCachePopulator.class);
 
-  private final Object lock = new Object();
   private final ObjectMapper objectMapper;
   private final CachePopulatorStats cachePopulatorStats;
   private final long maxEntrySize;
@@ -61,7 +64,7 @@ public class ForegroundCachePopulator implements 
CachePopulator
   )
   {
     final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-    final AtomicBoolean tooBig = new AtomicBoolean(false);
+    final MutableBoolean tooBig = new MutableBoolean(false);
     final JsonGenerator jsonGenerator;
 
     try {
@@ -75,21 +78,19 @@ public class ForegroundCachePopulator implements 
CachePopulator
         Sequences.map(
             sequence,
             input -> {
-              if (!tooBig.get()) {
-                synchronized (lock) {
-                  try {
-                    jsonGenerator.writeObject(cacheFn.apply(input));
+              if (!tooBig.isTrue()) {
+                try {
+                  jsonGenerator.writeObject(cacheFn.apply(input));
 
-                    // Not flushing jsonGenerator before checking this, but 
should be ok since Jackson buffers are
-                    // typically just a few KB, and we don't want to waste 
cycles flushing.
-                    if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
-                      tooBig.set(true);
-                    }
-                  }
-                  catch (IOException e) {
-                    throw new RuntimeException(e);
+                  // Not flushing jsonGenerator before checking this, but 
should be ok since Jackson buffers are
+                  // typically just a few KB, and we don't want to waste 
cycles flushing.
+                  if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
+                    tooBig.setValue(true);
                   }
                 }
+                catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
               }
 
               return input;
@@ -100,24 +101,22 @@ public class ForegroundCachePopulator implements 
CachePopulator
           @Override
           public void after(final boolean isDone, final Throwable thrown) 
throws Exception
           {
-            synchronized (lock) {
-              jsonGenerator.close();
+            jsonGenerator.close();
 
-              if (isDone) {
-                // Check tooBig, then check maxEntrySize one more time, after 
closing/flushing jsonGenerator.
-                if (tooBig.get() || (maxEntrySize > 0 && bytes.size() > 
maxEntrySize)) {
-                  cachePopulatorStats.incrementOversized();
-                  return;
-                }
+            if (isDone) {
+              // Check tooBig, then check maxEntrySize one more time, after 
closing/flushing jsonGenerator.
+              if (tooBig.isTrue() || (maxEntrySize > 0 && bytes.size() > 
maxEntrySize)) {
+                cachePopulatorStats.incrementOversized();
+                return;
+              }
 
-                try {
-                  cache.put(cacheKey, bytes.toByteArray());
-                  cachePopulatorStats.incrementOk();
-                }
-                catch (Exception e) {
-                  log.warn(e, "Unable to write to cache");
-                  cachePopulatorStats.incrementError();
-                }
+              try {
+                cache.put(cacheKey, bytes.toByteArray());
+                cachePopulatorStats.incrementOk();
+              }
+              catch (Exception e) {
+                log.warn(e, "Unable to write to cache");
+                cachePopulatorStats.incrementError();
               }
             }
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to