This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 820febf38c0 Improved Connection Count server select strategy (#15975)
820febf38c0 is described below

commit 820febf38c062d35876ddeec4c6887bd0ec51d03
Author: Sree Charan Manamala <[email protected]>
AuthorDate: Mon Mar 4 15:02:32 2024 +0530

    Improved Connection Count server select strategy (#15975)
    
    Updated the Direct Druid Client so as to make Connection Count Server 
Selector Strategy work more efficiently.
    If creating connection to a node is slow, then that slowness wouldn't be 
accounted for if we count the open connections after sending the request. So we 
increment the counter and then send the request.
---
 .../org/apache/druid/client/DirectDruidClient.java | 35 +++++++++-------
 .../apache/druid/client/DirectDruidClientTest.java | 48 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 14 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java 
b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index 9a93c8bacd0..6a502110058 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -455,22 +455,29 @@ public class DirectDruidClient<T> implements 
QueryRunner<T>
         throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] 
url[%s] timed out.", query.getId(), url));
       }
 
-      future = httpClient.go(
-          new Request(
-              HttpMethod.POST,
-              new URL(url)
-          
).setContent(objectMapper.writeValueAsBytes(Queries.withTimeout(query, 
timeLeft)))
-           .setHeader(
-               HttpHeaders.Names.CONTENT_TYPE,
-               isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : 
MediaType.APPLICATION_JSON
-           ),
-          responseHandler,
-          Duration.millis(timeLeft)
-      );
+      // increment is moved up so that if future initialization is queued by 
some other process,
+      // we can increment the count earlier so that we can route the request 
to a different server
+      openConnections.getAndIncrement();
+      try {
+        future = httpClient.go(
+            new Request(
+                HttpMethod.POST,
+                new URL(url)
+            
).setContent(objectMapper.writeValueAsBytes(Queries.withTimeout(query, 
timeLeft)))
+             .setHeader(
+                 HttpHeaders.Names.CONTENT_TYPE,
+                 isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : 
MediaType.APPLICATION_JSON
+             ),
+            responseHandler,
+            Duration.millis(timeLeft)
+        );
+      }
+      catch (Exception e) {
+        openConnections.getAndDecrement();
+        throw e;
+      }
 
       queryWatcher.registerQueryFuture(query, future);
-
-      openConnections.getAndIncrement();
       Futures.addCallback(
           future,
           new FutureCallback<InputStream>()
diff --git 
a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java 
b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
index 4c99d2b5409..4fffcd6fd35 100644
--- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.client;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -27,6 +29,7 @@ import 
org.apache.druid.client.selector.ConnectionCountServerSelectorStrategy;
 import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
 import org.apache.druid.client.selector.QueryableDruidServer;
 import org.apache.druid.client.selector.ServerSelector;
+import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
@@ -38,6 +41,7 @@ import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
 import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
 import org.apache.druid.query.Druids;
+import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunnerTestHelper;
@@ -97,6 +101,7 @@ public class DirectDruidClientTest
   @Before
   public void setup()
   {
+    NullHandling.initializeForTests();
     httpClient = EasyMock.createMock(HttpClient.class);
     serverSelector = new ServerSelector(
         dataSegment,
@@ -427,4 +432,47 @@ public class DirectDruidClientTest
     Assert.assertEquals(hostName, actualException.getHost());
     EasyMock.verify(httpClient);
   }
+
+  @Test
+  public void testConnectionCountAfterException() throws 
JsonProcessingException
+  {
+    ObjectMapper mockObjectMapper = EasyMock.createMock(ObjectMapper.class);
+    EasyMock.expect(mockObjectMapper.writeValueAsBytes(Query.class))
+            .andThrow(new JsonProcessingException("Error")
+            {
+            });
+
+    DirectDruidClient client2 = new DirectDruidClient(
+        new ReflectionQueryToolChestWarehouse(),
+        QueryRunnerTestHelper.NOOP_QUERYWATCHER,
+        mockObjectMapper,
+        httpClient,
+        "http",
+        hostName,
+        new NoopServiceEmitter(),
+        queryCancellationExecutor
+    );
+
+    QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
+        new DruidServer(
+            "test1",
+            "localhost",
+            null,
+            0,
+            ServerType.HISTORICAL,
+            DruidServer.DEFAULT_TIER,
+            0
+        ),
+        client2
+    );
+
+    serverSelector.addServerAndUpdateSegment(queryableDruidServer2, 
serverSelector.getSegment());
+
+    TimeBoundaryQuery query = 
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
+    query = 
query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, 
Long.MAX_VALUE));
+
+    TimeBoundaryQuery finalQuery = query;
+    Assert.assertThrows(RuntimeException.class, () -> 
client2.run(QueryPlus.wrap(finalQuery)));
+    Assert.assertEquals(0, client2.getNumOpenConnections());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to