This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8351495 remove unnecessary lock in ForegroundCachePopulator leading
to a lot of contention (#8116)
8351495 is described below
commit 83514958dbc873c56c91ef3d8e8668ac609a7e1c
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Jul 23 10:57:59 2019 -0700
remove unnecessary lock in ForegroundCachePopulator leading to a lot of
contention (#8116)
* remove unnecessary lock in ForegroundCachePopulator leading to a lot of
contention
* mutableboolean, javadocs,document some cache configs that were missing
* more doc stuff
* adjustments
* remove background documentation
---
docs/content/configuration/index.md | 3 ++
.../org/apache/druid/query/QueryRunnerFactory.java | 30 ++++++-----
.../client/cache/BackgroundCachePopulator.java | 4 ++
.../apache/druid/client/cache/CachePopulator.java | 19 +++++++
.../druid/client/cache/CachePopulatorStats.java | 5 ++
.../client/cache/ForegroundCachePopulator.java | 61 +++++++++++-----------
6 files changed, 77 insertions(+), 45 deletions(-)
diff --git a/docs/content/configuration/index.md
b/docs/content/configuration/index.md
index 28d4346..073b512 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -1176,6 +1176,7 @@ You can optionally configure caching to be enabled on the
peons by setting cachi
|`druid.realtime.cache.useCache`|true, false|Enable the cache on the
realtime.|false|
|`druid.realtime.cache.populateCache`|true, false|Populate the cache on the
realtime.|false|
|`druid.realtime.cache.unCacheable`|All druid query types|All query types to
not cache.|`["groupBy", "select"]`|
+|`druid.realtime.cache.maxEntrySize`|positive integer|Maximum cache entry size
in bytes.|1_000_000|
See [cache configuration](#cache-configuration) for how to configure cache
settings.
@@ -1319,6 +1320,7 @@ You can optionally only configure caching to be enabled
on the Historical by set
|`druid.historical.cache.useCache`|true, false|Enable the cache on the
Historical.|false|
|`druid.historical.cache.populateCache`|true, false|Populate the cache on the
Historical.|false|
|`druid.historical.cache.unCacheable`|All druid query types|All query types to
not cache.|["groupBy", "select"]|
+|`druid.historical.cache.maxEntrySize`|positive integer|Maximum cache entry
size in bytes.|1_000_000|
See [cache configuration](#cache-configuration) for how to configure cache
settings.
@@ -1452,6 +1454,7 @@ You can optionally only configure caching to be enabled
on the Broker by setting
|`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size of
query response that can be cached.|`Integer.MAX_VALUE`|
|`druid.broker.cache.unCacheable`|All druid query types|All query types to not
cache.|`["groupBy", "select"]`|
|`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with
more segments than this number will not attempt to fetch from cache at the
broker level, leaving potential caching fetches (and cache result merging) to
the Historicals|`Integer.MAX_VALUE`|
+|`druid.broker.cache.maxEntrySize`|positive integer|Maximum cache entry size in
bytes.|1_000_000|
See [cache configuration](#cache-configuration) for how to configure cache
settings.
diff --git
a/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java
b/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java
index fc4e5a6..0832fb1 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java
@@ -25,42 +25,44 @@ import org.apache.druid.segment.Segment;
import java.util.concurrent.ExecutorService;
/**
- * An interface that defines the nitty gritty implementation detauls of a
Query on a Segment
+ * An interface that defines the nitty gritty implementation details of a
Query on a Segment
*/
@ExtensionPoint
public interface QueryRunnerFactory<T, QueryType extends Query<T>>
{
/**
- * Given a specific segment, this method will create a QueryRunner.
+ * Given a specific segment, this method will create a {@link QueryRunner}.
*
- * The QueryRunner, when asked, will generate a Sequence of results based on
the given segment. This
- * is the meat of the query processing and is where the results are actually
generated. Everything else
- * is just merging and reduction logic.
+ * The {@link QueryRunner}, when asked, will generate a {@link
org.apache.druid.java.util.common.guava.Sequence} of
+ * results based on the given segment. This is the meat of the {@link
Query} processing and is where the results are
+ * actually generated. Everything else is just merging and reduction logic.
*
- * @param segment The segment to process
- * @return A QueryRunner that, when asked, will generate a Sequence of
results based on the given segment
+ * @param segment The segment to process
+ * @return A {@link QueryRunner} that, when asked, will generate a
+ * {@link org.apache.druid.java.util.common.guava.Sequence} of
results based on the given segment
*/
QueryRunner<T> createRunner(Segment segment);
/**
* Runners generated with createRunner() and combined into an Iterable in
(time,shardId) order are passed
- * along to this method with an ExecutorService. The method should then
return a QueryRunner that, when
- * asked, will use the ExecutorService to run the base QueryRunners in some
fashion.
+ * along to this method with an {@link ExecutorService}. The method should
then return a {@link QueryRunner} that,
+ * when asked, will use the {@link ExecutorService} to run the base
QueryRunners in some fashion.
*
- * The vast majority of the time, this should be implemented with
+ * The vast majority of the time, this should be implemented with {@link
ChainedExecutionQueryRunner}:
*
* return new ChainedExecutionQueryRunner<>(queryExecutor,
toolChest.getOrdering(), queryWatcher, queryRunners);
*
* Which will allow for parallel execution up to the maximum number of
processing threads allowed.
*
- * @param queryExecutor ExecutorService to be used for parallel processing
- * @param queryRunners Individual QueryRunner objects that produce some
results
- * @return a QueryRunner that, when asked, will use the ExecutorService to
run the base QueryRunners
+ * @param queryExecutor {@link ExecutorService} to be used for parallel
processing
+ * @param queryRunners Individual {@link QueryRunner} objects that
produce some results
+ * @return a {@link QueryRunner} that, when asked, will use
the {@link ExecutorService} to run the base
+ * {@link QueryRunner} collection.
*/
QueryRunner<T> mergeRunners(ExecutorService queryExecutor,
Iterable<QueryRunner<T>> queryRunners);
/**
- * Provides access to the toolchest for this specific query type.
+ * Provides access to the {@link QueryToolChest} for this specific {@link
Query} type.
*
* @return an instance of the toolchest for this specific query type.
*/
diff --git
a/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java
b/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java
index 0e78e99..3b7f30d 100644
---
a/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java
+++
b/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java
@@ -38,6 +38,10 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
+/**
+ * {@link CachePopulator} implementation that uses an {@link ExecutorService}
thread pool to populate a cache in the
+ * background. Used if config "druid.*.cache.numBackgroundThreads" is greater
than 0.
+ */
public class BackgroundCachePopulator implements CachePopulator
{
private static final Logger log = new Logger(BackgroundCachePopulator.class);
diff --git
a/server/src/main/java/org/apache/druid/client/cache/CachePopulator.java
b/server/src/main/java/org/apache/druid/client/cache/CachePopulator.java
index 0d67cb9..dbbade1 100644
--- a/server/src/main/java/org/apache/druid/client/cache/CachePopulator.java
+++ b/server/src/main/java/org/apache/druid/client/cache/CachePopulator.java
@@ -23,6 +23,25 @@ import org.apache.druid.java.util.common.guava.Sequence;
import java.util.function.Function;
+/**
+ * Abstraction of mechanism for populating a {@link Cache} by wrapping a
{@link Sequence} and providing a function to
+ * extract the values from it. At runtime, the {@link CachePopulator}
implementation is used as a singleton and
+ * injected where needed to share between all cacheables, which requires the
{@link Cache} itself to be thread-safe.
+ *
+ * Consumers of the {@link Sequence} will either be a processing thread (in
the case of a historical or task), or
+ * an http thread in the case of a broker. See:
+ *
+ * historicals: {@link org.apache.druid.server.coordination.ServerManager}
and
+ * {@link org.apache.druid.client.CachingQueryRunner}
+ *
+ * realtime tasks: {@link
org.apache.druid.segment.realtime.appenderator.SinkQuerySegmentWalker} and
+ * {@link org.apache.druid.client.CachingQueryRunner}
+ *
+ * brokers: {@link org.apache.druid.server.ClientQuerySegmentWalker}
and
+ * {@link org.apache.druid.client.CachingClusteredClient}
+ *
+ * for additional details
+ */
public interface CachePopulator
{
<T, CacheType> Sequence<T> wrap(
diff --git
a/server/src/main/java/org/apache/druid/client/cache/CachePopulatorStats.java
b/server/src/main/java/org/apache/druid/client/cache/CachePopulatorStats.java
index cfe4587..e3e768b 100644
---
a/server/src/main/java/org/apache/druid/client/cache/CachePopulatorStats.java
+++
b/server/src/main/java/org/apache/druid/client/cache/CachePopulatorStats.java
@@ -22,6 +22,11 @@ package org.apache.druid.client.cache;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Thread safe collector of {@link CachePopulator} statistics, utilized by
{@link CacheMonitor} to emit cache metrics.
+ * Like the {@link CachePopulator}, this is used as a singleton.
+ *
+ * See {@link org.apache.druid.guice.DruidProcessingModule#getCachePopulator}
which supplies either
+ * {@link ForegroundCachePopulator} or {@link BackgroundCachePopulator}, as
configured, for more details.
*/
public class CachePopulatorStats
{
diff --git
a/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java
b/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java
index 5ace591..0bdeb3a 100644
---
a/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java
+++
b/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java
@@ -22,6 +22,7 @@ package org.apache.druid.client.cache;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;
@@ -29,14 +30,16 @@ import org.apache.druid.java.util.common.logger.Logger;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+/**
+ * {@link CachePopulator} implementation that populates a cache on the same
thread that is processing the
+ * {@link Sequence}. Used if config "druid.*.cache.numBackgroundThreads" is 0
(the default).
+ */
public class ForegroundCachePopulator implements CachePopulator
{
private static final Logger log = new Logger(ForegroundCachePopulator.class);
- private final Object lock = new Object();
private final ObjectMapper objectMapper;
private final CachePopulatorStats cachePopulatorStats;
private final long maxEntrySize;
@@ -61,7 +64,7 @@ public class ForegroundCachePopulator implements
CachePopulator
)
{
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
- final AtomicBoolean tooBig = new AtomicBoolean(false);
+ final MutableBoolean tooBig = new MutableBoolean(false);
final JsonGenerator jsonGenerator;
try {
@@ -75,21 +78,19 @@ public class ForegroundCachePopulator implements
CachePopulator
Sequences.map(
sequence,
input -> {
- if (!tooBig.get()) {
- synchronized (lock) {
- try {
- jsonGenerator.writeObject(cacheFn.apply(input));
+ if (!tooBig.isTrue()) {
+ try {
+ jsonGenerator.writeObject(cacheFn.apply(input));
- // Not flushing jsonGenerator before checking this, but
should be ok since Jackson buffers are
- // typically just a few KB, and we don't want to waste
cycles flushing.
- if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
- tooBig.set(true);
- }
- }
- catch (IOException e) {
- throw new RuntimeException(e);
+ // Not flushing jsonGenerator before checking this, but
should be ok since Jackson buffers are
+ // typically just a few KB, and we don't want to waste
cycles flushing.
+ if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
+ tooBig.setValue(true);
}
}
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
return input;
@@ -100,24 +101,22 @@ public class ForegroundCachePopulator implements
CachePopulator
@Override
public void after(final boolean isDone, final Throwable thrown)
throws Exception
{
- synchronized (lock) {
- jsonGenerator.close();
+ jsonGenerator.close();
- if (isDone) {
- // Check tooBig, then check maxEntrySize one more time, after
closing/flushing jsonGenerator.
- if (tooBig.get() || (maxEntrySize > 0 && bytes.size() >
maxEntrySize)) {
- cachePopulatorStats.incrementOversized();
- return;
- }
+ if (isDone) {
+ // Check tooBig, then check maxEntrySize one more time, after
closing/flushing jsonGenerator.
+ if (tooBig.isTrue() || (maxEntrySize > 0 && bytes.size() >
maxEntrySize)) {
+ cachePopulatorStats.incrementOversized();
+ return;
+ }
- try {
- cache.put(cacheKey, bytes.toByteArray());
- cachePopulatorStats.incrementOk();
- }
- catch (Exception e) {
- log.warn(e, "Unable to write to cache");
- cachePopulatorStats.incrementError();
- }
+ try {
+ cache.put(cacheKey, bytes.toByteArray());
+ cachePopulatorStats.incrementOk();
+ }
+ catch (Exception e) {
+ log.warn(e, "Unable to write to cache");
+ cachePopulatorStats.incrementError();
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]