This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1ddbaa8744 Reserve threads for non-query requests without using laning
(#14576)
1ddbaa8744 is described below
commit 1ddbaa874429d6f176c90f25189c959d5d1eae55
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Thu Jul 20 15:03:48 2023 +0530
Reserve threads for non-query requests without using laning (#14576)
This PR uses the QoSFilter available in Jetty to park the query requests
that exceed a configured limit. This is done so that other HTTP requests such
as health check calls do not get blocked if the query server is busy serving
long-running queries. The same mechanism can also be used in the future to
isolate interactive queries from long-running select queries within the same
broker.
Right now, you can still get that isolation by setting
druid.query.scheduler.numThreads to a value lower than
druid.server.http.numThreads. That enables total laning, but the side effect is
that excess requests are rejected outright instead of being queued, which leads
to a bad user experience.
Parked requests are timed out after 30 seconds by default. This PR overrides
that timeout with the configured maxQueryTimeout.
---
.../org/apache/druid/server/QueryScheduler.java | 7 ++++--
.../druid/server/initialization/ServerConfig.java | 26 +++++++++++++++++--
.../server/initialization/jetty/JettyBindings.java | 13 +++++++++-
.../apache/druid/server/QuerySchedulerTest.java | 26 +++++++++++++++++++
.../druid/cli/QueryJettyServerInitializer.java | 29 +++++++++++++++++++++-
5 files changed, 95 insertions(+), 6 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java
b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
index fb44af0e7b..4dba34a6be 100644
--- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java
+++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
@@ -106,9 +106,12 @@ public class QueryScheduler implements QueryWatcher
this.laningStrategy = laningStrategy;
this.queryFutures =
Multimaps.synchronizedSetMultimap(HashMultimap.create());
this.queryDatasources =
Multimaps.synchronizedSetMultimap(HashMultimap.create());
- // if totalNumThreads is above 0 and less than
druid.server.http.numThreads, enforce total limit
+ // if totalNumThreads is above 0 and less than
druid.server.http.numThreads and
+ // requests are not being queued by Jetty, enforce total limit
final boolean limitTotal;
- if (totalNumThreads > 0 && totalNumThreads < serverConfig.getNumThreads())
{
+ if (totalNumThreads > 0
+ && totalNumThreads < serverConfig.getNumThreads()
+ && !serverConfig.isEnableQueryRequestsQueuing()) {
limitTotal = true;
this.totalCapacity = totalNumThreads;
} else {
diff --git
a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java
b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java
index 9b72670c3f..276a0030af 100644
---
a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java
+++
b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java
@@ -20,6 +20,7 @@
package org.apache.druid.server.initialization;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.exception.ErrorResponseTransformStrategy;
import org.apache.druid.common.exception.NoErrorResponseTransformStrategy;
@@ -104,6 +105,12 @@ public class ServerConfig
}
+ @VisibleForTesting
+ public ServerConfig(boolean enableQueryRequestsQueuing)
+ {
+ this.enableQueryRequestsQueuing = enableQueryRequestsQueuing;
+ }
+
@JsonProperty
@Min(1)
private int numThreads = getDefaultNumThreads();
@@ -179,6 +186,13 @@ public class ServerConfig
@JsonProperty
private boolean enableHSTS = false;
+ /**
+ * This is a feature flag to enable query requests queuing when admins want
to reserve some threads for
+ * non-query requests. This feature flag is not documented and can be
removed in the future.
+ */
+ @JsonProperty
+ private boolean enableQueryRequestsQueuing = false;
+
@JsonProperty
private boolean showDetailedJettyErrors = true;
@@ -288,6 +302,11 @@ public class ServerConfig
return enableHSTS;
}
+ public boolean isEnableQueryRequestsQueuing()
+ {
+ return enableQueryRequestsQueuing;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -318,7 +337,8 @@ public class ServerConfig
allowedHttpMethods.equals(that.allowedHttpMethods) &&
errorResponseTransformStrategy.equals(that.errorResponseTransformStrategy) &&
Objects.equals(contentSecurityPolicy,
that.getContentSecurityPolicy()) &&
- enableHSTS == that.enableHSTS;
+ enableHSTS == that.enableHSTS &&
+ enableQueryRequestsQueuing == that.enableQueryRequestsQueuing;
}
@Override
@@ -345,7 +365,8 @@ public class ServerConfig
errorResponseTransformStrategy,
showDetailedJettyErrors,
contentSecurityPolicy,
- enableHSTS
+ enableHSTS,
+ enableQueryRequestsQueuing
);
}
@@ -374,6 +395,7 @@ public class ServerConfig
", showDetailedJettyErrors=" + showDetailedJettyErrors +
", contentSecurityPolicy=" + contentSecurityPolicy +
", enableHSTS=" + enableHSTS +
+ ", enableQueryRequestsQueuing=" + enableQueryRequestsQueuing +
'}';
}
diff --git
a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java
b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java
index 835564dc0b..9fc26a73be 100644
---
a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java
+++
b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java
@@ -65,10 +65,18 @@ public class JettyBindings
private final String[] paths;
private final int maxRequests;
- public QosFilterHolder(String[] paths, int maxRequests)
+ private final long timeoutMs;
+
+ public QosFilterHolder(String[] paths, int maxRequests, long timeoutMs)
{
this.paths = paths;
this.maxRequests = maxRequests;
+ this.timeoutMs = timeoutMs;
+ }
+
+ public QosFilterHolder(String[] paths, int maxRequests)
+ {
+ this(paths, maxRequests, -1);
}
@Override
@@ -86,6 +94,9 @@ public class JettyBindings
@Override
public Map<String, String> getInitParameters()
{
+ if (timeoutMs >= 0) {
+ return ImmutableMap.of("maxRequests", String.valueOf(maxRequests),
"suspendMs", String.valueOf(timeoutMs));
+ }
return ImmutableMap.of("maxRequests", String.valueOf(maxRequests));
}
diff --git
a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
index 749317d030..eb8025a770 100644
--- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
+++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
@@ -348,6 +348,32 @@ public class QuerySchedulerTest
getFuturesAndAssertAftermathIsChill(futures, scheduler, true, true);
}
+ @Test
+ public void testTotalLimitWithQueryQueuing()
+ {
+ ServerConfig serverConfig = new ServerConfig();
+ QueryScheduler queryScheduler = new QueryScheduler(
+ serverConfig.getNumThreads() - 1,
+ ManualQueryPrioritizationStrategy.INSTANCE,
+ new NoQueryLaningStrategy(),
+ serverConfig
+ );
+ Assert.assertEquals(serverConfig.getNumThreads() - 1,
queryScheduler.getTotalAvailableCapacity());
+ }
+
+ @Test
+ public void testTotalLimitWithouQueryQueuing()
+ {
+ ServerConfig serverConfig = new ServerConfig(true);
+ QueryScheduler queryScheduler = new QueryScheduler(
+ serverConfig.getNumThreads() - 1,
+ ManualQueryPrioritizationStrategy.INSTANCE,
+ new NoQueryLaningStrategy(),
+ serverConfig
+ );
+ Assert.assertEquals(-1, queryScheduler.getTotalAvailableCapacity());
+ }
+
@Test
public void testExplodingWrapperDoesNotLeakLocks()
{
diff --git
a/services/src/main/java/org/apache/druid/cli/QueryJettyServerInitializer.java
b/services/src/main/java/org/apache/druid/cli/QueryJettyServerInitializer.java
index fb0dff1561..e7c317c241 100644
---
a/services/src/main/java/org/apache/druid/cli/QueryJettyServerInitializer.java
+++
b/services/src/main/java/org/apache/druid/cli/QueryJettyServerInitializer.java
@@ -27,9 +27,12 @@ import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.servlet.GuiceFilter;
+import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.QuerySchedulerProvider;
import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.initialization.jetty.JettyBindings;
import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.initialization.jetty.LimitRequestsFilter;
@@ -46,6 +49,7 @@ import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -67,12 +71,20 @@ public class QueryJettyServerInitializer implements
JettyServerInitializer
private final AuthConfig authConfig;
+ private final QuerySchedulerProvider querySchedulerConfig;
+
@Inject
- public QueryJettyServerInitializer(Set<Handler> extensionHandlers,
ServerConfig serverConfig, AuthConfig authConfig)
+ public QueryJettyServerInitializer(
+ Set<Handler> extensionHandlers,
+ ServerConfig serverConfig,
+ AuthConfig authConfig,
+ @Global QuerySchedulerProvider querySchedulerConfig
+ )
{
this.extensionHandlers = ImmutableList.copyOf(extensionHandlers);
this.serverConfig = serverConfig;
this.authConfig = authConfig;
+ this.querySchedulerConfig = querySchedulerConfig;
}
@Override
@@ -95,6 +107,21 @@ public class QueryJettyServerInitializer implements
JettyServerInitializer
);
}
+ if (querySchedulerConfig.getNumThreads() > 0
+ && querySchedulerConfig.getNumThreads() < serverConfig.getNumThreads()
+ && serverConfig.isEnableQueryRequestsQueuing()) {
+ // Add QoS filter for query requests, so they don't take up more than
querySchedulerConfig#numThreads.
+ // While this will also pick up some extra endpoints other than Query,
the primary objective is to protect
+ // health check endpoints from being starved by query requests.
+ log.info("Enabling QoS Filter on query requests with limit [%d].",
querySchedulerConfig.getNumThreads());
+ JettyBindings.QosFilterHolder filterHolder = new
JettyBindings.QosFilterHolder(
+ new String[]{"/druid/v2/*"},
+ querySchedulerConfig.getNumThreads(),
+ serverConfig.getMaxQueryTimeout()
+ );
+ JettyServerInitUtils.addFilters(root,
Collections.singleton(filterHolder));
+ }
+
final ObjectMapper jsonMapper =
injector.getInstance(Key.get(ObjectMapper.class, Json.class));
final AuthenticatorMapper authenticatorMapper =
injector.getInstance(AuthenticatorMapper.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]