This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5fb59263036 Increase code coverage for `DirectDruidClientTest` (#18861)
5fb59263036 is described below
commit 5fb59263036de2eed8f1b07858997c232029492f
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Sat Jan 3 09:22:42 2026 -0800
Increase code coverage for `DirectDruidClientTest` (#18861)
Test-only change:
- Increases test coverage for the DirectDruidClient class from 57%, 35%, 16%
to 76%, 73%, 52% for method / line / branch coverage, respectively.
- Replaces mocks with more concrete classes and test helpers, allowing the
internals of the class, including HttpResponseHandler, to be exercised.
- Also adds a test for #18841 (log total bytes gathered when the max
scatter-gather bytes limit is reached), for which coverage had previously
been skipped.
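For illustration only (not part of the patch), the new tests follow roughly
this pattern inside DirectDruidClientTest; makeDirectDruidClient and
getQueryPlus are the private helpers added by this change, and the JSON
payload shown here is an assumed example:

    QueuedTestHttpClient httpClient = new QueuedTestHttpClient();
    SettableFuture<InputStream> firstResponse = SettableFuture.create();
    // Served to the first go() call; later calls with an empty queue get a
    // future that never completes, keeping those connections open.
    httpClient.enqueue(firstResponse);
    DirectDruidClient client = makeDirectDruidClient(httpClient);

    Sequence results = client.run(getQueryPlus(), responseContext);
    Assert.assertEquals(1, client.getNumOpenConnections());

    // Completing the future lets the real HttpResponseHandler parse the body.
    firstResponse.set(new ByteArrayInputStream(
        StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}]")
    ));
    Assert.assertEquals(1, results.toList().size());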
---
.../apache/druid/client/DirectDruidClientTest.java | 574 +++++++++------------
.../apache/druid/client/QueuedTestHttpClient.java | 79 +++
.../org/apache/druid/client/TestHttpClient.java | 36 +-
.../simulate/BlockingExecutorService.java | 2 +-
4 files changed, 372 insertions(+), 319 deletions(-)
diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
index a86a07c8f86..d1c6be7b7ba 100644
--- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
@@ -19,62 +19,60 @@
package org.apache.druid.client;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import org.apache.druid.client.selector.ConnectionCountServerSelectorStrategy;
-import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
-import org.apache.druid.client.selector.ServerSelector;
+import org.apache.druid.data.input.ResourceInputSource;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
-import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
-import org.apache.druid.query.CloneQueryMode;
import org.apache.druid.query.Druids;
-import org.apache.druid.query.Query;
+import org.apache.druid.query.NestedDataTestUtils;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryTimeoutException;
+import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.Result;
-import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.coordination.TestCoordinatorClient;
+import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
+import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
+import org.apache.druid.timeline.SegmentId;
import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
-import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
+import java.net.MalformedURLException;
import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
public class DirectDruidClientTest
@@ -82,66 +80,32 @@ public class DirectDruidClientTest
@ClassRule
public static QueryStackTests.Junit4ConglomerateRule conglomerateRule = new QueryStackTests.Junit4ConglomerateRule();
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
private final String hostName = "localhost:8080";
+ private final ObjectMapper objectMapper = new DefaultObjectMapper();
+ private final ResponseContext responseContext = ResponseContext.createEmpty();
- private final DataSegment dataSegment = new DataSegment(
- "test",
- Intervals.of("2013-01-01/2013-01-02"),
- DateTimes.of("2013-01-01").toString(),
- new HashMap<>(),
- new ArrayList<>(),
- new ArrayList<>(),
- NoneShardSpec.instance(),
- 0,
- 0L
- );
- private ServerSelector serverSelector;
-
- private HttpClient httpClient;
- private DirectDruidClient client;
- private QueryableDruidServer queryableDruidServer;
- private ScheduledExecutorService queryCancellationExecutor;
+ private WrappingScheduledExecutorService queryCancellationExecutor;
+ private BlockingExecutorService blockingExecutorService;
@Before
public void setup()
{
- final BrokerViewOfCoordinatorConfig filter = new BrokerViewOfCoordinatorConfig(new TestCoordinatorClient());
- filter.start();
- httpClient = EasyMock.createMock(HttpClient.class);
- serverSelector = new ServerSelector(
- dataSegment,
- new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()),
- filter
+ responseContext.initialize();
+ blockingExecutorService = new BlockingExecutorService("test-druid-client-cancel-executor");
+ queryCancellationExecutor = new WrappingScheduledExecutorService(
+ "DirectDruidClientTest-%s",
+ blockingExecutorService,
+ false
);
- queryCancellationExecutor = Execs.scheduledSingleThreaded("query-cancellation-executor");
- client = new DirectDruidClient(
- conglomerateRule.getConglomerate(),
- QueryRunnerTestHelper.NOOP_QUERYWATCHER,
- new DefaultObjectMapper(),
- httpClient,
- "http",
- hostName,
- new NoopServiceEmitter(),
- queryCancellationExecutor
- );
- queryableDruidServer = new QueryableDruidServer(
- new DruidServer(
- "test1",
- "localhost",
- null,
- 0,
- ServerType.HISTORICAL,
- DruidServer.DEFAULT_TIER,
- 0
- ),
- client
- );
- serverSelector.addServerAndUpdateSegment(queryableDruidServer, serverSelector.getSegment());
}
@After
public void teardown() throws InterruptedException
{
+ blockingExecutorService.shutdownNow();
queryCancellationExecutor.shutdown();
queryCancellationExecutor.awaitTermination(1, TimeUnit.SECONDS);
}
@@ -151,86 +115,39 @@ public class DirectDruidClientTest
{
final URL url = new URL(StringUtils.format("http://%s/druid/v2/", hostName));
- SettableFuture<InputStream> futureResult = SettableFuture.create();
- Capture<Request> capturedRequest = EasyMock.newCapture();
- EasyMock.expect(
- httpClient.go(
- EasyMock.capture(capturedRequest),
- EasyMock.<HttpResponseHandler>anyObject(),
- EasyMock.anyObject(Duration.class)
- )
- )
- .andReturn(futureResult)
- .times(1);
-
- SettableFuture futureException = SettableFuture.create();
- EasyMock.expect(
- httpClient.go(
- EasyMock.capture(capturedRequest),
- EasyMock.<HttpResponseHandler>anyObject(),
- EasyMock.anyObject(Duration.class)
- )
- )
- .andReturn(futureException)
- .times(1);
-
- EasyMock.expect(
- httpClient.go(
- EasyMock.capture(capturedRequest),
- EasyMock.<HttpResponseHandler>anyObject(),
- EasyMock.anyObject(Duration.class)
- )
- )
- .andReturn(SettableFuture.create())
- .atLeastOnce();
+ QueuedTestHttpClient queuedHttpClient = new QueuedTestHttpClient();
+ DirectDruidClient client1 = makeDirectDruidClient(queuedHttpClient);
- EasyMock.replay(httpClient);
+ DirectDruidClient client2 = makeDirectDruidClient(queuedHttpClient);
- DirectDruidClient client2 = new DirectDruidClient(
- conglomerateRule.getConglomerate(),
- QueryRunnerTestHelper.NOOP_QUERYWATCHER,
- new DefaultObjectMapper(),
- httpClient,
- "http",
- "foo2",
- new NoopServiceEmitter(),
- queryCancellationExecutor
- );
-
- QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
- new DruidServer(
- "test1",
- "localhost",
- null,
- 0,
- ServerType.HISTORICAL,
- DruidServer.DEFAULT_TIER,
- 0
- ),
- client2
- );
- serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment());
-
- TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
- query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));
- Sequence s1 = client.run(QueryPlus.wrap(query));
- Assert.assertTrue(capturedRequest.hasCaptured());
- Assert.assertEquals(url, capturedRequest.getValue().getUrl());
- Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod());
- Assert.assertEquals(1, client.getNumOpenConnections());
-
- // simulate read timeout
- client.run(QueryPlus.wrap(query));
- Assert.assertEquals(2, client.getNumOpenConnections());
+ // Queue first call: pending until we provide a result
+ SettableFuture<InputStream> futureResult = SettableFuture.create();
+ queuedHttpClient.enqueue(futureResult);
+ // Queue second call: will fail with ReadTimeoutException
+ SettableFuture<InputStream> futureException = SettableFuture.create();
+ queuedHttpClient.enqueue(futureException);
+ // Subsequent calls: no enqueue → default pending futures created in client
+
+ QueryPlus queryPlus = getQueryPlus();
+
+ Sequence s1 = client1.run(queryPlus, responseContext);
+ List<Request> requests = queuedHttpClient.getRequests();
+ Assert.assertFalse(requests.isEmpty());
+ Assert.assertEquals(url, requests.get(0).getUrl());
+ Assert.assertEquals(HttpMethod.POST, requests.get(0).getMethod());
+ Assert.assertEquals(1, client1.getNumOpenConnections());
+
+ // simulate read timeout on second request
+ client1.run(queryPlus, responseContext);
+ Assert.assertEquals(2, client1.getNumOpenConnections());
futureException.setException(new ReadTimeoutException());
- Assert.assertEquals(1, client.getNumOpenConnections());
+ Assert.assertEquals(1, client1.getNumOpenConnections());
- // subsequent connections should work
- client.run(QueryPlus.wrap(query));
- client.run(QueryPlus.wrap(query));
- client.run(QueryPlus.wrap(query));
-
- Assert.assertTrue(client.getNumOpenConnections() == 4);
+ // subsequent connections should work (and remain open)
+ client1.run(queryPlus, responseContext);
+ client1.run(queryPlus, responseContext);
+ client1.run(queryPlus, responseContext);
+ Assert.assertEquals(4, client1.getNumOpenConnections());
// produce result for first connection
futureResult.set(
@@ -241,244 +158,267 @@ public class DirectDruidClientTest
List<Result> results = s1.toList();
Assert.assertEquals(1, results.size());
Assert.assertEquals(DateTimes.of("2014-01-01T01:02:03Z"), results.get(0).getTimestamp());
- Assert.assertEquals(3, client.getNumOpenConnections());
-
- client2.run(QueryPlus.wrap(query));
- client2.run(QueryPlus.wrap(query));
+ Assert.assertEquals(3, client1.getNumOpenConnections());
+ client2.run(queryPlus, responseContext);
+ client2.run(queryPlus, responseContext);
Assert.assertEquals(2, client2.getNumOpenConnections());
-
- Assert.assertEquals(serverSelector.pick(null, CloneQueryMode.EXCLUDECLONES), queryableDruidServer2);
-
- EasyMock.verify(httpClient);
}
@Test
- public void testCancel()
+ public void testCancel() throws MalformedURLException
{
- Capture<Request> capturedRequest = EasyMock.newCapture();
- ListenableFuture<Object> cancelledFuture = Futures.immediateCancelledFuture();
- SettableFuture<Object> cancellationFuture = SettableFuture.create();
-
- EasyMock.expect(
- httpClient.go(
- EasyMock.capture(capturedRequest),
- EasyMock.<HttpResponseHandler>anyObject(),
- EasyMock.anyObject(Duration.class)
- )
- )
- .andReturn(cancelledFuture)
- .once();
-
- EasyMock.expect(
- httpClient.go(
- EasyMock.capture(capturedRequest),
- EasyMock.<HttpResponseHandler>anyObject(),
- EasyMock.anyObject(Duration.class)
- )
- )
- .andReturn(cancellationFuture)
- .anyTimes();
-
- EasyMock.replay(httpClient);
+ QueryPlus queryPlus = getQueryPlus();
+ TestHttpClient testHttpClient = new TestHttpClient(objectMapper, Futures.immediateCancelledFuture());
+ // add a generic server and a cancel query URL
+ QueryableIndex index = makeQueryableIndex();
+ TestHttpClient.SimpleServerManager simpleServerManager = new TestHttpClient.SimpleServerManager(
+ conglomerateRule.getConglomerate(), DataSegment.builder(SegmentId.dummy("test")).build(), index, false
+ );
+ testHttpClient.addServerAndRunner(
+ new DruidServer("test1", hostName, null, 0, ServerType.HISTORICAL,
DruidServer.DEFAULT_TIER, 0),
+ simpleServerManager
+ );
+ testHttpClient.addUrlAndRunner(
+ new URL(StringUtils.format("http://%s/druid/v2/%s", hostName, queryPlus.getQuery().getId())),
+ simpleServerManager
+ );
+ DirectDruidClient client = makeDirectDruidClient(testHttpClient);
+ Sequence results = client.run(queryPlus, responseContext);
- TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
- query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));
- cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled")));
- Sequence results = client.run(QueryPlus.wrap(query));
- Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod());
Assert.assertEquals(0, client.getNumOpenConnections());
+ QueryInterruptedException actualException =
+ Assert.assertThrows(QueryInterruptedException.class, () -> results.toList());
+ Assert.assertEquals(hostName, actualException.getHost());
+ Assert.assertEquals("Query cancelled", actualException.getErrorCode());
+ Assert.assertEquals("Task was cancelled.",
actualException.getCause().getMessage());
+ Assert.assertTrue(blockingExecutorService.hasPendingTasks());
+ blockingExecutorService.finishNextPendingTask();
+ Assert.assertTrue(blockingExecutorService.hasPendingTasks());
+ ISE observedException = Assert.assertThrows(ISE.class, () -> blockingExecutorService.finishNextPendingTask());
+ Assert.assertTrue(observedException.getCause() instanceof CancellationException);
- QueryInterruptedException exception = null;
- try {
- results.toList();
- }
- catch (QueryInterruptedException e) {
- exception = e;
- }
- Assert.assertNotNull(exception);
-
- EasyMock.verify(httpClient);
}
@Test
public void testQueryInterruptionExceptionLogMessage()
{
SettableFuture<Object> interruptionFuture = SettableFuture.create();
- Capture<Request> capturedRequest = EasyMock.newCapture();
- final String hostName = "localhost:8080";
- EasyMock
- .expect(
- httpClient.go(
- EasyMock.capture(capturedRequest),
- EasyMock.<HttpResponseHandler>anyObject(),
- EasyMock.anyObject(Duration.class)
- )
- )
- .andReturn(interruptionFuture)
- .anyTimes();
-
- EasyMock.replay(httpClient);
-
- // test error
- TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
- query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));
interruptionFuture.set(
new ByteArrayInputStream(
StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}")
)
);
- Sequence results = client.run(QueryPlus.wrap(query));
+ final DirectDruidClient client = makeDirectDruidClient(initHttpClientFromExistingClient(interruptionFuture));
- QueryInterruptedException actualException = null;
- try {
- results.toList();
- }
- catch (QueryInterruptedException e) {
- actualException = e;
- }
- Assert.assertNotNull(actualException);
+ interruptionFuture.set(
+ new ByteArrayInputStream(StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}"))
+ );
+ Sequence results = client.run(getQueryPlus(), responseContext);
+
+ QueryInterruptedException actualException =
+ Assert.assertThrows(QueryInterruptedException.class, () -> results.toList());
Assert.assertEquals("testing1", actualException.getErrorCode());
Assert.assertEquals("testing2", actualException.getMessage());
Assert.assertEquals(hostName, actualException.getHost());
- EasyMock.verify(httpClient);
}
@Test
- public void testQueryTimeoutBeforeFuture() throws IOException, InterruptedException
+ public void testQueryTimeoutBeforeFuture() throws IOException
{
SettableFuture<Object> timeoutFuture = SettableFuture.create();
- Capture<Request> capturedRequest = EasyMock.newCapture();
- final String queryId = "timeout-before-future";
-
- EasyMock
- .expect(
- httpClient.go(
- EasyMock.capture(capturedRequest),
- EasyMock.<HttpResponseHandler>anyObject(),
- EasyMock.anyObject(Duration.class)
- )
- )
- .andReturn(timeoutFuture)
- .anyTimes();
-
- EasyMock.replay(httpClient);
-
- TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
- query = query.withOverriddenContext(
- ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 250, "queryId", queryId)
- );
+ final DirectDruidClient client = makeDirectDruidClient(initHttpClientFromExistingClient(timeoutFuture));
- Sequence results = client.run(QueryPlus.wrap(query));
+ QueryPlus queryPlus = getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 250));
+ Sequence results = client.run(queryPlus, responseContext);
- // incomplete result set
+ // Incomplete result set delivered via a pipe to simulate slow stream
PipedInputStream in = new PipedInputStream();
final PipedOutputStream out = new PipedOutputStream(in);
- timeoutFuture.set(
- in
+ timeoutFuture.set(in);
+
+ QueryTimeoutException actualException = Assert.assertThrows(
+ QueryTimeoutException.class,
+ () -> {
+ out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}"));
+ Thread.sleep(250);
+ out.write(StringUtils.toUtf8("]"));
+ out.close();
+ results.toList();
+ }
);
-
- QueryTimeoutException actualException = null;
- try {
- out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}"));
- Thread.sleep(250);
- out.write(StringUtils.toUtf8("]"));
- out.close();
- results.toList();
- }
- catch (QueryTimeoutException e) {
- actualException = e;
- }
- Assert.assertNotNull(actualException);
Assert.assertEquals("Query timeout", actualException.getErrorCode());
- Assert.assertEquals("url[http://localhost:8080/druid/v2/] timed out",
actualException.getMessage());
+ Assert.assertEquals(StringUtils.format("url[http://%s/druid/v2/] timed
out", hostName), actualException.getMessage());
Assert.assertEquals(hostName, actualException.getHost());
- EasyMock.verify(httpClient);
}
@Test
public void testQueryTimeoutFromFuture()
{
- SettableFuture<Object> noFuture = SettableFuture.create();
- Capture<Request> capturedRequest = EasyMock.newCapture();
- final String queryId = "never-ending-future";
-
- EasyMock
- .expect(
- httpClient.go(
- EasyMock.capture(capturedRequest),
- EasyMock.<HttpResponseHandler>anyObject(),
- EasyMock.anyObject(Duration.class)
- )
- )
- .andReturn(noFuture)
- .anyTimes();
+ final SettableFuture<Object> timeoutFuture = SettableFuture.create();
+ final DirectDruidClient client = makeDirectDruidClient(initHttpClientFromExistingClient(timeoutFuture));
- EasyMock.replay(httpClient);
+ QueryPlus query = getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 500));
+ Sequence results = client.run(query, responseContext);
+ QueryTimeoutException actualException = Assert.assertThrows(QueryTimeoutException.class, results::toList);
+ Assert.assertEquals("Query timeout", actualException.getErrorCode());
+ Assert.assertEquals(StringUtils.format("Query [%s] timed out!", query.getQuery().getId()), actualException.getMessage());
+ Assert.assertEquals(hostName, actualException.getHost());
+ }
+
+ @Test
+ public void testQueryTimeoutDuringRunThrowsExceptionImmediately()
+ {
+ SettableFuture<Object> timeoutFuture = SettableFuture.create();
+ final DirectDruidClient client = makeDirectDruidClient(initHttpClientFromExistingClient(timeoutFuture));
- TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
- query = query.withOverriddenContext(
- ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 500, "queryId", queryId)
+ QueryPlus queryPlus = getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis()));
+ QueryTimeoutException actualException = Assert.assertThrows(
+ QueryTimeoutException.class,
+ () -> client.run(queryPlus, responseContext)
+ );
+ Assert.assertEquals("Query timeout", actualException.getErrorCode());
+ Assert.assertEquals(
+ StringUtils.format(
+ "Query[%s] url[http://%s/druid/v2/] timed out.",
+ queryPlus.getQuery().getId(),
+ hostName
+ ), actualException.getMessage()
);
+ }
+
+ @Test
+ public void testQueryTimeoutDuringResponseHandling()
+ {
+ final TestHttpClient testHttpClient = new TestHttpClient(objectMapper, 110);
+ final DirectDruidClient client = makeDirectDruidClient(initHttpClientFromExistingClient(testHttpClient, false));
- Sequence results = client.run(QueryPlus.wrap(query));
+ final QueryPlus queryPlus = getQueryPlus(Map.of(
+ QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 100,
+ DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 100
+ ));
- QueryTimeoutException actualException = null;
- try {
- results.toList();
- }
- catch (QueryTimeoutException e) {
- actualException = e;
- }
- Assert.assertNotNull(actualException);
+ QueryTimeoutException actualException = Assert.assertThrows(
+ QueryTimeoutException.class,
+ () -> client.run(queryPlus, responseContext)
+ );
Assert.assertEquals("Query timeout", actualException.getErrorCode());
- Assert.assertEquals(StringUtils.format("Query [%s] timed out!", queryId),
actualException.getMessage());
- Assert.assertEquals(hostName, actualException.getHost());
- EasyMock.verify(httpClient);
+ Assert.assertEquals(
+ StringUtils.format("Query[%s] url[http://%s/druid/v2/] timed out.",
+ queryPlus.getQuery().getId(),
+ hostName
+ ), actualException.getMessage()
+ );
}
@Test
- public void testConnectionCountAfterException() throws JsonProcessingException
+ public void testConnectionCountAfterException()
{
- ObjectMapper mockObjectMapper = EasyMock.createMock(ObjectMapper.class);
- EasyMock.expect(mockObjectMapper.writeValueAsBytes(Query.class))
- .andThrow(new JsonProcessingException("Error")
- {
- });
+ final DirectDruidClient client = makeDirectDruidClient(initHttpClientFromExistingClient());
+
+ Assert.assertThrows(RuntimeException.class, () -> client.run(getQueryPlus(), responseContext));
+ Assert.assertEquals(0, client.getNumOpenConnections());
+ }
+
+ @Test
+ public void testResourceLimitExceededException()
+ {
+ final DirectDruidClient client = makeDirectDruidClient(initHttpClientWithSuccessfulQuery());
+
+ final QueryPlus queryPlus = getQueryPlus(Map.of(
+ QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 100,
+ DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE
+ ));
- DirectDruidClient client2 = new DirectDruidClient(
+ ResourceLimitExceededException actualException = Assert.assertThrows(
+ ResourceLimitExceededException.class,
+ () -> client.run(queryPlus, responseContext)
+ );
+
+ Assert.assertEquals(
+ StringUtils.format(
+ "Query[%s] url[http://localhost:8080/druid/v2/] total bytes
gathered[127] exceeds maxScatterGatherBytes[100]",
+ queryPlus.getQuery().getId()
+ ),
+ actualException.getMessage());
+ }
+
+ private DirectDruidClient makeDirectDruidClient(HttpClient httpClient)
+ {
+ return new DirectDruidClient(
conglomerateRule.getConglomerate(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
- mockObjectMapper,
+ objectMapper,
httpClient,
"http",
hostName,
new NoopServiceEmitter(),
queryCancellationExecutor
);
+ }
- QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
- new DruidServer(
- "test1",
- "localhost",
- null,
- 0,
- ServerType.HISTORICAL,
- DruidServer.DEFAULT_TIER,
- 0
- ),
- client2
+ private HttpClient initHttpClientFromExistingClient()
+ {
+ return initHttpClientFromExistingClient(new TestHttpClient(objectMapper), true);
+ }
+
+ private HttpClient initHttpClientWithSuccessfulQuery()
+ {
+ return initHttpClientFromExistingClient(new TestHttpClient(objectMapper), false);
+ }
+
+ private HttpClient initHttpClientFromExistingClient(ListenableFuture future)
+ {
+ return initHttpClientFromExistingClient(new TestHttpClient(objectMapper, future), false);
+ }
+
+ private HttpClient initHttpClientFromExistingClient(TestHttpClient httpClient, boolean throwQueryError)
+ {
+ final QueryableIndex index = makeQueryableIndex();
+ httpClient.addServerAndRunner(
+ new DruidServer("test1", hostName, null, 0, ServerType.HISTORICAL,
DruidServer.DEFAULT_TIER, 0),
+ new TestHttpClient.SimpleServerManager(
+ conglomerateRule.getConglomerate(), DataSegment.builder(SegmentId.dummy("test")).build(), index, throwQueryError
+ )
);
+ return httpClient;
+ }
- serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment());
+ private QueryableIndex makeQueryableIndex()
+ {
+ try {
+ return IndexBuilder.create()
+ .tmpDir(temporaryFolder.newFolder())
+ .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(
+ new IncrementalIndexSchema.Builder()
+ .withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec())
+ .build()
+ )
+ .inputSource(
+ ResourceInputSource.of(
+ NestedDataTestUtils.class.getClassLoader(),
+ NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE
+ )
+ )
+ .inputFormat(TestIndex.DEFAULT_JSON_INPUT_FORMAT)
+ .inputTmpDir(temporaryFolder.newFolder())
+ .buildMMappedIndex();
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
- TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
- query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));
+ private static QueryPlus getQueryPlus()
+ {
+ return getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));
+ }
- TimeBoundaryQuery finalQuery = query;
- Assert.assertThrows(RuntimeException.class, () -> client2.run(QueryPlus.wrap(finalQuery)));
- Assert.assertEquals(0, client2.getNumOpenConnections());
+ private static QueryPlus getQueryPlus(Map<String, Object> context)
+ {
+ return QueryPlus.wrap(Druids.newTimeBoundaryQueryBuilder().dataSource("test").context(context).randomQueryId().build());
}
}
diff --git a/server/src/test/java/org/apache/druid/client/QueuedTestHttpClient.java b/server/src/test/java/org/apache/druid/client/QueuedTestHttpClient.java
new file mode 100644
index 00000000000..67f392bd4d9
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/client/QueuedTestHttpClient.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
+import org.joda.time.Duration;
+
+import java.io.InputStream;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+
+/**
+ * A test {@link HttpClient} that captures {@link Request}s and serves preloaded responses sequentially. If no future
+ * has been {@link #enqueue(ListenableFuture) enqueued} when {@link #go(Request, HttpResponseHandler, Duration)} is
+ * invoked, the client returns a pending future that never completes.
+ *
+ * <p>This is useful for tests that need deterministic control over request ordering and response timing.</p>
+ */
+public class QueuedTestHttpClient implements HttpClient
+{
+ private final List<Request> captured = new ArrayList<>();
+ private final Deque<ListenableFuture<InputStream>> queue = new ArrayDeque<>();
+
+ public void enqueue(ListenableFuture<InputStream> f)
+ {
+ queue.addLast(f);
+ }
+
+ public List<Request> getRequests()
+ {
+ return captured;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <Intermediate, Final> ListenableFuture<Final> go(
+ Request request,
+ HttpResponseHandler<Intermediate, Final> handler,
+ Duration readTimeout
+ )
+ {
+ captured.add(request);
+ ListenableFuture<InputStream> f = queue.pollFirst();
+ if (f == null) {
+ f = SettableFuture.create(); // pending forever
+ }
+ return (ListenableFuture<Final>) f;
+ }
+
+ @Override
+ public <Intermediate, Final> ListenableFuture<Final> go(Request request, HttpResponseHandler<Intermediate, Final> handler)
+ {
+ throw new UnsupportedOperationException("Use go(Request, HttpResponseHandler<Intermediate, Final>, Duration) instead)");
+ }
+}
+
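A usage sketch for the helper above (illustrative only, not part of the
patch; the request URL and timeout are arbitrary):

    QueuedTestHttpClient client = new QueuedTestHttpClient();
    // With nothing enqueued, go() records the request and returns a future
    // that never completes, so tests can assert on still-open connections.
    ListenableFuture<InputStream> pending = client.go(
        new Request(HttpMethod.POST, new URL("http://localhost:8080/druid/v2/")),
        null, // the handler is ignored by this test client
        Duration.standardSeconds(10)
    );
    Assert.assertFalse(pending.isDone());
    Assert.assertEquals(1, client.getRequests().size());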
diff --git a/server/src/test/java/org/apache/druid/client/TestHttpClient.java b/server/src/test/java/org/apache/druid/client/TestHttpClient.java
index b979f39799a..ef4da264aad 100644
--- a/server/src/test/java/org/apache/druid/client/TestHttpClient.java
+++ b/server/src/test/java/org/apache/druid/client/TestHttpClient.java
@@ -67,10 +67,29 @@ public class TestHttpClient implements HttpClient
private final Map<URL, SimpleServerManager> servers = new HashMap<>();
private final ObjectMapper objectMapper;
+ @Nullable
+ private final ListenableFuture future;
+ private final long responseDelayMillis;
public TestHttpClient(ObjectMapper objectMapper)
{
this.objectMapper = objectMapper;
+ this.future = null;
+ this.responseDelayMillis = -1;
+ }
+
+ public TestHttpClient(ObjectMapper objectMapper, ListenableFuture future)
+ {
+ this.objectMapper = objectMapper;
+ this.future = future;
+ this.responseDelayMillis = -1;
+ }
+
+ public TestHttpClient(ObjectMapper objectMapper, long responseDelayMillis)
+ {
+ this.objectMapper = objectMapper;
+ this.future = null;
+ this.responseDelayMillis = responseDelayMillis;
}
public void addServerAndRunner(DruidServer server, SimpleServerManager serverManager)
@@ -78,6 +97,11 @@ public class TestHttpClient implements HttpClient
servers.put(computeUrl(server), serverManager);
}
+ public void addUrlAndRunner(URL queryId, SimpleServerManager serverManager)
+ {
+ servers.put(queryId, serverManager);
+ }
+
@Nullable
public SimpleServerManager getServerManager(DruidServer server)
{
@@ -137,13 +161,23 @@ public class TestHttpClient implements HttpClient
response.setContent(
HeapChannelBufferFactory.getInstance().getBuffer(serializedContent, 0, serializedContent.length)
);
+ if (responseDelayMillis > 0) {
+ Thread.sleep(responseDelayMillis);
+ }
final ClientResponse<Intermediate> intermClientResponse = handler.handleResponse(response, NOOP_TRAFFIC_COP);
final ClientResponse<Final> finalClientResponse = handler.done(intermClientResponse);
- return Futures.immediateFuture(finalClientResponse.getObj());
+ if (future != null) {
+ return future;
+ } else {
+ return Futures.immediateFuture(finalClientResponse.getObj());
+ }
}
catch (IOException e) {
throw new RuntimeException(e);
}
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
/**
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
index 89fa07c4155..65fd37be2cc 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
@@ -248,7 +248,7 @@ public class BlockingExecutorService implements ExecutorService
future.complete(result);
}
catch (Exception e) {
- throw new ISE("Error while executing task", e);
+ throw new ISE(e, "Error[%s] while executing task", e.getMessage());
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]