This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fb59263036 Increase code coverage for `DirectDruidClientTest` (#18861)
5fb59263036 is described below

commit 5fb59263036de2eed8f1b07858997c232029492f
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Sat Jan 3 09:22:42 2026 -0800

    Increase code coverage for `DirectDruidClientTest` (#18861)
    
    Test-only change:
    
    - Increases test coverage for the DirectDruidClient class from 57%, 35%, 
16% to 76%, 73%, 52% for method / line / branch coverage respectively.
    - Replaces mocks with more concrete classes and test helpers, allowing the 
internals of the class, including HttpResponseHandler to be exercised.
    - Also added a test for feat: log total bytes gathered when max 
scatter-gather bytes limit is reached #18841 for which coverage had to be 
skipped
---
 .../apache/druid/client/DirectDruidClientTest.java | 574 +++++++++------------
 .../apache/druid/client/QueuedTestHttpClient.java  |  79 +++
 .../org/apache/druid/client/TestHttpClient.java    |  36 +-
 .../simulate/BlockingExecutorService.java          |   2 +-
 4 files changed, 372 insertions(+), 319 deletions(-)

diff --git 
a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java 
b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
index a86a07c8f86..d1c6be7b7ba 100644
--- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
@@ -19,62 +19,60 @@
 
 package org.apache.druid.client;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import org.apache.druid.client.selector.ConnectionCountServerSelectorStrategy;
-import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
-import org.apache.druid.client.selector.ServerSelector;
+import org.apache.druid.data.input.ResourceInputSource;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
-import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
-import org.apache.druid.query.CloneQueryMode;
 import org.apache.druid.query.Druids;
-import org.apache.druid.query.Query;
+import org.apache.druid.query.NestedDataTestUtils;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunnerTestHelper;
 import org.apache.druid.query.QueryTimeoutException;
+import org.apache.druid.query.ResourceLimitExceededException;
 import org.apache.druid.query.Result;
-import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.coordination.TestCoordinatorClient;
+import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
+import 
org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
+import org.apache.druid.timeline.SegmentId;
 import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.jboss.netty.handler.timeout.ReadTimeoutException;
-import org.joda.time.Duration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
+import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.TimeUnit;
 
 public class DirectDruidClientTest
@@ -82,66 +80,32 @@ public class DirectDruidClientTest
   @ClassRule
   public static QueryStackTests.Junit4ConglomerateRule conglomerateRule = new 
QueryStackTests.Junit4ConglomerateRule();
 
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
   private final String hostName = "localhost:8080";
+  private final ObjectMapper objectMapper = new DefaultObjectMapper();
+  private final ResponseContext responseContext = 
ResponseContext.createEmpty();
 
-  private final DataSegment dataSegment = new DataSegment(
-      "test",
-      Intervals.of("2013-01-01/2013-01-02"),
-      DateTimes.of("2013-01-01").toString(),
-      new HashMap<>(),
-      new ArrayList<>(),
-      new ArrayList<>(),
-      NoneShardSpec.instance(),
-      0,
-      0L
-  );
-  private ServerSelector serverSelector;
-
-  private HttpClient httpClient;
-  private DirectDruidClient client;
-  private QueryableDruidServer queryableDruidServer;
-  private ScheduledExecutorService queryCancellationExecutor;
+  private WrappingScheduledExecutorService queryCancellationExecutor;
+  private BlockingExecutorService blockingExecutorService;
 
   @Before
   public void setup()
   {
-    final BrokerViewOfCoordinatorConfig filter = new 
BrokerViewOfCoordinatorConfig(new TestCoordinatorClient());
-    filter.start();
-    httpClient = EasyMock.createMock(HttpClient.class);
-    serverSelector = new ServerSelector(
-        dataSegment,
-        new HighestPriorityTierSelectorStrategy(new 
ConnectionCountServerSelectorStrategy()),
-        filter
+    responseContext.initialize();
+    blockingExecutorService = new 
BlockingExecutorService("test-druid-client-cancel-executor");
+    queryCancellationExecutor = new WrappingScheduledExecutorService(
+        "DirectDruidClientTest-%s",
+        blockingExecutorService,
+        false
     );
-    queryCancellationExecutor = 
Execs.scheduledSingleThreaded("query-cancellation-executor");
-    client = new DirectDruidClient(
-        conglomerateRule.getConglomerate(),
-        QueryRunnerTestHelper.NOOP_QUERYWATCHER,
-        new DefaultObjectMapper(),
-        httpClient,
-        "http",
-        hostName,
-        new NoopServiceEmitter(),
-        queryCancellationExecutor
-    );
-    queryableDruidServer = new QueryableDruidServer(
-        new DruidServer(
-            "test1",
-            "localhost",
-            null,
-            0,
-            ServerType.HISTORICAL,
-            DruidServer.DEFAULT_TIER,
-            0
-        ),
-        client
-    );
-    serverSelector.addServerAndUpdateSegment(queryableDruidServer, 
serverSelector.getSegment());
   }
 
   @After
   public void teardown() throws InterruptedException
   {
+    blockingExecutorService.shutdownNow();
     queryCancellationExecutor.shutdown();
     queryCancellationExecutor.awaitTermination(1, TimeUnit.SECONDS);
   }
@@ -151,86 +115,39 @@ public class DirectDruidClientTest
   {
     final URL url = new URL(StringUtils.format("http://%s/druid/v2/";, 
hostName));
 
-    SettableFuture<InputStream> futureResult = SettableFuture.create();
-    Capture<Request> capturedRequest = EasyMock.newCapture();
-    EasyMock.expect(
-        httpClient.go(
-            EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject(),
-            EasyMock.anyObject(Duration.class)
-        )
-    )
-            .andReturn(futureResult)
-            .times(1);
-
-    SettableFuture futureException = SettableFuture.create();
-    EasyMock.expect(
-        httpClient.go(
-            EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject(),
-            EasyMock.anyObject(Duration.class)
-        )
-    )
-            .andReturn(futureException)
-            .times(1);
-
-    EasyMock.expect(
-        httpClient.go(
-            EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject(),
-            EasyMock.anyObject(Duration.class)
-        )
-    )
-            .andReturn(SettableFuture.create())
-            .atLeastOnce();
+    QueuedTestHttpClient queuedHttpClient = new QueuedTestHttpClient();
+    DirectDruidClient client1 = makeDirectDruidClient(queuedHttpClient);
 
-    EasyMock.replay(httpClient);
+    DirectDruidClient client2 = makeDirectDruidClient(queuedHttpClient);
 
-    DirectDruidClient client2 = new DirectDruidClient(
-        conglomerateRule.getConglomerate(),
-        QueryRunnerTestHelper.NOOP_QUERYWATCHER,
-        new DefaultObjectMapper(),
-        httpClient,
-        "http",
-        "foo2",
-        new NoopServiceEmitter(),
-        queryCancellationExecutor
-    );
-
-    QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
-        new DruidServer(
-            "test1",
-            "localhost",
-            null,
-            0,
-            ServerType.HISTORICAL,
-            DruidServer.DEFAULT_TIER,
-            0
-        ),
-        client2
-    );
-    serverSelector.addServerAndUpdateSegment(queryableDruidServer2, 
serverSelector.getSegment());
-
-    TimeBoundaryQuery query = 
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
-    query = 
query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, 
Long.MAX_VALUE));
-    Sequence s1 = client.run(QueryPlus.wrap(query));
-    Assert.assertTrue(capturedRequest.hasCaptured());
-    Assert.assertEquals(url, capturedRequest.getValue().getUrl());
-    Assert.assertEquals(HttpMethod.POST, 
capturedRequest.getValue().getMethod());
-    Assert.assertEquals(1, client.getNumOpenConnections());
-
-    // simulate read timeout
-    client.run(QueryPlus.wrap(query));
-    Assert.assertEquals(2, client.getNumOpenConnections());
+    // Queue first call: pending until we provide a result
+    SettableFuture<InputStream> futureResult = SettableFuture.create();
+    queuedHttpClient.enqueue(futureResult);
+    // Queue second call: will fail with ReadTimeoutException
+    SettableFuture<InputStream> futureException = SettableFuture.create();
+    queuedHttpClient.enqueue(futureException);
+    // Subsequent calls: no enqueue → default pending futures created in client
+
+    QueryPlus queryPlus = getQueryPlus();
+
+    Sequence s1 = client1.run(queryPlus, responseContext);
+    List<Request> requests = queuedHttpClient.getRequests();
+    Assert.assertFalse(requests.isEmpty());
+    Assert.assertEquals(url, requests.get(0).getUrl());
+    Assert.assertEquals(HttpMethod.POST, requests.get(0).getMethod());
+    Assert.assertEquals(1, client1.getNumOpenConnections());
+
+    // simulate read timeout on second request
+    client1.run(queryPlus, responseContext);
+    Assert.assertEquals(2, client1.getNumOpenConnections());
     futureException.setException(new ReadTimeoutException());
-    Assert.assertEquals(1, client.getNumOpenConnections());
+    Assert.assertEquals(1, client1.getNumOpenConnections());
 
-    // subsequent connections should work
-    client.run(QueryPlus.wrap(query));
-    client.run(QueryPlus.wrap(query));
-    client.run(QueryPlus.wrap(query));
-
-    Assert.assertTrue(client.getNumOpenConnections() == 4);
+    // subsequent connections should work (and remain open)
+    client1.run(queryPlus, responseContext);
+    client1.run(queryPlus, responseContext);
+    client1.run(queryPlus, responseContext);
+    Assert.assertEquals(4, client1.getNumOpenConnections());
 
     // produce result for first connection
     futureResult.set(
@@ -241,244 +158,267 @@ public class DirectDruidClientTest
     List<Result> results = s1.toList();
     Assert.assertEquals(1, results.size());
     Assert.assertEquals(DateTimes.of("2014-01-01T01:02:03Z"), 
results.get(0).getTimestamp());
-    Assert.assertEquals(3, client.getNumOpenConnections());
-
-    client2.run(QueryPlus.wrap(query));
-    client2.run(QueryPlus.wrap(query));
+    Assert.assertEquals(3, client1.getNumOpenConnections());
 
+    client2.run(queryPlus, responseContext);
+    client2.run(queryPlus, responseContext);
     Assert.assertEquals(2, client2.getNumOpenConnections());
-
-    Assert.assertEquals(serverSelector.pick(null, 
CloneQueryMode.EXCLUDECLONES), queryableDruidServer2);
-
-    EasyMock.verify(httpClient);
   }
 
   @Test
-  public void testCancel()
+  public void testCancel() throws MalformedURLException
   {
-    Capture<Request> capturedRequest = EasyMock.newCapture();
-    ListenableFuture<Object> cancelledFuture = 
Futures.immediateCancelledFuture();
-    SettableFuture<Object> cancellationFuture = SettableFuture.create();
-
-    EasyMock.expect(
-        httpClient.go(
-            EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject(),
-            EasyMock.anyObject(Duration.class)
-        )
-    )
-            .andReturn(cancelledFuture)
-            .once();
-
-    EasyMock.expect(
-        httpClient.go(
-            EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject(),
-            EasyMock.anyObject(Duration.class)
-        )
-    )
-            .andReturn(cancellationFuture)
-            .anyTimes();
-
-    EasyMock.replay(httpClient);
+    QueryPlus queryPlus = getQueryPlus();
+    TestHttpClient testHttpClient = new TestHttpClient(objectMapper, 
Futures.immediateCancelledFuture());
 
+    // add a generic server and a cancel query URL
+    QueryableIndex index = makeQueryableIndex();
+    TestHttpClient.SimpleServerManager simpleServerManager = new 
TestHttpClient.SimpleServerManager(
+        conglomerateRule.getConglomerate(), 
DataSegment.builder(SegmentId.dummy("test")).build(), index, false
+    );
+    testHttpClient.addServerAndRunner(
+        new DruidServer("test1", hostName, null, 0, ServerType.HISTORICAL, 
DruidServer.DEFAULT_TIER, 0),
+        simpleServerManager
+    );
+    testHttpClient.addUrlAndRunner(
+        new URL(StringUtils.format("http://%s/druid/v2/%s";, hostName, 
queryPlus.getQuery().getId())),
+        simpleServerManager
+    );
+    DirectDruidClient client = makeDirectDruidClient(testHttpClient);
+    Sequence results = client.run(queryPlus, responseContext);
 
-    TimeBoundaryQuery query = 
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
-    query = 
query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, 
Long.MAX_VALUE));
-    cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new 
StringBuilder("cancelled")));
-    Sequence results = client.run(QueryPlus.wrap(query));
-    Assert.assertEquals(HttpMethod.POST, 
capturedRequest.getValue().getMethod());
     Assert.assertEquals(0, client.getNumOpenConnections());
+    QueryInterruptedException actualException =
+        Assert.assertThrows(QueryInterruptedException.class, () -> 
results.toList());
+    Assert.assertEquals(hostName, actualException.getHost());
+    Assert.assertEquals("Query cancelled", actualException.getErrorCode());
+    Assert.assertEquals("Task was cancelled.", 
actualException.getCause().getMessage());
 
+    Assert.assertTrue(blockingExecutorService.hasPendingTasks());
+    blockingExecutorService.finishNextPendingTask();
+    Assert.assertTrue(blockingExecutorService.hasPendingTasks());
+    ISE observedException = Assert.assertThrows(ISE.class, () -> 
blockingExecutorService.finishNextPendingTask());
+    Assert.assertTrue(observedException.getCause() instanceof 
CancellationException);
 
-    QueryInterruptedException exception = null;
-    try {
-      results.toList();
-    }
-    catch (QueryInterruptedException e) {
-      exception = e;
-    }
-    Assert.assertNotNull(exception);
-
-    EasyMock.verify(httpClient);
   }
 
   @Test
   public void testQueryInterruptionExceptionLogMessage()
   {
     SettableFuture<Object> interruptionFuture = SettableFuture.create();
-    Capture<Request> capturedRequest = EasyMock.newCapture();
-    final String hostName = "localhost:8080";
-    EasyMock
-        .expect(
-            httpClient.go(
-                EasyMock.capture(capturedRequest),
-                EasyMock.<HttpResponseHandler>anyObject(),
-                EasyMock.anyObject(Duration.class)
-            )
-        )
-        .andReturn(interruptionFuture)
-        .anyTimes();
-
-    EasyMock.replay(httpClient);
-
-    // test error
-    TimeBoundaryQuery query = 
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
-    query = 
query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, 
Long.MAX_VALUE));
     interruptionFuture.set(
         new ByteArrayInputStream(
             
StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}")
         )
     );
-    Sequence results = client.run(QueryPlus.wrap(query));
+    final DirectDruidClient client = 
makeDirectDruidClient(initHttpClientFromExistingClient(interruptionFuture));
 
-    QueryInterruptedException actualException = null;
-    try {
-      results.toList();
-    }
-    catch (QueryInterruptedException e) {
-      actualException = e;
-    }
-    Assert.assertNotNull(actualException);
+    interruptionFuture.set(
+        new 
ByteArrayInputStream(StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}"))
+    );
+    Sequence results = client.run(getQueryPlus(), responseContext);
+
+    QueryInterruptedException actualException =
+        Assert.assertThrows(QueryInterruptedException.class, () -> 
results.toList());
     Assert.assertEquals("testing1", actualException.getErrorCode());
     Assert.assertEquals("testing2", actualException.getMessage());
     Assert.assertEquals(hostName, actualException.getHost());
-    EasyMock.verify(httpClient);
   }
 
   @Test
-  public void testQueryTimeoutBeforeFuture() throws IOException, 
InterruptedException
+  public void testQueryTimeoutBeforeFuture() throws IOException
   {
     SettableFuture<Object> timeoutFuture = SettableFuture.create();
-    Capture<Request> capturedRequest = EasyMock.newCapture();
-    final String queryId = "timeout-before-future";
-
-    EasyMock
-        .expect(
-            httpClient.go(
-                EasyMock.capture(capturedRequest),
-                EasyMock.<HttpResponseHandler>anyObject(),
-                EasyMock.anyObject(Duration.class)
-            )
-        )
-        .andReturn(timeoutFuture)
-        .anyTimes();
-
-    EasyMock.replay(httpClient);
-
-    TimeBoundaryQuery query = 
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
-    query = query.withOverriddenContext(
-        ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, 
System.currentTimeMillis() + 250, "queryId", queryId)
-    );
+    final DirectDruidClient client = 
makeDirectDruidClient(initHttpClientFromExistingClient(timeoutFuture));
 
-    Sequence results = client.run(QueryPlus.wrap(query));
+    QueryPlus queryPlus = 
getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, 
System.currentTimeMillis() + 250));
+    Sequence results = client.run(queryPlus, responseContext);
 
-    // incomplete result set
+    // Incomplete result set delivered via a pipe to simulate slow stream
     PipedInputStream in = new PipedInputStream();
     final PipedOutputStream out = new PipedOutputStream(in);
-    timeoutFuture.set(
-        in
+    timeoutFuture.set(in);
+
+    QueryTimeoutException actualException = Assert.assertThrows(
+        QueryTimeoutException.class,
+        () -> {
+          
out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}"));
+          Thread.sleep(250);
+          out.write(StringUtils.toUtf8("]"));
+          out.close();
+          results.toList();
+        }
     );
-
-    QueryTimeoutException actualException = null;
-    try {
-      
out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}"));
-      Thread.sleep(250);
-      out.write(StringUtils.toUtf8("]"));
-      out.close();
-      results.toList();
-    }
-    catch (QueryTimeoutException e) {
-      actualException = e;
-    }
-    Assert.assertNotNull(actualException);
     Assert.assertEquals("Query timeout", actualException.getErrorCode());
-    Assert.assertEquals("url[http://localhost:8080/druid/v2/] timed out", 
actualException.getMessage());
+    Assert.assertEquals(StringUtils.format("url[http://%s/druid/v2/] timed 
out", hostName), actualException.getMessage());
     Assert.assertEquals(hostName, actualException.getHost());
-    EasyMock.verify(httpClient);
   }
 
   @Test
   public void testQueryTimeoutFromFuture()
   {
-    SettableFuture<Object> noFuture = SettableFuture.create();
-    Capture<Request> capturedRequest = EasyMock.newCapture();
-    final String queryId = "never-ending-future";
-
-    EasyMock
-        .expect(
-            httpClient.go(
-                EasyMock.capture(capturedRequest),
-                EasyMock.<HttpResponseHandler>anyObject(),
-                EasyMock.anyObject(Duration.class)
-            )
-        )
-        .andReturn(noFuture)
-        .anyTimes();
+    final SettableFuture<Object> timeoutFuture = SettableFuture.create();
+    final DirectDruidClient client = 
makeDirectDruidClient(initHttpClientFromExistingClient(timeoutFuture));
 
-    EasyMock.replay(httpClient);
+    QueryPlus query = getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, 
System.currentTimeMillis() + 500));
+    Sequence results = client.run(query, responseContext);
+    QueryTimeoutException actualException = 
Assert.assertThrows(QueryTimeoutException.class, results::toList);
+    Assert.assertEquals("Query timeout", actualException.getErrorCode());
+    Assert.assertEquals(StringUtils.format("Query [%s] timed out!", 
query.getQuery().getId()), actualException.getMessage());
+    Assert.assertEquals(hostName, actualException.getHost());
+  }
+
+  @Test
+  public void testQueryTimeoutDuringRunThrowsExceptionImmediately()
+  {
+    SettableFuture<Object> timeoutFuture = SettableFuture.create();
+    final DirectDruidClient client = 
makeDirectDruidClient(initHttpClientFromExistingClient(timeoutFuture));
 
-    TimeBoundaryQuery query = 
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
-    query = query.withOverriddenContext(
-        ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, 
System.currentTimeMillis() + 500, "queryId", queryId)
+    QueryPlus queryPlus = 
getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, 
System.currentTimeMillis()));
+    QueryTimeoutException actualException = Assert.assertThrows(
+        QueryTimeoutException.class,
+        () -> client.run(queryPlus, responseContext)
+    );
+    Assert.assertEquals("Query timeout", actualException.getErrorCode());
+    Assert.assertEquals(
+        StringUtils.format(
+            "Query[%s] url[http://%s/druid/v2/] timed out.",
+            queryPlus.getQuery().getId(),
+            hostName
+        ), actualException.getMessage()
     );
+  }
+
+  @Test
+  public void testQueryTimeoutDuringResponseHandling()
+  {
+    final TestHttpClient testHttpClient = new TestHttpClient(objectMapper, 
110);
+    final DirectDruidClient client = 
makeDirectDruidClient(initHttpClientFromExistingClient(testHttpClient, false));
 
-    Sequence results = client.run(QueryPlus.wrap(query));
+    final QueryPlus queryPlus = getQueryPlus(Map.of(
+        QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 100,
+        DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 100
+    ));
 
-    QueryTimeoutException actualException = null;
-    try {
-      results.toList();
-    }
-    catch (QueryTimeoutException e) {
-      actualException = e;
-    }
-    Assert.assertNotNull(actualException);
+    QueryTimeoutException actualException = Assert.assertThrows(
+        QueryTimeoutException.class,
+        () -> client.run(queryPlus, responseContext)
+    );
     Assert.assertEquals("Query timeout", actualException.getErrorCode());
-    Assert.assertEquals(StringUtils.format("Query [%s] timed out!", queryId), 
actualException.getMessage());
-    Assert.assertEquals(hostName, actualException.getHost());
-    EasyMock.verify(httpClient);
+    Assert.assertEquals(
+        StringUtils.format("Query[%s] url[http://%s/druid/v2/] timed out.",
+                           queryPlus.getQuery().getId(),
+                           hostName
+        ), actualException.getMessage()
+    );
   }
 
   @Test
-  public void testConnectionCountAfterException() throws 
JsonProcessingException
+  public void testConnectionCountAfterException()
   {
-    ObjectMapper mockObjectMapper = EasyMock.createMock(ObjectMapper.class);
-    EasyMock.expect(mockObjectMapper.writeValueAsBytes(Query.class))
-            .andThrow(new JsonProcessingException("Error")
-            {
-            });
+    final DirectDruidClient client = 
makeDirectDruidClient(initHttpClientFromExistingClient());
+
+    Assert.assertThrows(RuntimeException.class, () -> 
client.run(getQueryPlus(), responseContext));
+    Assert.assertEquals(0, client.getNumOpenConnections());
+  }
+
+  @Test
+  public void testResourceLimitExceededException()
+  {
+    final DirectDruidClient client = 
makeDirectDruidClient(initHttpClientWithSuccessfulQuery());
+
+    final QueryPlus queryPlus = getQueryPlus(Map.of(
+        QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 100,
+        DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE
+    ));
 
-    DirectDruidClient client2 = new DirectDruidClient(
+    ResourceLimitExceededException actualException = Assert.assertThrows(
+        ResourceLimitExceededException.class,
+        () -> client.run(queryPlus, responseContext)
+    );
+
+    Assert.assertEquals(
+        StringUtils.format(
+            "Query[%s] url[http://localhost:8080/druid/v2/] total bytes 
gathered[127] exceeds maxScatterGatherBytes[100]",
+            queryPlus.getQuery().getId()
+        ),
+        actualException.getMessage());
+  }
+
+  private DirectDruidClient makeDirectDruidClient(HttpClient httpClient)
+  {
+    return new DirectDruidClient(
         conglomerateRule.getConglomerate(),
         QueryRunnerTestHelper.NOOP_QUERYWATCHER,
-        mockObjectMapper,
+        objectMapper,
         httpClient,
         "http",
         hostName,
         new NoopServiceEmitter(),
         queryCancellationExecutor
     );
+  }
 
-    QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
-        new DruidServer(
-            "test1",
-            "localhost",
-            null,
-            0,
-            ServerType.HISTORICAL,
-            DruidServer.DEFAULT_TIER,
-            0
-        ),
-        client2
+  private HttpClient initHttpClientFromExistingClient()
+  {
+    return initHttpClientFromExistingClient(new TestHttpClient(objectMapper), 
true);
+  }
+
+  private HttpClient initHttpClientWithSuccessfulQuery()
+  {
+    return initHttpClientFromExistingClient(new TestHttpClient(objectMapper), 
false);
+  }
+
+  private HttpClient initHttpClientFromExistingClient(ListenableFuture future)
+  {
+    return initHttpClientFromExistingClient(new TestHttpClient(objectMapper, 
future), false);
+  }
+
+  private HttpClient initHttpClientFromExistingClient(TestHttpClient 
httpClient, boolean throwQueryError)
+  {
+    final QueryableIndex index = makeQueryableIndex();
+    httpClient.addServerAndRunner(
+        new DruidServer("test1", hostName, null, 0, ServerType.HISTORICAL, 
DruidServer.DEFAULT_TIER, 0),
+        new TestHttpClient.SimpleServerManager(
+            conglomerateRule.getConglomerate(), 
DataSegment.builder(SegmentId.dummy("test")).build(), index, throwQueryError
+        )
     );
+    return httpClient;
+  }
 
-    serverSelector.addServerAndUpdateSegment(queryableDruidServer2, 
serverSelector.getSegment());
+  private QueryableIndex makeQueryableIndex()
+  {
+    try {
+      return IndexBuilder.create()
+                         .tmpDir(temporaryFolder.newFolder())
+                         
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                         .schema(
+                             new IncrementalIndexSchema.Builder()
+                                 
.withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec())
+                                 .build()
+                         )
+                         .inputSource(
+                             ResourceInputSource.of(
+                                 NestedDataTestUtils.class.getClassLoader(),
+                                 NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE
+                             )
+                         )
+                         .inputFormat(TestIndex.DEFAULT_JSON_INPUT_FORMAT)
+                         .inputTmpDir(temporaryFolder.newFolder())
+                         .buildMMappedIndex();
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
 
-    TimeBoundaryQuery query = 
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
-    query = 
query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, 
Long.MAX_VALUE));
+  private static QueryPlus getQueryPlus()
+  {
+    return getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, 
Long.MAX_VALUE));
+  }
 
-    TimeBoundaryQuery finalQuery = query;
-    Assert.assertThrows(RuntimeException.class, () -> 
client2.run(QueryPlus.wrap(finalQuery)));
-    Assert.assertEquals(0, client2.getNumOpenConnections());
+  private static QueryPlus getQueryPlus(Map<String, Object> context)
+  {
+    return 
QueryPlus.wrap(Druids.newTimeBoundaryQueryBuilder().dataSource("test").context(context).randomQueryId().build());
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/client/QueuedTestHttpClient.java 
b/server/src/test/java/org/apache/druid/client/QueuedTestHttpClient.java
new file mode 100644
index 00000000000..67f392bd4d9
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/client/QueuedTestHttpClient.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
+import org.joda.time.Duration;
+
+import java.io.InputStream;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+
+/**
+ * A test {@link HttpClient} that captures {@link Request}s and serves 
preloaded responses sequentially. If no future
+ * has been {@link #enqueue(ListenableFuture) enqueued} when {@link 
#go(Request, HttpResponseHandler, Duration)} is
+ * invoked, the client returns a pending future that never completes.
+ *
+ * <p>This is useful for tests that need deterministic control over request 
ordering and response timing.</p>
+ */
+public class QueuedTestHttpClient implements HttpClient
+{
+  private final List<Request> captured = new ArrayList<>();
+  private final Deque<ListenableFuture<InputStream>> queue = new 
ArrayDeque<>();
+
+  public void enqueue(ListenableFuture<InputStream> f)
+  {
+    queue.addLast(f);
+  }
+
+  public List<Request> getRequests()
+  {
+    return captured;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <Intermediate, Final> ListenableFuture<Final> go(
+      Request request,
+      HttpResponseHandler<Intermediate, Final> handler,
+      Duration readTimeout
+  )
+  {
+    captured.add(request);
+    ListenableFuture<InputStream> f = queue.pollFirst();
+    if (f == null) {
+      f = SettableFuture.create(); // pending forever
+    }
+    return (ListenableFuture<Final>) f;
+  }
+
+  @Override
+  public <Intermediate, Final> ListenableFuture<Final> go(Request request, 
HttpResponseHandler<Intermediate, Final> handler)
+  {
+    throw new UnsupportedOperationException("Use go(Request, 
HttpResponseHandler<Intermediate, Final>, Duration) instead)");
+  }
+}
+
diff --git a/server/src/test/java/org/apache/druid/client/TestHttpClient.java 
b/server/src/test/java/org/apache/druid/client/TestHttpClient.java
index b979f39799a..ef4da264aad 100644
--- a/server/src/test/java/org/apache/druid/client/TestHttpClient.java
+++ b/server/src/test/java/org/apache/druid/client/TestHttpClient.java
@@ -67,10 +67,29 @@ public class TestHttpClient implements HttpClient
 
   private final Map<URL, SimpleServerManager> servers = new HashMap<>();
   private final ObjectMapper objectMapper;
+  @Nullable
+  private final ListenableFuture future;
+  private final long responseDelayMillis;
 
   public TestHttpClient(ObjectMapper objectMapper)
   {
     this.objectMapper = objectMapper;
+    this.future = null;
+    this.responseDelayMillis = -1;
+  }
+
+  public TestHttpClient(ObjectMapper objectMapper, ListenableFuture future)
+  {
+    this.objectMapper = objectMapper;
+    this.future = future;
+    this.responseDelayMillis = -1;
+  }
+
+  public TestHttpClient(ObjectMapper objectMapper, long responseDelayMillis)
+  {
+    this.objectMapper = objectMapper;
+    this.future = null;
+    this.responseDelayMillis = responseDelayMillis;
   }
 
   public void addServerAndRunner(DruidServer server, SimpleServerManager 
serverManager)
@@ -78,6 +97,11 @@ public class TestHttpClient implements HttpClient
     servers.put(computeUrl(server), serverManager);
   }
 
+  public void addUrlAndRunner(URL queryId, SimpleServerManager serverManager)
+  {
+    servers.put(queryId, serverManager);
+  }
+
   @Nullable
   public SimpleServerManager getServerManager(DruidServer server)
   {
@@ -137,13 +161,23 @@ public class TestHttpClient implements HttpClient
       response.setContent(
           HeapChannelBufferFactory.getInstance().getBuffer(serializedContent, 
0, serializedContent.length)
       );
+      if (responseDelayMillis > 0) {
+        Thread.sleep(responseDelayMillis);
+      }
       final ClientResponse<Intermediate> intermClientResponse = 
handler.handleResponse(response, NOOP_TRAFFIC_COP);
       final ClientResponse<Final> finalClientResponse = 
handler.done(intermClientResponse);
-      return Futures.immediateFuture(finalClientResponse.getObj());
+      if (future != null) {
+        return future;
+      } else {
+        return Futures.immediateFuture(finalClientResponse.getObj());
+      }
     }
     catch (IOException e) {
       throw new RuntimeException(e);
     }
+    catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   /**
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
index 89fa07c4155..65fd37be2cc 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
@@ -248,7 +248,7 @@ public class BlockingExecutorService implements 
ExecutorService
         future.complete(result);
       }
       catch (Exception e) {
-        throw new ISE("Error while executing task", e);
+        throw new ISE(e, "Error[%s] while executing task", e.getMessage());
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to