This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c267b65 Removing unused processing threadpool on broker (#12070)
c267b65 is described below
commit c267b65f97998f7e6b43b102a0e16c331825fb14
Author: somu-imply <[email protected]>
AuthorDate: Tue Dec 21 13:07:53 2021 -0800
Removing unused processing threadpool on broker (#12070)
* Thread pool for broker
* Updating two tests to improve coverage for new method added
* Updating DruidProcessingConfigTest to improve coverage
* Adding entries to the spelling file for spelling errors missed in the docs
* Adding test to cover lines of new function added
---
docs/configuration/index.md | 1 +
.../apache/druid/query/DruidProcessingConfig.java | 10 ++
.../druid/query/DruidProcessingConfigTest.java | 2 +
.../apache/druid/guice/BrokerProcessingModule.java | 185 +++++++++++++++++++++
.../druid/guice/BrokerProcessingModuleTest.java | 149 +++++++++++++++++
.../druid/guice/DruidProcessingModuleTest.java | 1 +
.../main/java/org/apache/druid/cli/CliBroker.java | 4 +-
website/.spelling | 5 +
8 files changed, 355 insertions(+), 2 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 4178d01..336f950 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1620,6 +1620,7 @@ Druid uses Jetty to serve HTTP requests.
|--------|-----------|-------|
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size (less than
2GiB), for the storage of intermediate results. The computation engine in both
the Historical and Realtime processes will use a scratch buffer of this size to
do all of their intermediate computations off-heap. Larger values allow for
more aggregations in a single pass over the data while smaller values can
require more passes depending on the query that is being executed.
[Human-readable format](human-readable [...]
|`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the
buffers for later use, this is the maximum count cache will grow to. note that
pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE|
+|`druid.processing.buffer.poolCacheInitialCount`|The number of buffers
allocated up front when the intermediate results pool is initialized. Note
that the pool can create more buffers if necessary.|`0`|
|`druid.processing.formatString`|Realtime and Historical processes use this
format string to name their processing threads.|processing-%s|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers
available for merging query results. The buffers are sized by
`druid.processing.buffer.sizeBytes`. This property is effectively a concurrency
limit for queries that require merging buffers. If you are using any queries
that require merge buffers (currently, just groupBy v2) then you should have at
least two of these.|`max(2, druid.processing.numThreads / 4)`|
|`druid.processing.numThreads`|The number of processing threads to have
available for parallel processing of segments. Our rule of thumb is `num_cores
- 1`, which means that even under heavy load there will still be one core
available to do background tasks like talking with ZooKeeper and pulling down
segments. If only one core is available, this property defaults to the value
`1`.|Number of cores - 1 (or 1)|
diff --git
a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
index 4968e49..01b02e7 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
@@ -38,6 +38,7 @@ public abstract class DruidProcessingConfig extends
ExecutorServiceConfig implem
public static final HumanReadableBytes DEFAULT_PROCESSING_BUFFER_SIZE_BYTES
= HumanReadableBytes.valueOf(-1);
public static final int MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = 1024 *
1024 * 1024;
public static final int DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS = 60_000;
+ public static final int DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL = 0;
private AtomicReference<Integer> computedBufferSizeBytes = new
AtomicReference<>();
@@ -104,6 +105,15 @@ public abstract class DruidProcessingConfig extends
ExecutorServiceConfig implem
return Integer.MAX_VALUE;
}
+ @Config({
+ "druid.computation.buffer.poolCacheInitialCount",
+ "${base_path}.buffer.poolCacheInitialCount"
+ })
+ public int getNumInitalBuffersForIntermediatePool()
+ {
+ return DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL;
+ }
+
@Override
@Config(value = "${base_path}.numThreads")
public int getNumThreadsConfigured()
diff --git
a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
index 3afdb07..c5888b7 100644
---
a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
+++
b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
@@ -135,6 +135,7 @@ public class DruidProcessingConfigTest
props.setProperty("druid.processing.fifo", "true");
props.setProperty("druid.processing.tmpDir", "/test/path");
+
Injector injector = makeInjector(
NUM_PROCESSORS,
DIRECT_SIZE,
@@ -151,6 +152,7 @@ public class DruidProcessingConfigTest
Assert.assertEquals(1, config.columnCacheSizeBytes());
Assert.assertTrue(config.isFifo());
Assert.assertEquals("/test/path", config.getTmpDir());
+ Assert.assertEquals(0, config.getNumInitalBuffersForIntermediatePool());
}
@Test
diff --git
a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
new file mode 100644
index 0000000..9b868f5
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.ProvisionException;
+import org.apache.druid.client.cache.BackgroundCachePopulator;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulator;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.client.cache.ForegroundCachePopulator;
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.collections.DefaultBlockingPool;
+import org.apache.druid.collections.NonBlockingPool;
+import org.apache.druid.collections.StupidPool;
+import org.apache.druid.guice.annotations.Global;
+import org.apache.druid.guice.annotations.Merging;
+import org.apache.druid.guice.annotations.Smile;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.offheap.OffheapBufferGenerator;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.ExecutorServiceMonitor;
+import org.apache.druid.query.ForwardingQueryProcessingPool;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.server.metrics.MetricsModule;
+import org.apache.druid.utils.JvmUtils;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
+
+/**
+ * This module is used to fulfill dependency injection of query processing and
caching resources: buffer pools and
+ * thread pools on Broker. Broker does not need to be allocated an
intermediate results pool.
+ * This is separated from DruidProcessingModule to separate the needs of the
broker from the historicals
+ */
+
+public class BrokerProcessingModule implements Module
+{
+ private static final Logger log = new Logger(BrokerProcessingModule.class);
+
+ @Override
+ public void configure(Binder binder)
+ {
+ binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class);
+ MetricsModule.register(binder, ExecutorServiceMonitor.class);
+ }
+
+ @Provides
+ @LazySingleton
+ public CachePopulator getCachePopulator(
+ @Smile ObjectMapper smileMapper,
+ CachePopulatorStats cachePopulatorStats,
+ CacheConfig cacheConfig
+ )
+ {
+ if (cacheConfig.getNumBackgroundThreads() > 0) {
+ final ExecutorService exec = Executors.newFixedThreadPool(
+ cacheConfig.getNumBackgroundThreads(),
+ new ThreadFactoryBuilder()
+ .setNameFormat("background-cacher-%d")
+ .setDaemon(true)
+ .setPriority(Thread.MIN_PRIORITY)
+ .build()
+ );
+
+ return new BackgroundCachePopulator(exec, smileMapper,
cachePopulatorStats, cacheConfig.getMaxEntrySize());
+ } else {
+ return new ForegroundCachePopulator(smileMapper, cachePopulatorStats,
cacheConfig.getMaxEntrySize());
+ }
+ }
+
+ @Provides
+ @ManageLifecycle
+ public QueryProcessingPool getProcessingExecutorPool(
+ DruidProcessingConfig config
+ )
+ {
+ return new ForwardingQueryProcessingPool(Execs.dummy());
+ }
+
+ @Provides
+ @LazySingleton
+ @Global
+ public NonBlockingPool<ByteBuffer>
getIntermediateResultsPool(DruidProcessingConfig config)
+ {
+ verifyDirectMemory(config);
+
+ return new StupidPool<>(
+ "intermediate processing pool",
+ new OffheapBufferGenerator("intermediate processing",
config.intermediateComputeSizeBytes()),
+ config.getNumInitalBuffersForIntermediatePool(),
+ config.poolCacheMaxCount()
+ );
+ }
+
+ @Provides
+ @LazySingleton
+ @Merging
+ public BlockingPool<ByteBuffer> getMergeBufferPool(DruidProcessingConfig
config)
+ {
+ verifyDirectMemory(config);
+ return new DefaultBlockingPool<>(
+ new OffheapBufferGenerator("result merging",
config.intermediateComputeSizeBytes()),
+ config.getNumMergeBuffers()
+ );
+ }
+
+ @Provides
+ @ManageLifecycle
+ public LifecycleForkJoinPoolProvider
getMergeProcessingPoolProvider(DruidProcessingConfig config)
+ {
+ return new LifecycleForkJoinPoolProvider(
+ config.getMergePoolParallelism(),
+ ForkJoinPool.defaultForkJoinWorkerThreadFactory,
+ (t, e) -> log.error(e, "Unhandled exception in thread [%s]", t),
+ true,
+ config.getMergePoolAwaitShutdownMillis()
+ );
+ }
+
+ @Provides
+ @Merging
+ public ForkJoinPool getMergeProcessingPool(LifecycleForkJoinPoolProvider
poolProvider)
+ {
+ return poolProvider.getPool();
+ }
+
+ private void verifyDirectMemory(DruidProcessingConfig config)
+ {
+ try {
+ final long maxDirectMemory =
JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes();
+ final long memoryNeeded = (long) config.intermediateComputeSizeBytes() *
+ (config.getNumMergeBuffers() + 1);
+
+ if (maxDirectMemory < memoryNeeded) {
+ throw new ProvisionException(
+ StringUtils.format(
+ "Not enough direct memory. Please adjust
-XX:MaxDirectMemorySize, druid.processing.buffer.sizeBytes,
druid.processing.numThreads, or druid.processing.numMergeBuffers: "
+ + "maxDirectMemory[%,d], memoryNeeded[%,d] =
druid.processing.buffer.sizeBytes[%,d] * (druid.processing.numMergeBuffers[%,d]
+ 1)",
+ maxDirectMemory,
+ memoryNeeded,
+ config.intermediateComputeSizeBytes(),
+ config.getNumMergeBuffers()
+ )
+ );
+ }
+ }
+ catch (UnsupportedOperationException e) {
+ log.debug("Checking for direct memory size is not support on this
platform: %s", e);
+ log.info(
+ "Unable to determine max direct memory size. If
druid.processing.buffer.sizeBytes is explicitly configured, "
+ + "then make sure to set -XX:MaxDirectMemorySize to at least
\"druid.processing.buffer.sizeBytes * "
+ + "(druid.processing.numMergeBuffers[%,d] + 1)\", "
+ + "or else set -XX:MaxDirectMemorySize to at least 25%% of maximum
jvm heap size.",
+ config.getNumMergeBuffers()
+ );
+ }
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java
b/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java
new file mode 100644
index 0000000..3ce5db1
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Injector;
+import com.google.inject.ProvisionException;
+import com.google.inject.name.Names;
+import com.google.inject.util.Modules;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulator;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.initialization.Initialization;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.utils.JvmUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+
+@RunWith(MockitoJUnitRunner.class)
+public class BrokerProcessingModuleTest
+{
+ private static final boolean INJECT_SERVER_TYPE_CONFIG = true;
+ @Mock
+ private DruidProcessingConfig druidProcessingConfig;
+ private Injector injector;
+ private BrokerProcessingModule target;
+ @Mock
+ private CacheConfig cacheConfig;
+ @Mock
+ private CachePopulatorStats cachePopulatorStats;
+
+ @Before
+ public void setUp()
+ {
+ target = new BrokerProcessingModule();
+ injector = makeInjector(INJECT_SERVER_TYPE_CONFIG);
+ }
+
+ @Test
+ public void testIntermediateResultsPool()
+ {
+ target.getIntermediateResultsPool(druidProcessingConfig);
+ }
+
+
+ @Test
+ public void testMergeBufferPool()
+ {
+ target.getMergeBufferPool(druidProcessingConfig);
+ }
+
+ @Test
+ public void testMergeProcessingPool()
+ {
+ DruidProcessingConfig config = new DruidProcessingConfig()
+ {
+ @Override
+ public String getFormatString()
+ {
+ return "processing-test-%s";
+ }
+ };
+ DruidProcessingModule module = new DruidProcessingModule();
+ module.getMergeProcessingPoolProvider(config);
+ config.getNumInitalBuffersForIntermediatePool();
+ }
+
+ @Test
+ public void testCachePopulatorAsSingleton()
+ {
+ CachePopulator cachePopulator = injector.getInstance(CachePopulator.class);
+ Assert.assertNotNull(cachePopulator);
+
+ }
+
+ @Test(expected = ProvisionException.class)
+ public void testMemoryCheckThrowsException()
+ {
+ // JDK 9 and above do not support checking for direct memory size
+ // so this test only validates functionality for Java 8.
+ try {
+ JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes();
+ }
+ catch (UnsupportedOperationException e) {
+ Assume.assumeNoException(e);
+ }
+
+ BrokerProcessingModule module = new BrokerProcessingModule();
+ module.getMergeBufferPool(new DruidProcessingConfig()
+ {
+ @Override
+ public String getFormatString()
+ {
+ return "test";
+ }
+
+ @Override
+ public int intermediateComputeSizeBytes()
+ {
+ return Integer.MAX_VALUE;
+ }
+ });
+ }
+
+ private Injector makeInjector(boolean withServerTypeConfig)
+ {
+ return Initialization.makeInjectorWithModules(
+ GuiceInjectors.makeStartupInjector(),
(ImmutableList.of(Modules.override(
+ (binder) -> {
+
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
+
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
+
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
+
binder.bind(DruidProcessingConfig.class).toInstance(druidProcessingConfig);
+ },
+ target
+ ).with(
+ (binder) -> {
+
binder.bind(CachePopulatorStats.class).toInstance(cachePopulatorStats);
+ binder.bind(CacheConfig.class).toInstance(cacheConfig);
+ }
+ )
+ )));
+ }
+
+}
+
diff --git
a/server/src/test/java/org/apache/druid/guice/DruidProcessingModuleTest.java
b/server/src/test/java/org/apache/druid/guice/DruidProcessingModuleTest.java
index 05f309d..129522d 100644
--- a/server/src/test/java/org/apache/druid/guice/DruidProcessingModuleTest.java
+++ b/server/src/test/java/org/apache/druid/guice/DruidProcessingModuleTest.java
@@ -70,6 +70,7 @@ public class DruidProcessingModuleTest
};
DruidProcessingModule module = new DruidProcessingModule();
+ config.getNumInitalBuffersForIntermediatePool();
module.getIntermediateResultsPool(config);
}
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java
b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index 3a6d4df..fff0280 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -38,9 +38,9 @@ import
org.apache.druid.client.selector.ServerSelectorStrategy;
import org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.druid.curator.ZkEnablementConfig;
import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.BrokerProcessingModule;
import org.apache.druid.guice.BrokerServiceModule;
import org.apache.druid.guice.CacheModule;
-import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
@@ -108,7 +108,7 @@ public class CliBroker extends ServerRunnable
protected List<? extends Module> getModules()
{
return ImmutableList.of(
- new DruidProcessingModule(),
+ new BrokerProcessingModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
new SegmentWranglerModule(),
diff --git a/website/.spelling b/website/.spelling
index 33cd648..54fb7d0 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1542,6 +1542,11 @@ skipEmptyBuckets
useCache
useResultLevelCache
vectorSize
+enableJoinLeftTableScanDirect
+enableJoinFilterPushDown
+enableJoinFilterRewrite
+enableJoinFilterRewriteValueColumnFilters
+joinFilterRewriteMaxSize
- ../docs/querying/querying.md
7KiB
DatasourceMetadata
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]