This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 820febf38c0 Improved Connection Count server select strategy (#15975)
820febf38c0 is described below
commit 820febf38c062d35876ddeec4c6887bd0ec51d03
Author: Sree Charan Manamala <[email protected]>
AuthorDate: Mon Mar 4 15:02:32 2024 +0530
Improved Connection Count server select strategy (#15975)
Updated the Direct Druid Client so as to make Connection Count Server
Selector Strategy work more efficiently.
If creating connection to a node is slow, then that slowness wouldn't be
accounted for if we count the open connections after sending the request. So we
increment the counter and then send the request.
---
.../org/apache/druid/client/DirectDruidClient.java | 35 +++++++++-------
.../apache/druid/client/DirectDruidClientTest.java | 48 ++++++++++++++++++++++
2 files changed, 69 insertions(+), 14 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index 9a93c8bacd0..6a502110058 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -455,22 +455,29 @@ public class DirectDruidClient<T> implements
QueryRunner<T>
throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s]
url[%s] timed out.", query.getId(), url));
}
- future = httpClient.go(
- new Request(
- HttpMethod.POST,
- new URL(url)
-
).setContent(objectMapper.writeValueAsBytes(Queries.withTimeout(query,
timeLeft)))
- .setHeader(
- HttpHeaders.Names.CONTENT_TYPE,
- isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE :
MediaType.APPLICATION_JSON
- ),
- responseHandler,
- Duration.millis(timeLeft)
- );
+ // increment is moved up so that if future initialization is queued by
some other process,
+ // we can increment the count earlier so that we can route the request
to a different server
+ openConnections.getAndIncrement();
+ try {
+ future = httpClient.go(
+ new Request(
+ HttpMethod.POST,
+ new URL(url)
+
).setContent(objectMapper.writeValueAsBytes(Queries.withTimeout(query,
timeLeft)))
+ .setHeader(
+ HttpHeaders.Names.CONTENT_TYPE,
+ isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE :
MediaType.APPLICATION_JSON
+ ),
+ responseHandler,
+ Duration.millis(timeLeft)
+ );
+ }
+ catch (Exception e) {
+ openConnections.getAndDecrement();
+ throw e;
+ }
queryWatcher.registerQueryFuture(query, future);
-
- openConnections.getAndIncrement();
Futures.addCallback(
future,
new FutureCallback<InputStream>()
diff --git
a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
index 4c99d2b5409..4fffcd6fd35 100644
--- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
@@ -19,6 +19,8 @@
package org.apache.druid.client;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -27,6 +29,7 @@ import
org.apache.druid.client.selector.ConnectionCountServerSelectorStrategy;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.QueryableDruidServer;
import org.apache.druid.client.selector.ServerSelector;
+import org.apache.druid.common.config.NullHandling;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
@@ -38,6 +41,7 @@ import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.Druids;
+import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerTestHelper;
@@ -97,6 +101,7 @@ public class DirectDruidClientTest
@Before
public void setup()
{
+ NullHandling.initializeForTests();
httpClient = EasyMock.createMock(HttpClient.class);
serverSelector = new ServerSelector(
dataSegment,
@@ -427,4 +432,47 @@ public class DirectDruidClientTest
Assert.assertEquals(hostName, actualException.getHost());
EasyMock.verify(httpClient);
}
+
+ @Test
+ public void testConnectionCountAfterException() throws
JsonProcessingException
+ {
+ ObjectMapper mockObjectMapper = EasyMock.createMock(ObjectMapper.class);
+ EasyMock.expect(mockObjectMapper.writeValueAsBytes(Query.class))
+ .andThrow(new JsonProcessingException("Error")
+ {
+ });
+
+ DirectDruidClient client2 = new DirectDruidClient(
+ new ReflectionQueryToolChestWarehouse(),
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER,
+ mockObjectMapper,
+ httpClient,
+ "http",
+ hostName,
+ new NoopServiceEmitter(),
+ queryCancellationExecutor
+ );
+
+ QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
+ new DruidServer(
+ "test1",
+ "localhost",
+ null,
+ 0,
+ ServerType.HISTORICAL,
+ DruidServer.DEFAULT_TIER,
+ 0
+ ),
+ client2
+ );
+
+ serverSelector.addServerAndUpdateSegment(queryableDruidServer2,
serverSelector.getSegment());
+
+ TimeBoundaryQuery query =
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
+ query =
query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME,
Long.MAX_VALUE));
+
+ TimeBoundaryQuery finalQuery = query;
+ Assert.assertThrows(RuntimeException.class, () ->
client2.run(QueryPlus.wrap(finalQuery)));
+ Assert.assertEquals(0, client2.getNumOpenConnections());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]