kfaraz commented on code in PR #18033:
URL: https://github.com/apache/druid/pull/18033#discussion_r2106735032
##########
server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java:
##########
@@ -46,6 +46,7 @@
public class ServerConfig
{
public static final int DEFAULT_GZIP_INFLATE_BUFFER_SIZE = 4096;
+ public static final int DEFAULT_NUM_PACKING_THREADS = 30;
Review Comment:
What does "packing" signify here?
##########
server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java:
##########
@@ -196,24 +196,23 @@ static Server makeAndInitializeServer(
// that concurrently handle the requests".
int numServerThreads = config.getNumThreads() +
getMaxJettyAcceptorsSelectorsNum(node);
- final QueuedThreadPool threadPool;
if (config.getQueueSize() == Integer.MAX_VALUE) {
- threadPool = new QueuedThreadPool();
- threadPool.setMinThreads(numServerThreads);
- threadPool.setMaxThreads(numServerThreads);
+ jettyServerThreadPool = new QueuedThreadPool();
+ jettyServerThreadPool.setMinThreads(numServerThreads);
+ jettyServerThreadPool.setMaxThreads(numServerThreads);
} else {
- threadPool = new QueuedThreadPool(
+ jettyServerThreadPool = new QueuedThreadPool(
numServerThreads,
numServerThreads,
60000, // same default is used in other case when threadPool = new
QueuedThreadPool()
new LinkedBlockingQueue<>(config.getQueueSize())
);
}
- threadPool.setDaemon(true);
- jettyServerThreadPool = threadPool;
+ jettyServerThreadPool.setDaemon(true);
Review Comment:
The original code was more appropriate: `jettyServerThreadPool` was
assigned only after the local `threadPool` had been fully configured.
Otherwise, the `doMonitor` method might emit erratic metrics.
The ideal fix here would be to make `jettyServerThreadPool` non-static,
but that is not needed in this PR and it would require other cleanup too.
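To illustrate the ordering concern, here is a minimal, self-contained sketch. `SketchPool` and `PoolHolderSketch` are hypothetical stand-ins (not Jetty's `QueuedThreadPool` or the actual `JettyServerModule`), and the `volatile` modifier is an assumption of the sketch rather than a statement about the existing field. The point is only that the shared field is written once, after configuration is complete.

```java
/** Hypothetical stand-in for a pool with mutable settings; not Jetty's QueuedThreadPool. */
final class SketchPool
{
  private int minThreads;
  private int maxThreads;
  private boolean daemon;

  void setMinThreads(int minThreads) { this.minThreads = minThreads; }
  void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; }
  void setDaemon(boolean daemon) { this.daemon = daemon; }

  int getMinThreads() { return minThreads; }
  int getMaxThreads() { return maxThreads; }
  boolean isDaemon() { return daemon; }
}

/** Hypothetical holder that mimics a static field read by a monitor thread. */
final class PoolHolderSketch
{
  // Assumption: volatile, so a reader sees the pool only in its configured state.
  private static volatile SketchPool sharedPool;

  static SketchPool makeAndPublish(int numThreads)
  {
    // Configure a local instance first ...
    final SketchPool pool = new SketchPool();
    pool.setMinThreads(numThreads);
    pool.setMaxThreads(numThreads);
    pool.setDaemon(true);

    // ... and assign the shared field only once the instance is fully set up.
    sharedPool = pool;
    return pool;
  }

  /** What a monitor might read: 0 until a fully configured pool is published. */
  static int monitoredMaxThreads()
  {
    final SketchPool pool = sharedPool;
    return pool == null ? 0 : pool.getMaxThreads();
  }
}
```

With this shape, a concurrent reader sees either no pool or a fully configured one, never a half-initialized instance.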
##########
services/src/main/java/org/apache/druid/cli/CliOverlord.java:
##########
@@ -542,4 +570,27 @@ public void initialize(Server server, Injector injector)
server.setHandler(handlerList);
}
}
+
+ protected static boolean addQOSFiltering(ServletContextHandler root, int
threadsForOvelordWork)
Review Comment:
Please add a short javadoc.
```suggestion
protected static boolean addQosFiltering(ServletContextHandler root, int
threadsForOvelordWork)
```
##########
services/src/main/java/org/apache/druid/cli/CliOverlord.java:
##########
@@ -542,4 +570,27 @@ public void initialize(Server server, Injector injector)
server.setHandler(handlerList);
}
}
+
+ protected static boolean addQOSFiltering(ServletContextHandler root, int
threadsForOvelordWork)
+ {
+ if (threadsForOvelordWork >= ServerConfig.DEFAULT_NUM_PACKING_THREADS) {
+ log.info("Enabling QOS filter on overlord requests with limit [%d].",
threadsForOvelordWork);
+ JettyBindings.QosFilterHolder filterHolder = new
JettyBindings.QosFilterHolder(
+ new String[]{
+ "/druid-internal/v1/*",
+ "/druid/indexer/v1/*"
+ },
+ threadsForOvelordWork
+ );
+ JettyServerInitUtils.addFilters(root,
Collections.singleton(filterHolder));
Review Comment:
If the filter is already bound using `JettyBindings.addQosFilter` in
`addOverlordJerseyResources`, we wouldn't need to call this at all, since
`JettyServerInitUtils.addQosFilters` is already being called from
`OverlordJettyServerInitializer.initialize`.
##########
services/src/main/java/org/apache/druid/cli/CliOverlord.java:
##########
@@ -487,6 +508,13 @@ public void initialize(Server server, Injector injector)
final ObjectMapper jsonMapper =
injector.getInstance(Key.get(ObjectMapper.class, Json.class));
final AuthenticatorMapper authenticatorMapper =
injector.getInstance(AuthenticatorMapper.class);
+ final QueuedThreadPool queuedThreadPool = (QueuedThreadPool)
server.getThreadPool();
+ final int maxThreads = queuedThreadPool.getMaxThreads();
Review Comment:
It might be nice to factor out the logic that computes the max threads into
a shared place that can be used here. It seems weird to need a handle to the actual
`ThreadPool` object just to determine the max threads.
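As a rough sketch of what such a shared helper could look like: the class and method names below are hypothetical, and the arithmetic only mirrors what is visible in the diffs of this PR (configured threads plus acceptors/selectors, minus the threads reserved for health checks). The concrete numbers in the usage note are made up for illustration.

```java
/** Hypothetical helper; the names are illustrative and not part of the PR. */
final class ServerThreadMathSketch
{
  private ServerThreadMathSketch() {}

  /** Mirrors `config.getNumThreads() + getMaxJettyAcceptorsSelectorsNum(node)` from the diff. */
  static int maxServerThreads(int configuredNumThreads, int acceptorsAndSelectors)
  {
    return configuredNumThreads + acceptorsAndSelectors;
  }

  /** Mirrors `maxThreads - THREADS_RESERVED_FOR_HEALTH_CHECK` from the diff. */
  static int threadsForOverlordWork(int maxServerThreads, int reservedForHealthCheck)
  {
    return maxServerThreads - reservedForHealthCheck;
  }
}
```

For example, `threadsForOverlordWork(maxServerThreads(40, 4), 2)` evaluates to 42 without touching a `ThreadPool` instance.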
##########
services/src/main/java/org/apache/druid/cli/CliOverlord.java:
##########
@@ -487,6 +508,13 @@ public void initialize(Server server, Injector injector)
final ObjectMapper jsonMapper =
injector.getInstance(Key.get(ObjectMapper.class, Json.class));
final AuthenticatorMapper authenticatorMapper =
injector.getInstance(AuthenticatorMapper.class);
+ final QueuedThreadPool queuedThreadPool = (QueuedThreadPool)
server.getThreadPool();
+ final int maxThreads = queuedThreadPool.getMaxThreads();
+ final int threadsForOvelordWork = maxThreads -
THREADS_RESERVED_FOR_HEALTH_CHECK;
Review Comment:
Nit: we can inline this variable in the method call.
##########
services/src/main/java/org/apache/druid/cli/CliOverlord.java:
##########
@@ -460,9 +460,30 @@ private void configureOverlordHelpers(Binder binder)
);
}
+
/**
+ * Currenlty, the resource paths of the jersery resources on the overlord
start with
+ * <ol>
+ * <li>/druid/indexer/v1</li>
+ * <li>/druid-internal/v1</li>
+ * </ol>
+ * <p>
+ * As QOS filtering is enabled on overlord requests, we need to update the
QOS filter paths in
+ * {@link
org.apache.druid.cli.CliOverlord#addQOSFiltering(ServletContextHandler, int)}
when a new jersey resource is added.
Review Comment:
```suggestion
* Since QoS filtering is enabled for Overlord requests, update the QoS
filter paths in
* {@link #addQosFiltering} whenever a new jersey resource is added here.
```
##########
services/src/main/java/org/apache/druid/cli/CliOverlord.java:
##########
@@ -542,4 +570,27 @@ public void initialize(Server server, Injector injector)
server.setHandler(handlerList);
}
}
+
+ protected static boolean addQOSFiltering(ServletContextHandler root, int
threadsForOvelordWork)
+ {
+ if (threadsForOvelordWork >= ServerConfig.DEFAULT_NUM_PACKING_THREADS) {
+ log.info("Enabling QOS filter on overlord requests with limit [%d].",
threadsForOvelordWork);
+ JettyBindings.QosFilterHolder filterHolder = new
JettyBindings.QosFilterHolder(
+ new String[]{
+ "/druid-internal/v1/*",
+ "/druid/indexer/v1/*"
+ },
+ threadsForOvelordWork
+ );
+ JettyServerInitUtils.addFilters(root,
Collections.singleton(filterHolder));
+ return true;
+ } else {
+ log.info(
+ "QOS filter is disabled for the overlord requests." +
+ "Set `druid.server.http.numThread` to a value greater than %d to
enable QoSFilter.",
+ ServerConfig.DEFAULT_NUM_PACKING_THREADS
Review Comment:
```suggestion
"QoS filtering is disabled for Overlord requests. " +
"Set `druid.server.http.numThreads` to a value greater than [%d]
to enable.",
ServerConfig.DEFAULT_NUM_PACKING_THREADS
```
##########
services/src/main/java/org/apache/druid/cli/CliOverlord.java:
##########
@@ -542,4 +570,27 @@ public void initialize(Server server, Injector injector)
server.setHandler(handlerList);
}
}
+
+ protected static boolean addQOSFiltering(ServletContextHandler root, int
threadsForOvelordWork)
+ {
+ if (threadsForOvelordWork >= ServerConfig.DEFAULT_NUM_PACKING_THREADS) {
+ log.info("Enabling QOS filter on overlord requests with limit [%d].",
threadsForOvelordWork);
Review Comment:
```suggestion
log.info("Enabling QoS filtering for Overlord requests with
limit[%d].", threadsForOvelordWork);
```
##########
services/src/main/java/org/apache/druid/cli/CliOverlord.java:
##########
@@ -542,4 +570,27 @@ public void initialize(Server server, Injector injector)
server.setHandler(handlerList);
}
}
+
+ protected static boolean addQOSFiltering(ServletContextHandler root, int
threadsForOvelordWork)
Review Comment:
Nit: rename the second arg to `processingThreads` or something similar.
```suggestion
protected static boolean addQOSFiltering(ServletContextHandler root, int
threadsForOvelordWork)
```
##########
services/src/main/java/org/apache/druid/cli/CliOverlord.java:
##########
@@ -460,9 +460,30 @@ private void configureOverlordHelpers(Binder binder)
);
}
+
/**
+ * Currenlty, the resource paths of the jersery resources on the overlord
start with
+ * <ol>
+ * <li>/druid/indexer/v1</li>
+ * <li>/druid-internal/v1</li>
+ * </ol>
+ * <p>
+ * As QOS filtering is enabled on overlord requests, we need to update the
QOS filter paths in
+ * {@link
org.apache.druid.cli.CliOverlord#addQOSFiltering(ServletContextHandler, int)}
when a new jersey resource is added.
*/
- private static class OverlordJettyServerInitializer implements
JettyServerInitializer
+ private static void addOverlordJerseyResources(Binder binder)
Review Comment:
Going by what other modules such as `LookupModule` are doing, we should bind
the QoS filter in this method itself using `JettyBindings.addQosFilter`.
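A simplified model of the pattern I have in mind is below. It uses no Druid or Guice classes: `FilterRegistry` is a hypothetical stand-in for the binder-backed set of filter holders, and `initialize` stands in for the initializer that installs everything that was bound. The real `JettyBindings` and `JettyServerInitUtils` signatures may differ from this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class QosBindingSketch
{
  /** Hypothetical description of one QoS filter: paths plus a request limit. */
  static final class FilterSpec
  {
    final List<String> paths;
    final int maxRequests;

    FilterSpec(List<String> paths, int maxRequests)
    {
      this.paths = paths;
      this.maxRequests = maxRequests;
    }
  }

  /** Hypothetical stand-in for the set of filters collected at binding time. */
  static final class FilterRegistry
  {
    private final List<FilterSpec> specs = new ArrayList<>();

    void addQosFilter(List<String> paths, int maxRequests)
    {
      specs.add(new FilterSpec(paths, maxRequests));
    }

    List<FilterSpec> all()
    {
      return Collections.unmodifiableList(specs);
    }
  }

  /** "Module" step: resources and their QoS filter are registered side by side. */
  static void addOverlordResources(FilterRegistry registry, int maxRequests)
  {
    registry.addQosFilter(
        Arrays.asList("/druid-internal/v1/*", "/druid/indexer/v1/*"),
        maxRequests
    );
  }

  /** "Initializer" step: installs every registered filter, with no per-filter call needed. */
  static List<String> initialize(FilterRegistry registry)
  {
    final List<String> installed = new ArrayList<>();
    for (FilterSpec spec : registry.all()) {
      for (String path : spec.paths) {
        installed.add(path + " -> max " + spec.maxRequests);
      }
    }
    return installed;
  }
}
```

The design benefit is that the paths and the filter live next to the resource bindings, so adding a resource and updating the filter happen in one place.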
##########
services/src/main/java/org/apache/druid/cli/CliOverlord.java:
##########
@@ -542,4 +570,27 @@ public void initialize(Server server, Injector injector)
server.setHandler(handlerList);
}
}
+
+ protected static boolean addQOSFiltering(ServletContextHandler root, int
threadsForOvelordWork)
+ {
+ if (threadsForOvelordWork >= ServerConfig.DEFAULT_NUM_PACKING_THREADS) {
+ log.info("Enabling QOS filter on overlord requests with limit [%d].",
threadsForOvelordWork);
+ JettyBindings.QosFilterHolder filterHolder = new
JettyBindings.QosFilterHolder(
+ new String[]{
+ "/druid-internal/v1/*",
+ "/druid/indexer/v1/*"
+ },
+ threadsForOvelordWork
+ );
Review Comment:
Other modules seem to use `JettyBindings.addQosFilter` for this purpose.
Please see the comment on the `addOverlordJerseyResources` method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]