This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ddbaa8744 Reserve threads for non-query requests without using laning 
(#14576)
1ddbaa8744 is described below

commit 1ddbaa874429d6f176c90f25189c959d5d1eae55
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Thu Jul 20 15:03:48 2023 +0530

    Reserve threads for non-query requests without using laning (#14576)
    
    This PR uses the QoSFilter available in Jetty to park the query requests 
that exceed a configured limit. This is done so that other HTTP requests such 
as health check calls do not get blocked if the query server is busy serving 
long-running queries. The same mechanism can also be used in the future to 
isolate interactive queries from long-running select queries from interactive 
queries within the same broker.
    
    Right now, you can still get that isolation by setting 
druid.query.scheduler.numThreads to a value lowe than 
druid.server.http.numThreads. That enables total laning but the side effect is 
that excess requests are not queued and rejected outright that leads to a bad 
user experience.
    
    Parked requests are timed out after 30 seconds by default. I overrode that 
to the maxQueryTimeout in this PR.
---
 .../org/apache/druid/server/QueryScheduler.java    |  7 ++++--
 .../druid/server/initialization/ServerConfig.java  | 26 +++++++++++++++++--
 .../server/initialization/jetty/JettyBindings.java | 13 +++++++++-
 .../apache/druid/server/QuerySchedulerTest.java    | 26 +++++++++++++++++++
 .../druid/cli/QueryJettyServerInitializer.java     | 29 +++++++++++++++++++++-
 5 files changed, 95 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java 
b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
index fb44af0e7b..4dba34a6be 100644
--- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java
+++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
@@ -106,9 +106,12 @@ public class QueryScheduler implements QueryWatcher
     this.laningStrategy = laningStrategy;
     this.queryFutures = 
Multimaps.synchronizedSetMultimap(HashMultimap.create());
     this.queryDatasources = 
Multimaps.synchronizedSetMultimap(HashMultimap.create());
-    // if totalNumThreads is above 0 and less than 
druid.server.http.numThreads, enforce total limit
+    // if totalNumThreads is above 0 and less than 
druid.server.http.numThreads and
+    // requests are not being queued by Jetty, enforce total limit
     final boolean limitTotal;
-    if (totalNumThreads > 0 && totalNumThreads < serverConfig.getNumThreads()) 
{
+    if (totalNumThreads > 0
+        && totalNumThreads < serverConfig.getNumThreads()
+        && !serverConfig.isEnableQueryRequestsQueuing()) {
       limitTotal = true;
       this.totalCapacity = totalNumThreads;
     } else {
diff --git 
a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java 
b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java
index 9b72670c3f..276a0030af 100644
--- 
a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java
@@ -20,6 +20,7 @@
 package org.apache.druid.server.initialization;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.common.exception.ErrorResponseTransformStrategy;
 import org.apache.druid.common.exception.NoErrorResponseTransformStrategy;
@@ -104,6 +105,12 @@ public class ServerConfig
 
   }
 
+  @VisibleForTesting
+  public ServerConfig(boolean enableQueryRequestsQueuing)
+  {
+    this.enableQueryRequestsQueuing = enableQueryRequestsQueuing;
+  }
+
   @JsonProperty
   @Min(1)
   private int numThreads = getDefaultNumThreads();
@@ -179,6 +186,13 @@ public class ServerConfig
   @JsonProperty
   private boolean enableHSTS = false;
 
+  /**
+   * This is a feature flag to enable query requests queuing when admins want 
to reserve some threads for
+   * non-query requests. This feature flag is not documented and can be 
removed in the future.
+   */
+  @JsonProperty
+  private boolean enableQueryRequestsQueuing = false;
+
   @JsonProperty
   private boolean showDetailedJettyErrors = true;
 
@@ -288,6 +302,11 @@ public class ServerConfig
     return enableHSTS;
   }
 
+  public boolean isEnableQueryRequestsQueuing()
+  {
+    return enableQueryRequestsQueuing;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -318,7 +337,8 @@ public class ServerConfig
            allowedHttpMethods.equals(that.allowedHttpMethods) &&
            
errorResponseTransformStrategy.equals(that.errorResponseTransformStrategy) &&
            Objects.equals(contentSecurityPolicy, 
that.getContentSecurityPolicy()) &&
-           enableHSTS == that.enableHSTS;
+           enableHSTS == that.enableHSTS &&
+           enableQueryRequestsQueuing == that.enableQueryRequestsQueuing;
   }
 
   @Override
@@ -345,7 +365,8 @@ public class ServerConfig
         errorResponseTransformStrategy,
         showDetailedJettyErrors,
         contentSecurityPolicy,
-        enableHSTS
+        enableHSTS,
+        enableQueryRequestsQueuing
     );
   }
 
@@ -374,6 +395,7 @@ public class ServerConfig
            ", showDetailedJettyErrors=" + showDetailedJettyErrors +
            ", contentSecurityPolicy=" + contentSecurityPolicy +
            ", enableHSTS=" + enableHSTS +
+           ", enableQueryRequestsQueuing=" + enableQueryRequestsQueuing +
            '}';
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java
 
b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java
index 835564dc0b..9fc26a73be 100644
--- 
a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java
+++ 
b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java
@@ -65,10 +65,18 @@ public class JettyBindings
     private final String[] paths;
     private final int maxRequests;
 
-    public QosFilterHolder(String[] paths, int maxRequests)
+    private final long timeoutMs;
+
+    public QosFilterHolder(String[] paths, int maxRequests, long timeoutMs)
     {
       this.paths = paths;
       this.maxRequests = maxRequests;
+      this.timeoutMs = timeoutMs;
+    }
+
+    public QosFilterHolder(String[] paths, int maxRequests)
+    {
+      this(paths, maxRequests, -1);
     }
 
     @Override
@@ -86,6 +94,9 @@ public class JettyBindings
     @Override
     public Map<String, String> getInitParameters()
     {
+      if (timeoutMs >= 0) {
+        return ImmutableMap.of("maxRequests", String.valueOf(maxRequests), 
"suspendMs", String.valueOf(timeoutMs));
+      }
       return ImmutableMap.of("maxRequests", String.valueOf(maxRequests));
     }
 
diff --git 
a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java 
b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
index 749317d030..eb8025a770 100644
--- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
+++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
@@ -348,6 +348,32 @@ public class QuerySchedulerTest
     getFuturesAndAssertAftermathIsChill(futures, scheduler, true, true);
   }
 
+  @Test
+  public void testTotalLimitWithQueryQueuing()
+  {
+    ServerConfig serverConfig = new ServerConfig();
+    QueryScheduler queryScheduler = new QueryScheduler(
+        serverConfig.getNumThreads() - 1,
+        ManualQueryPrioritizationStrategy.INSTANCE,
+        new NoQueryLaningStrategy(),
+        serverConfig
+    );
+    Assert.assertEquals(serverConfig.getNumThreads() - 1, 
queryScheduler.getTotalAvailableCapacity());
+  }
+
+  @Test
+  public void testTotalLimitWithouQueryQueuing()
+  {
+    ServerConfig serverConfig = new ServerConfig(true);
+    QueryScheduler queryScheduler = new QueryScheduler(
+        serverConfig.getNumThreads() - 1,
+        ManualQueryPrioritizationStrategy.INSTANCE,
+        new NoQueryLaningStrategy(),
+        serverConfig
+    );
+    Assert.assertEquals(-1, queryScheduler.getTotalAvailableCapacity());
+  }
+
   @Test
   public void testExplodingWrapperDoesNotLeakLocks()
   {
diff --git 
a/services/src/main/java/org/apache/druid/cli/QueryJettyServerInitializer.java 
b/services/src/main/java/org/apache/druid/cli/QueryJettyServerInitializer.java
index fb0dff1561..e7c317c241 100644
--- 
a/services/src/main/java/org/apache/druid/cli/QueryJettyServerInitializer.java
+++ 
b/services/src/main/java/org/apache/druid/cli/QueryJettyServerInitializer.java
@@ -27,9 +27,12 @@ import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.servlet.GuiceFilter;
+import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.QuerySchedulerProvider;
 import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.initialization.jetty.JettyBindings;
 import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
 import org.apache.druid.server.initialization.jetty.LimitRequestsFilter;
@@ -46,6 +49,7 @@ import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -67,12 +71,20 @@ public class QueryJettyServerInitializer implements 
JettyServerInitializer
 
   private final AuthConfig authConfig;
 
+  private final QuerySchedulerProvider querySchedulerConfig;
+
   @Inject
-  public QueryJettyServerInitializer(Set<Handler> extensionHandlers, 
ServerConfig serverConfig, AuthConfig authConfig)
+  public QueryJettyServerInitializer(
+      Set<Handler> extensionHandlers,
+      ServerConfig serverConfig,
+      AuthConfig authConfig,
+      @Global QuerySchedulerProvider querySchedulerConfig
+  )
   {
     this.extensionHandlers = ImmutableList.copyOf(extensionHandlers);
     this.serverConfig = serverConfig;
     this.authConfig = authConfig;
+    this.querySchedulerConfig = querySchedulerConfig;
   }
 
   @Override
@@ -95,6 +107,21 @@ public class QueryJettyServerInitializer implements 
JettyServerInitializer
       );
     }
 
+    if (querySchedulerConfig.getNumThreads() > 0
+        && querySchedulerConfig.getNumThreads() < serverConfig.getNumThreads()
+        && serverConfig.isEnableQueryRequestsQueuing()) {
+      // Add QoS filter for query requests, so they don't take up more than 
querySchedulerConfig#numThreads.
+      // While this will also pick up some extra endpoints other than Query, 
the primary objective is to protect
+      // health check endpoints from being starved by query requests.
+      log.info("Enabling QoS Filter on query requests with limit [%d].", 
querySchedulerConfig.getNumThreads());
+      JettyBindings.QosFilterHolder filterHolder = new 
JettyBindings.QosFilterHolder(
+          new String[]{"/druid/v2/*"},
+          querySchedulerConfig.getNumThreads(),
+          serverConfig.getMaxQueryTimeout()
+      );
+      JettyServerInitUtils.addFilters(root, 
Collections.singleton(filterHolder));
+    }
+
     final ObjectMapper jsonMapper = 
injector.getInstance(Key.get(ObjectMapper.class, Json.class));
     final AuthenticatorMapper authenticatorMapper = 
injector.getInstance(AuthenticatorMapper.class);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to