This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch druid_client_test_coverage
in repository https://gitbox.apache.org/repos/asf/druid.git

commit bd4259c4417172f7847c54f5b7e438ade9da20d6
Author: Abhishek Balaji Radhakrishnan <[email protected]>
AuthorDate: Fri Dec 19 12:19:02 2025 -0500

    Better DirectDruidClientTest for code coverage
    
    Replaces mocks with more concrete test helpers that invoke and 
exercise the internals
    of DirectDruidClient's response handler, etc. Also, adds a new unit test for
    ResourceLimitExceededException that wasn't previously exercised.
---
 .../apache/druid/client/DirectDruidClientTest.java | 533 +++++++++------------
 .../org/apache/druid/client/TestHttpClient.java    |  15 +-
 2 files changed, 233 insertions(+), 315 deletions(-)

diff --git 
a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java 
b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
index a86a07c8f86..71f13537d02 100644
--- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
@@ -19,51 +19,49 @@
 
 package org.apache.druid.client;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import org.apache.druid.client.selector.ConnectionCountServerSelectorStrategy;
-import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
-import org.apache.druid.client.selector.ServerSelector;
+import org.apache.druid.data.input.ResourceInputSource;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
-import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
-import org.apache.druid.query.CloneQueryMode;
 import org.apache.druid.query.Druids;
-import org.apache.druid.query.Query;
+import org.apache.druid.query.NestedDataTestUtils;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunnerTestHelper;
 import org.apache.druid.query.QueryTimeoutException;
+import org.apache.druid.query.ResourceLimitExceededException;
 import org.apache.druid.query.Result;
-import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.coordination.TestCoordinatorClient;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
+import org.apache.druid.timeline.SegmentId;
 import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.jboss.netty.handler.timeout.ReadTimeoutException;
 import org.joda.time.Duration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -71,9 +69,11 @@ import java.io.InputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.net.URL;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Deque;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -82,61 +82,20 @@ public class DirectDruidClientTest
   @ClassRule
   public static QueryStackTests.Junit4ConglomerateRule conglomerateRule = new 
QueryStackTests.Junit4ConglomerateRule();
 
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
   private final String hostName = "localhost:8080";
+  private final ObjectMapper objectMapper = new DefaultObjectMapper();
+  private final ResponseContext responseContext = 
ResponseContext.createEmpty();
 
-  private final DataSegment dataSegment = new DataSegment(
-      "test",
-      Intervals.of("2013-01-01/2013-01-02"),
-      DateTimes.of("2013-01-01").toString(),
-      new HashMap<>(),
-      new ArrayList<>(),
-      new ArrayList<>(),
-      NoneShardSpec.instance(),
-      0,
-      0L
-  );
-  private ServerSelector serverSelector;
-
-  private HttpClient httpClient;
-  private DirectDruidClient client;
-  private QueryableDruidServer queryableDruidServer;
   private ScheduledExecutorService queryCancellationExecutor;
 
   @Before
   public void setup()
   {
-    final BrokerViewOfCoordinatorConfig filter = new 
BrokerViewOfCoordinatorConfig(new TestCoordinatorClient());
-    filter.start();
-    httpClient = EasyMock.createMock(HttpClient.class);
-    serverSelector = new ServerSelector(
-        dataSegment,
-        new HighestPriorityTierSelectorStrategy(new 
ConnectionCountServerSelectorStrategy()),
-        filter
-    );
+    responseContext.initialize();
     queryCancellationExecutor = 
Execs.scheduledSingleThreaded("query-cancellation-executor");
-    client = new DirectDruidClient(
-        conglomerateRule.getConglomerate(),
-        QueryRunnerTestHelper.NOOP_QUERYWATCHER,
-        new DefaultObjectMapper(),
-        httpClient,
-        "http",
-        hostName,
-        new NoopServiceEmitter(),
-        queryCancellationExecutor
-    );
-    queryableDruidServer = new QueryableDruidServer(
-        new DruidServer(
-            "test1",
-            "localhost",
-            null,
-            0,
-            ServerType.HISTORICAL,
-            DruidServer.DEFAULT_TIER,
-            0
-        ),
-        client
-    );
-    serverSelector.addServerAndUpdateSegment(queryableDruidServer, 
serverSelector.getSegment());
   }
 
   @After
@@ -151,86 +110,77 @@ public class DirectDruidClientTest
   {
     final URL url = new URL(StringUtils.format("http://%s/druid/v2/";, 
hostName));
 
-    SettableFuture<InputStream> futureResult = SettableFuture.create();
-    Capture<Request> capturedRequest = EasyMock.newCapture();
-    EasyMock.expect(
-        httpClient.go(
-            EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject(),
-            EasyMock.anyObject(Duration.class)
-        )
-    )
-            .andReturn(futureResult)
-            .times(1);
-
-    SettableFuture futureException = SettableFuture.create();
-    EasyMock.expect(
-        httpClient.go(
-            EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject(),
-            EasyMock.anyObject(Duration.class)
-        )
-    )
-            .andReturn(futureException)
-            .times(1);
-
-    EasyMock.expect(
-        httpClient.go(
-            EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject(),
-            EasyMock.anyObject(Duration.class)
-        )
-    )
-            .andReturn(SettableFuture.create())
-            .atLeastOnce();
+    // A simple HttpClient that records requests and returns queued futures 
per call.
+    class SequencedHttpClient implements HttpClient
+    {
+      private final List<Request> captured = new ArrayList<>();
+      private final Deque<ListenableFuture<InputStream>> queue = new 
ArrayDeque<>();
+
+      void enqueue(ListenableFuture<InputStream> f)
+      {
+        queue.addLast(f);
+      }
+
+      List<Request> requests()
+      {
+        return captured;
+      }
+
+      @Override
+      public <Intermediate, Final> ListenableFuture<Final> go(Request request, 
HttpResponseHandler<Intermediate, Final> handler)
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      @SuppressWarnings("unchecked")
+      public <Intermediate, Final> ListenableFuture<Final> go(
+          Request request,
+          HttpResponseHandler<Intermediate, Final> handler,
+          Duration readTimeout
+      )
+      {
+        captured.add(request);
+        ListenableFuture<InputStream> f = queue.pollFirst();
+        if (f == null) {
+          f = SettableFuture.create(); // pending forever
+        }
+        return (ListenableFuture<Final>) f;
+      }
+    }
 
-    EasyMock.replay(httpClient);
+    SequencedHttpClient sequenced = new SequencedHttpClient();
+    DirectDruidClient client1 = makeDirectDruidClient(sequenced);
 
-    DirectDruidClient client2 = new DirectDruidClient(
-        conglomerateRule.getConglomerate(),
-        QueryRunnerTestHelper.NOOP_QUERYWATCHER,
-        new DefaultObjectMapper(),
-        httpClient,
-        "http",
-        "foo2",
-        new NoopServiceEmitter(),
-        queryCancellationExecutor
-    );
+    DirectDruidClient client2 = makeDirectDruidClient(sequenced);
 
-    QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
-        new DruidServer(
-            "test1",
-            "localhost",
-            null,
-            0,
-            ServerType.HISTORICAL,
-            DruidServer.DEFAULT_TIER,
-            0
-        ),
-        client2
-    );
-    serverSelector.addServerAndUpdateSegment(queryableDruidServer2, 
serverSelector.getSegment());
-
-    TimeBoundaryQuery query = 
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
-    query = 
query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, 
Long.MAX_VALUE));
-    Sequence s1 = client.run(QueryPlus.wrap(query));
-    Assert.assertTrue(capturedRequest.hasCaptured());
-    Assert.assertEquals(url, capturedRequest.getValue().getUrl());
-    Assert.assertEquals(HttpMethod.POST, 
capturedRequest.getValue().getMethod());
-    Assert.assertEquals(1, client.getNumOpenConnections());
-
-    // simulate read timeout
-    client.run(QueryPlus.wrap(query));
-    Assert.assertEquals(2, client.getNumOpenConnections());
+    // Queue first call: pending until we provide a result
+    SettableFuture<InputStream> futureResult = SettableFuture.create();
+    sequenced.enqueue(futureResult);
+    // Queue second call: will fail with ReadTimeoutException
+    SettableFuture<InputStream> futureException = SettableFuture.create();
+    sequenced.enqueue(futureException);
+    // Subsequent calls: no enqueue → default pending futures created in client
+
+    QueryPlus queryPlus = getQueryPlus();
+
+    Sequence s1 = client1.run(queryPlus, responseContext);
+    Assert.assertFalse(sequenced.requests().isEmpty());
+    Assert.assertEquals(url, sequenced.requests().get(0).getUrl());
+    Assert.assertEquals(HttpMethod.POST, 
sequenced.requests().get(0).getMethod());
+    Assert.assertEquals(1, client1.getNumOpenConnections());
+
+    // simulate read timeout on second request
+    client1.run(queryPlus, responseContext);
+    Assert.assertEquals(2, client1.getNumOpenConnections());
     futureException.setException(new ReadTimeoutException());
-    Assert.assertEquals(1, client.getNumOpenConnections());
-
-    // subsequent connections should work
-    client.run(QueryPlus.wrap(query));
-    client.run(QueryPlus.wrap(query));
-    client.run(QueryPlus.wrap(query));
+    Assert.assertEquals(1, client1.getNumOpenConnections());
 
-    Assert.assertTrue(client.getNumOpenConnections() == 4);
+    // subsequent connections should work (and remain open)
+    client1.run(queryPlus, responseContext);
+    client1.run(queryPlus, responseContext);
+    client1.run(queryPlus, responseContext);
+    Assert.assertEquals(4, client1.getNumOpenConnections());
 
     // produce result for first connection
     futureResult.set(
@@ -241,244 +191,199 @@ public class DirectDruidClientTest
     List<Result> results = s1.toList();
     Assert.assertEquals(1, results.size());
     Assert.assertEquals(DateTimes.of("2014-01-01T01:02:03Z"), 
results.get(0).getTimestamp());
-    Assert.assertEquals(3, client.getNumOpenConnections());
-
-    client2.run(QueryPlus.wrap(query));
-    client2.run(QueryPlus.wrap(query));
+    Assert.assertEquals(3, client1.getNumOpenConnections());
 
+    client2.run(queryPlus, responseContext);
+    client2.run(queryPlus, responseContext);
     Assert.assertEquals(2, client2.getNumOpenConnections());
-
-    Assert.assertEquals(serverSelector.pick(null, 
CloneQueryMode.EXCLUDECLONES), queryableDruidServer2);
-
-    EasyMock.verify(httpClient);
   }
 
   @Test
   public void testCancel()
   {
-    Capture<Request> capturedRequest = EasyMock.newCapture();
-    ListenableFuture<Object> cancelledFuture = 
Futures.immediateCancelledFuture();
-    SettableFuture<Object> cancellationFuture = SettableFuture.create();
-
-    EasyMock.expect(
-        httpClient.go(
-            EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject(),
-            EasyMock.anyObject(Duration.class)
-        )
-    )
-            .andReturn(cancelledFuture)
-            .once();
-
-    EasyMock.expect(
-        httpClient.go(
-            EasyMock.capture(capturedRequest),
-            EasyMock.<HttpResponseHandler>anyObject(),
-            EasyMock.anyObject(Duration.class)
-        )
-    )
-            .andReturn(cancellationFuture)
-            .anyTimes();
+    DirectDruidClient client = 
makeDirectDruidClient(initHttpClientWithFuture(Futures.immediateCancelledFuture()));
+    Sequence results = client.run(getQueryPlus(), responseContext);
 
-    EasyMock.replay(httpClient);
-
-
-    TimeBoundaryQuery query = 
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
-    query = 
query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, 
Long.MAX_VALUE));
-    cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new 
StringBuilder("cancelled")));
-    Sequence results = client.run(QueryPlus.wrap(query));
-    Assert.assertEquals(HttpMethod.POST, 
capturedRequest.getValue().getMethod());
     Assert.assertEquals(0, client.getNumOpenConnections());
-
-
-    QueryInterruptedException exception = null;
-    try {
-      results.toList();
-    }
-    catch (QueryInterruptedException e) {
-      exception = e;
-    }
-    Assert.assertNotNull(exception);
-
-    EasyMock.verify(httpClient);
+    QueryInterruptedException actualException =
+        Assert.assertThrows(QueryInterruptedException.class, () -> 
results.toList());
+    Assert.assertEquals(hostName, actualException.getHost());
+    Assert.assertEquals("Query cancelled", actualException.getErrorCode());
+    Assert.assertEquals("Task was cancelled.", 
actualException.getCause().getMessage());
   }
 
   @Test
   public void testQueryInterruptionExceptionLogMessage()
   {
     SettableFuture<Object> interruptionFuture = SettableFuture.create();
-    Capture<Request> capturedRequest = EasyMock.newCapture();
-    final String hostName = "localhost:8080";
-    EasyMock
-        .expect(
-            httpClient.go(
-                EasyMock.capture(capturedRequest),
-                EasyMock.<HttpResponseHandler>anyObject(),
-                EasyMock.anyObject(Duration.class)
-            )
-        )
-        .andReturn(interruptionFuture)
-        .anyTimes();
-
-    EasyMock.replay(httpClient);
-
-    // test error
-    TimeBoundaryQuery query = 
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
-    query = 
query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, 
Long.MAX_VALUE));
     interruptionFuture.set(
         new ByteArrayInputStream(
             
StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}")
         )
     );
-    Sequence results = client.run(QueryPlus.wrap(query));
+    final DirectDruidClient client = 
makeDirectDruidClient(initHttpClientWithFuture(interruptionFuture));
 
-    QueryInterruptedException actualException = null;
-    try {
-      results.toList();
-    }
-    catch (QueryInterruptedException e) {
-      actualException = e;
-    }
-    Assert.assertNotNull(actualException);
+    interruptionFuture.set(
+        new 
ByteArrayInputStream(StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}"))
+    );
+    Sequence results = client.run(getQueryPlus(), responseContext);
+
+    QueryInterruptedException actualException =
+        Assert.assertThrows(QueryInterruptedException.class, () -> 
results.toList());
     Assert.assertEquals("testing1", actualException.getErrorCode());
     Assert.assertEquals("testing2", actualException.getMessage());
     Assert.assertEquals(hostName, actualException.getHost());
-    EasyMock.verify(httpClient);
   }
 
   @Test
   public void testQueryTimeoutBeforeFuture() throws IOException, 
InterruptedException
   {
     SettableFuture<Object> timeoutFuture = SettableFuture.create();
-    Capture<Request> capturedRequest = EasyMock.newCapture();
-    final String queryId = "timeout-before-future";
-
-    EasyMock
-        .expect(
-            httpClient.go(
-                EasyMock.capture(capturedRequest),
-                EasyMock.<HttpResponseHandler>anyObject(),
-                EasyMock.anyObject(Duration.class)
-            )
-        )
-        .andReturn(timeoutFuture)
-        .anyTimes();
-
-    EasyMock.replay(httpClient);
-
-    TimeBoundaryQuery query = 
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
-    query = query.withOverriddenContext(
-        ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, 
System.currentTimeMillis() + 250, "queryId", queryId)
-    );
+    final DirectDruidClient client = 
makeDirectDruidClient(initHttpClientWithFuture(timeoutFuture));
 
-    Sequence results = client.run(QueryPlus.wrap(query));
+    QueryPlus queryPlus = 
getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, 
System.currentTimeMillis() + 250));
+    Sequence results = client.run(queryPlus, responseContext);
 
-    // incomplete result set
+    // Incomplete result set delivered via a pipe to simulate slow stream
     PipedInputStream in = new PipedInputStream();
     final PipedOutputStream out = new PipedOutputStream(in);
-    timeoutFuture.set(
-        in
+    timeoutFuture.set(in);
+
+    QueryTimeoutException actualException = Assert.assertThrows(
+        QueryTimeoutException.class,
+        () -> {
+          
out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}"));
+          Thread.sleep(250);
+          out.write(StringUtils.toUtf8("]"));
+          out.close();
+          results.toList();
+        }
     );
-
-    QueryTimeoutException actualException = null;
-    try {
-      
out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}"));
-      Thread.sleep(250);
-      out.write(StringUtils.toUtf8("]"));
-      out.close();
-      results.toList();
-    }
-    catch (QueryTimeoutException e) {
-      actualException = e;
-    }
-    Assert.assertNotNull(actualException);
     Assert.assertEquals("Query timeout", actualException.getErrorCode());
-    Assert.assertEquals("url[http://localhost:8080/druid/v2/] timed out", 
actualException.getMessage());
+    Assert.assertEquals(StringUtils.format("url[http://%s/druid/v2/] timed 
out", hostName), actualException.getMessage());
     Assert.assertEquals(hostName, actualException.getHost());
-    EasyMock.verify(httpClient);
   }
 
   @Test
   public void testQueryTimeoutFromFuture()
   {
-    SettableFuture<Object> noFuture = SettableFuture.create();
-    Capture<Request> capturedRequest = EasyMock.newCapture();
-    final String queryId = "never-ending-future";
-
-    EasyMock
-        .expect(
-            httpClient.go(
-                EasyMock.capture(capturedRequest),
-                EasyMock.<HttpResponseHandler>anyObject(),
-                EasyMock.anyObject(Duration.class)
-            )
-        )
-        .andReturn(noFuture)
-        .anyTimes();
-
-    EasyMock.replay(httpClient);
-
-    TimeBoundaryQuery query = 
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
-    query = query.withOverriddenContext(
-        ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, 
System.currentTimeMillis() + 500, "queryId", queryId)
-    );
-
-    Sequence results = client.run(QueryPlus.wrap(query));
+    final SettableFuture<Object> timeoutFuture = SettableFuture.create();
+    final DirectDruidClient client = 
makeDirectDruidClient(initHttpClientWithFuture(timeoutFuture));
 
-    QueryTimeoutException actualException = null;
-    try {
-      results.toList();
-    }
-    catch (QueryTimeoutException e) {
-      actualException = e;
-    }
-    Assert.assertNotNull(actualException);
+    QueryPlus query = getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, 
System.currentTimeMillis() + 500));
+    Sequence results = client.run(query, responseContext);
+    QueryTimeoutException actualException = 
Assert.assertThrows(QueryTimeoutException.class, results::toList);
     Assert.assertEquals("Query timeout", actualException.getErrorCode());
-    Assert.assertEquals(StringUtils.format("Query [%s] timed out!", queryId), 
actualException.getMessage());
+    Assert.assertEquals(StringUtils.format("Query [%s] timed out!", 
query.getQuery().getId()), actualException.getMessage());
     Assert.assertEquals(hostName, actualException.getHost());
-    EasyMock.verify(httpClient);
   }
 
   @Test
-  public void testConnectionCountAfterException() throws 
JsonProcessingException
+  public void testConnectionCountAfterException()
   {
-    ObjectMapper mockObjectMapper = EasyMock.createMock(ObjectMapper.class);
-    EasyMock.expect(mockObjectMapper.writeValueAsBytes(Query.class))
-            .andThrow(new JsonProcessingException("Error")
-            {
-            });
+    final DirectDruidClient client = 
makeDirectDruidClient(initHttpClientWithQueryError());
 
-    DirectDruidClient client2 = new DirectDruidClient(
+    Assert.assertThrows(RuntimeException.class, () -> 
client.run(getQueryPlus(), responseContext));
+    Assert.assertEquals(0, client.getNumOpenConnections());
+  }
+
+  @Test
+  public void testResourceLimitExceededException()
+  {
+    final DirectDruidClient client = 
makeDirectDruidClient(initHttpClientWithSuccessfulQuery());
+
+    final QueryPlus queryPlus = getQueryPlus(Map.of(
+        QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 100,
+        DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE
+    ));
+
+    ResourceLimitExceededException actualException = Assert.assertThrows(
+        ResourceLimitExceededException.class,
+        () -> client.run(queryPlus, responseContext)
+    );
+
+    Assert.assertEquals(
+        StringUtils.format(
+            "Query[%s] url[http://localhost:8080/druid/v2/] total bytes 
gathered[127] exceeds maxScatterGatherBytes[100]",
+            queryPlus.getQuery().getId()
+        ),
+        actualException.getMessage());
+  }
+
+  private DirectDruidClient makeDirectDruidClient(HttpClient httpClient)
+  {
+    return new DirectDruidClient(
         conglomerateRule.getConglomerate(),
         QueryRunnerTestHelper.NOOP_QUERYWATCHER,
-        mockObjectMapper,
+        objectMapper,
         httpClient,
         "http",
         hostName,
         new NoopServiceEmitter(),
         queryCancellationExecutor
     );
+  }
 
-    QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
-        new DruidServer(
-            "test1",
-            "localhost",
-            null,
-            0,
-            ServerType.HISTORICAL,
-            DruidServer.DEFAULT_TIER,
-            0
-        ),
-        client2
+  private HttpClient initHttpClientWithQueryError()
+  {
+    return initHttpClientWithFuture(new TestHttpClient(objectMapper), true);
+  }
+
+  private HttpClient initHttpClientWithSuccessfulQuery()
+  {
+    return initHttpClientWithFuture(new TestHttpClient(objectMapper), false);
+  }
+
+  private HttpClient initHttpClientWithFuture(ListenableFuture future)
+  {
+    return initHttpClientWithFuture(new TestHttpClient(objectMapper, future), 
false);
+  }
+
+  private HttpClient initHttpClientWithFuture(TestHttpClient httpClient, 
boolean throwQueryError)
+  {
+    final QueryableIndex index = makeQueryableIndex();
+    httpClient.addServerAndRunner(
+        new DruidServer("test1", hostName, null, 0, ServerType.HISTORICAL, 
DruidServer.DEFAULT_TIER, 0),
+        new TestHttpClient.SimpleServerManager(
+            conglomerateRule.getConglomerate(), 
DataSegment.builder(SegmentId.dummy("test")).build(), index, throwQueryError
+        )
     );
+    return httpClient;
+  }
 
-    serverSelector.addServerAndUpdateSegment(queryableDruidServer2, 
serverSelector.getSegment());
+  private QueryableIndex makeQueryableIndex()
+  {
+    try {
+      return IndexBuilder.create()
+                         .tmpDir(temporaryFolder.newFolder())
+                         
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                         .schema(
+                             new IncrementalIndexSchema.Builder()
+                                 
.withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec())
+                                 .build()
+                         )
+                         .inputSource(
+                             ResourceInputSource.of(
+                                 NestedDataTestUtils.class.getClassLoader(),
+                                 NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE
+                             )
+                         )
+                         .inputFormat(TestIndex.DEFAULT_JSON_INPUT_FORMAT)
+                         .inputTmpDir(temporaryFolder.newFolder())
+                         .buildMMappedIndex();
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
 
-    TimeBoundaryQuery query = 
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
-    query = 
query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, 
Long.MAX_VALUE));
+  private static QueryPlus getQueryPlus()
+  {
+    return getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, 
Long.MAX_VALUE));
+  }
 
-    TimeBoundaryQuery finalQuery = query;
-    Assert.assertThrows(RuntimeException.class, () -> 
client2.run(QueryPlus.wrap(finalQuery)));
-    Assert.assertEquals(0, client2.getNumOpenConnections());
+  private static QueryPlus getQueryPlus(Map<String, Object> context)
+  {
+    return 
QueryPlus.wrap(Druids.newTimeBoundaryQueryBuilder().dataSource("test").context(context).randomQueryId().build());
   }
 }
diff --git a/server/src/test/java/org/apache/druid/client/TestHttpClient.java 
b/server/src/test/java/org/apache/druid/client/TestHttpClient.java
index b979f39799a..1d2567b50e0 100644
--- a/server/src/test/java/org/apache/druid/client/TestHttpClient.java
+++ b/server/src/test/java/org/apache/druid/client/TestHttpClient.java
@@ -67,10 +67,19 @@ public class TestHttpClient implements HttpClient
 
   private final Map<URL, SimpleServerManager> servers = new HashMap<>();
   private final ObjectMapper objectMapper;
+  @Nullable
+  private final ListenableFuture future;
 
   public TestHttpClient(ObjectMapper objectMapper)
   {
     this.objectMapper = objectMapper;
+    this.future = null;
+  }
+
+  public TestHttpClient(ObjectMapper objectMapper, ListenableFuture future)
+  {
+    this.objectMapper = objectMapper;
+    this.future = future;
   }
 
   public void addServerAndRunner(DruidServer server, SimpleServerManager 
serverManager)
@@ -139,7 +148,11 @@ public class TestHttpClient implements HttpClient
       );
       final ClientResponse<Intermediate> intermClientResponse = 
handler.handleResponse(response, NOOP_TRAFFIC_COP);
       final ClientResponse<Final> finalClientResponse = 
handler.done(intermClientResponse);
-      return Futures.immediateFuture(finalClientResponse.getObj());
+      if (future != null) {
+        return future;
+      } else {
+        return Futures.immediateFuture(finalClientResponse.getObj());
+      }
     }
     catch (IOException e) {
       throw new RuntimeException(e);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to