This is an automated email from the ASF dual-hosted git repository. abhishekrb pushed a commit to branch druid_client_test_coverage in repository https://gitbox.apache.org/repos/asf/druid.git
commit bd4259c4417172f7847c54f5b7e438ade9da20d6 Author: Abhishek Balaji Radhakrishnan <[email protected]> AuthorDate: Fri Dec 19 12:19:02 2025 -0500 Better DirectDruidClientTest for code coverage Replaces mocks with more concrete test helpers that will invoke and exercise the internals of DirectDruidClient response handler, etc. Also, add a new unit test for ResourceExceededException that wasn't previously exercised. --- .../apache/druid/client/DirectDruidClientTest.java | 533 +++++++++------------ .../org/apache/druid/client/TestHttpClient.java | 15 +- 2 files changed, 233 insertions(+), 315 deletions(-) diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index a86a07c8f86..71f13537d02 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -19,51 +19,49 @@ package org.apache.druid.client; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import org.apache.druid.client.selector.ConnectionCountServerSelectorStrategy; -import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; -import org.apache.druid.client.selector.ServerSelector; +import org.apache.druid.data.input.ResourceInputSource; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.HttpResponseHandler; -import org.apache.druid.java.util.http.client.response.StatusResponseHolder; -import org.apache.druid.query.CloneQueryMode; import org.apache.druid.query.Druids; -import org.apache.druid.query.Query; +import org.apache.druid.query.NestedDataTestUtils; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.QueryTimeoutException; +import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.query.Result; -import org.apache.druid.query.timeboundary.TimeBoundaryQuery; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.segment.IndexBuilder; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.TestIndex; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.coordination.TestCoordinatorClient; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.partition.NoneShardSpec; -import org.easymock.Capture; -import org.easymock.EasyMock; +import org.apache.druid.timeline.SegmentId; import org.jboss.netty.handler.codec.http.HttpMethod; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.timeout.ReadTimeoutException; import org.joda.time.Duration; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -71,9 +69,11 @@ import java.io.InputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.net.URL; +import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Deque; import java.util.List; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -82,61 +82,20 @@ public class DirectDruidClientTest @ClassRule public static QueryStackTests.Junit4ConglomerateRule conglomerateRule = new QueryStackTests.Junit4ConglomerateRule(); + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + private final String hostName = "localhost:8080"; + private final ObjectMapper objectMapper = new DefaultObjectMapper(); + private final ResponseContext responseContext = ResponseContext.createEmpty(); - private final DataSegment dataSegment = new DataSegment( - "test", - Intervals.of("2013-01-01/2013-01-02"), - DateTimes.of("2013-01-01").toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - 0, - 0L - ); - private ServerSelector serverSelector; - - private HttpClient httpClient; - private DirectDruidClient client; - private QueryableDruidServer queryableDruidServer; private ScheduledExecutorService queryCancellationExecutor; @Before public void setup() { - final BrokerViewOfCoordinatorConfig filter = new BrokerViewOfCoordinatorConfig(new TestCoordinatorClient()); - filter.start(); - httpClient = EasyMock.createMock(HttpClient.class); - serverSelector = new ServerSelector( - dataSegment, - new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()), - filter - ); + responseContext.initialize(); queryCancellationExecutor = Execs.scheduledSingleThreaded("query-cancellation-executor"); - client = new DirectDruidClient( - conglomerateRule.getConglomerate(), - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - new DefaultObjectMapper(), - httpClient, - "http", - hostName, - new NoopServiceEmitter(), - queryCancellationExecutor - ); - queryableDruidServer = new QueryableDruidServer( - new DruidServer( - "test1", - "localhost", - null, - 0, - ServerType.HISTORICAL, - DruidServer.DEFAULT_TIER, - 0 - ), - client - ); - serverSelector.addServerAndUpdateSegment(queryableDruidServer, serverSelector.getSegment()); } @After @@ -151,86 +110,77 @@ public class DirectDruidClientTest { final URL url = new URL(StringUtils.format("http://%s/druid/v2/", hostName)); - SettableFuture<InputStream> futureResult = SettableFuture.create(); - Capture<Request> capturedRequest = EasyMock.newCapture(); - EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.<HttpResponseHandler>anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(futureResult) - .times(1); - - SettableFuture futureException = SettableFuture.create(); - EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.<HttpResponseHandler>anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(futureException) - .times(1); - - EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.<HttpResponseHandler>anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(SettableFuture.create()) - .atLeastOnce(); + // A simple HttpClient that records requests and returns queued futures per call. + class SequencedHttpClient implements HttpClient + { + private final List<Request> captured = new ArrayList<>(); + private final Deque<ListenableFuture<InputStream>> queue = new ArrayDeque<>(); + + void enqueue(ListenableFuture<InputStream> f) + { + queue.addLast(f); + } + + List<Request> requests() + { + return captured; + } + + @Override + public <Intermediate, Final> ListenableFuture<Final> go(Request request, HttpResponseHandler<Intermediate, Final> handler) + { + throw new UnsupportedOperationException(); + } + + @Override + @SuppressWarnings("unchecked") + public <Intermediate, Final> ListenableFuture<Final> go( + Request request, + HttpResponseHandler<Intermediate, Final> handler, + Duration readTimeout + ) + { + captured.add(request); + ListenableFuture<InputStream> f = queue.pollFirst(); + if (f == null) { + f = SettableFuture.create(); // pending forever + } + return (ListenableFuture<Final>) f; + } + } - EasyMock.replay(httpClient); + SequencedHttpClient sequenced = new SequencedHttpClient(); + DirectDruidClient client1 = makeDirectDruidClient(sequenced); - DirectDruidClient client2 = new DirectDruidClient( - conglomerateRule.getConglomerate(), - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - new DefaultObjectMapper(), - httpClient, - "http", - "foo2", - new NoopServiceEmitter(), - queryCancellationExecutor - ); + DirectDruidClient client2 = makeDirectDruidClient(sequenced); - QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer( - new DruidServer( - "test1", - "localhost", - null, - 0, - ServerType.HISTORICAL, - DruidServer.DEFAULT_TIER, - 0 - ), - client2 - ); - serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment()); - - TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); - Sequence s1 = client.run(QueryPlus.wrap(query)); - Assert.assertTrue(capturedRequest.hasCaptured()); - Assert.assertEquals(url, capturedRequest.getValue().getUrl()); - Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod()); - Assert.assertEquals(1, client.getNumOpenConnections()); - - // simulate read timeout - client.run(QueryPlus.wrap(query)); - Assert.assertEquals(2, client.getNumOpenConnections()); + // Queue first call: pending until we provide a result + SettableFuture<InputStream> futureResult = SettableFuture.create(); + sequenced.enqueue(futureResult); + // Queue second call: will fail with ReadTimeoutException + SettableFuture<InputStream> futureException = SettableFuture.create(); + sequenced.enqueue(futureException); + // Subsequent calls: no enqueue → default pending futures created in client + + QueryPlus queryPlus = getQueryPlus(); + + Sequence s1 = client1.run(queryPlus, responseContext); + Assert.assertFalse(sequenced.requests().isEmpty()); + Assert.assertEquals(url, sequenced.requests().get(0).getUrl()); + Assert.assertEquals(HttpMethod.POST, sequenced.requests().get(0).getMethod()); + Assert.assertEquals(1, client1.getNumOpenConnections()); + + // simulate read timeout on second request + client1.run(queryPlus, responseContext); + Assert.assertEquals(2, client1.getNumOpenConnections()); futureException.setException(new ReadTimeoutException()); - Assert.assertEquals(1, client.getNumOpenConnections()); - - // subsequent connections should work - client.run(QueryPlus.wrap(query)); - client.run(QueryPlus.wrap(query)); - client.run(QueryPlus.wrap(query)); + Assert.assertEquals(1, client1.getNumOpenConnections()); - Assert.assertTrue(client.getNumOpenConnections() == 4); + // subsequent connections should work (and remain open) + client1.run(queryPlus, responseContext); + client1.run(queryPlus, responseContext); + client1.run(queryPlus, responseContext); + Assert.assertEquals(4, client1.getNumOpenConnections()); // produce result for first connection futureResult.set( @@ -241,244 +191,199 @@ public class DirectDruidClientTest List<Result> results = s1.toList(); Assert.assertEquals(1, results.size()); Assert.assertEquals(DateTimes.of("2014-01-01T01:02:03Z"), results.get(0).getTimestamp()); - Assert.assertEquals(3, client.getNumOpenConnections()); - - client2.run(QueryPlus.wrap(query)); - client2.run(QueryPlus.wrap(query)); + Assert.assertEquals(3, client1.getNumOpenConnections()); + client2.run(queryPlus, responseContext); + client2.run(queryPlus, responseContext); Assert.assertEquals(2, client2.getNumOpenConnections()); - - Assert.assertEquals(serverSelector.pick(null, CloneQueryMode.EXCLUDECLONES), queryableDruidServer2); - - EasyMock.verify(httpClient); } @Test public void testCancel() { - Capture<Request> capturedRequest = EasyMock.newCapture(); - ListenableFuture<Object> cancelledFuture = Futures.immediateCancelledFuture(); - SettableFuture<Object> cancellationFuture = SettableFuture.create(); - - EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.<HttpResponseHandler>anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(cancelledFuture) - .once(); - - EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.<HttpResponseHandler>anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(cancellationFuture) - .anyTimes(); + DirectDruidClient client = makeDirectDruidClient(initHttpClientWithFuture(Futures.immediateCancelledFuture())); + Sequence results = client.run(getQueryPlus(), responseContext); - EasyMock.replay(httpClient); - - - TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); - cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled"))); - Sequence results = client.run(QueryPlus.wrap(query)); - Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod()); Assert.assertEquals(0, client.getNumOpenConnections()); - - - QueryInterruptedException exception = null; - try { - results.toList(); - } - catch (QueryInterruptedException e) { - exception = e; - } - Assert.assertNotNull(exception); - - EasyMock.verify(httpClient); + QueryInterruptedException actualException = + Assert.assertThrows(QueryInterruptedException.class, () -> results.toList()); + Assert.assertEquals(hostName, actualException.getHost()); + Assert.assertEquals("Query cancelled", actualException.getErrorCode()); + Assert.assertEquals("Task was cancelled.", actualException.getCause().getMessage()); } @Test public void testQueryInterruptionExceptionLogMessage() { SettableFuture<Object> interruptionFuture = SettableFuture.create(); - Capture<Request> capturedRequest = EasyMock.newCapture(); - final String hostName = "localhost:8080"; - EasyMock - .expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.<HttpResponseHandler>anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(interruptionFuture) - .anyTimes(); - - EasyMock.replay(httpClient); - - // test error - TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); interruptionFuture.set( new ByteArrayInputStream( StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}") ) ); - Sequence results = client.run(QueryPlus.wrap(query)); + final DirectDruidClient client = makeDirectDruidClient(initHttpClientWithFuture(interruptionFuture)); - QueryInterruptedException actualException = null; - try { - results.toList(); - } - catch (QueryInterruptedException e) { - actualException = e; - } - Assert.assertNotNull(actualException); + interruptionFuture.set( + new ByteArrayInputStream(StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}")) + ); + Sequence results = client.run(getQueryPlus(), responseContext); + + QueryInterruptedException actualException = + Assert.assertThrows(QueryInterruptedException.class, () -> results.toList()); Assert.assertEquals("testing1", actualException.getErrorCode()); Assert.assertEquals("testing2", actualException.getMessage()); Assert.assertEquals(hostName, actualException.getHost()); - EasyMock.verify(httpClient); } @Test public void testQueryTimeoutBeforeFuture() throws IOException, InterruptedException { SettableFuture<Object> timeoutFuture = SettableFuture.create(); - Capture<Request> capturedRequest = EasyMock.newCapture(); - final String queryId = "timeout-before-future"; - - EasyMock - .expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.<HttpResponseHandler>anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(timeoutFuture) - .anyTimes(); - - EasyMock.replay(httpClient); - - TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - query = query.withOverriddenContext( - ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 250, "queryId", queryId) - ); + final DirectDruidClient client = makeDirectDruidClient(initHttpClientWithFuture(timeoutFuture)); - Sequence results = client.run(QueryPlus.wrap(query)); + QueryPlus queryPlus = getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 250)); + Sequence results = client.run(queryPlus, responseContext); - // incomplete result set + // Incomplete result set delivered via a pipe to simulate slow stream PipedInputStream in = new PipedInputStream(); final PipedOutputStream out = new PipedOutputStream(in); - timeoutFuture.set( - in + timeoutFuture.set(in); + + QueryTimeoutException actualException = Assert.assertThrows( + QueryTimeoutException.class, + () -> { + out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}")); + Thread.sleep(250); + out.write(StringUtils.toUtf8("]")); + out.close(); + results.toList(); + } ); - - QueryTimeoutException actualException = null; - try { - out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}")); - Thread.sleep(250); - out.write(StringUtils.toUtf8("]")); - out.close(); - results.toList(); - } - catch (QueryTimeoutException e) { - actualException = e; - } - Assert.assertNotNull(actualException); Assert.assertEquals("Query timeout", actualException.getErrorCode()); - Assert.assertEquals("url[http://localhost:8080/druid/v2/] timed out", actualException.getMessage()); + Assert.assertEquals(StringUtils.format("url[http://%s/druid/v2/] timed out", hostName), actualException.getMessage()); Assert.assertEquals(hostName, actualException.getHost()); - EasyMock.verify(httpClient); } @Test public void testQueryTimeoutFromFuture() { - SettableFuture<Object> noFuture = SettableFuture.create(); - Capture<Request> capturedRequest = EasyMock.newCapture(); - final String queryId = "never-ending-future"; - - EasyMock - .expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.<HttpResponseHandler>anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(noFuture) - .anyTimes(); - - EasyMock.replay(httpClient); - - TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - query = query.withOverriddenContext( - ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 500, "queryId", queryId) - ); - - Sequence results = client.run(QueryPlus.wrap(query)); + final SettableFuture<Object> timeoutFuture = SettableFuture.create(); + final DirectDruidClient client = makeDirectDruidClient(initHttpClientWithFuture(timeoutFuture)); - QueryTimeoutException actualException = null; - try { - results.toList(); - } - catch (QueryTimeoutException e) { - actualException = e; - } - Assert.assertNotNull(actualException); + QueryPlus query = getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 500)); + Sequence results = client.run(query, responseContext); + QueryTimeoutException actualException = Assert.assertThrows(QueryTimeoutException.class, results::toList); Assert.assertEquals("Query timeout", actualException.getErrorCode()); - Assert.assertEquals(StringUtils.format("Query [%s] timed out!", queryId), actualException.getMessage()); + Assert.assertEquals(StringUtils.format("Query [%s] timed out!", query.getQuery().getId()), actualException.getMessage()); Assert.assertEquals(hostName, actualException.getHost()); - EasyMock.verify(httpClient); } @Test - public void testConnectionCountAfterException() throws JsonProcessingException + public void testConnectionCountAfterException() { - ObjectMapper mockObjectMapper = EasyMock.createMock(ObjectMapper.class); - EasyMock.expect(mockObjectMapper.writeValueAsBytes(Query.class)) - .andThrow(new JsonProcessingException("Error") - { - }); + final DirectDruidClient client = makeDirectDruidClient(initHttpClientWithQueryError()); - DirectDruidClient client2 = new DirectDruidClient( + Assert.assertThrows(RuntimeException.class, () -> client.run(getQueryPlus(), responseContext)); + Assert.assertEquals(0, client.getNumOpenConnections()); + } + + @Test + public void testResourceLimitExceededException() + { + final DirectDruidClient client = makeDirectDruidClient(initHttpClientWithSuccessfulQuery()); + + final QueryPlus queryPlus = getQueryPlus(Map.of( + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 100, + DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE + )); + + ResourceLimitExceededException actualException = Assert.assertThrows( + ResourceLimitExceededException.class, + () -> client.run(queryPlus, responseContext) + ); + + Assert.assertEquals( + StringUtils.format( + "Query[%s] url[http://localhost:8080/druid/v2/] total bytes gathered[127] exceeds maxScatterGatherBytes[100]", + queryPlus.getQuery().getId() + ), + actualException.getMessage()); + } + + private DirectDruidClient makeDirectDruidClient(HttpClient httpClient) + { + return new DirectDruidClient( conglomerateRule.getConglomerate(), QueryRunnerTestHelper.NOOP_QUERYWATCHER, - mockObjectMapper, + objectMapper, httpClient, "http", hostName, new NoopServiceEmitter(), queryCancellationExecutor ); + } - QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer( - new DruidServer( - "test1", - "localhost", - null, - 0, - ServerType.HISTORICAL, - DruidServer.DEFAULT_TIER, - 0 - ), - client2 + private HttpClient initHttpClientWithQueryError() + { + return initHttpClientWithFuture(new TestHttpClient(objectMapper), true); + } + + private HttpClient initHttpClientWithSuccessfulQuery() + { + return initHttpClientWithFuture(new TestHttpClient(objectMapper), false); + } + + private HttpClient initHttpClientWithFuture(ListenableFuture future) + { + return initHttpClientWithFuture(new TestHttpClient(objectMapper, future), false); + } + + private HttpClient initHttpClientWithFuture(TestHttpClient httpClient, boolean throwQueryError) + { + final QueryableIndex index = makeQueryableIndex(); + httpClient.addServerAndRunner( + new DruidServer("test1", hostName, null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0), + new TestHttpClient.SimpleServerManager( + conglomerateRule.getConglomerate(), DataSegment.builder(SegmentId.dummy("test")).build(), index, throwQueryError + ) ); + return httpClient; + } - serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment()); + private QueryableIndex makeQueryableIndex() + { + try { + return IndexBuilder.create() + .tmpDir(temporaryFolder.newFolder()) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec()) + .build() + ) + .inputSource( + ResourceInputSource.of( + NestedDataTestUtils.class.getClassLoader(), + NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE + ) + ) + .inputFormat(TestIndex.DEFAULT_JSON_INPUT_FORMAT) + .inputTmpDir(temporaryFolder.newFolder()) + .buildMMappedIndex(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } - TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); + private static QueryPlus getQueryPlus() + { + return getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); + } - TimeBoundaryQuery finalQuery = query; - Assert.assertThrows(RuntimeException.class, () -> client2.run(QueryPlus.wrap(finalQuery))); - Assert.assertEquals(0, client2.getNumOpenConnections()); + private static QueryPlus getQueryPlus(Map<String, Object> context) + { + return QueryPlus.wrap(Druids.newTimeBoundaryQueryBuilder().dataSource("test").context(context).randomQueryId().build()); } } diff --git a/server/src/test/java/org/apache/druid/client/TestHttpClient.java b/server/src/test/java/org/apache/druid/client/TestHttpClient.java index b979f39799a..1d2567b50e0 100644 --- a/server/src/test/java/org/apache/druid/client/TestHttpClient.java +++ b/server/src/test/java/org/apache/druid/client/TestHttpClient.java @@ -67,10 +67,19 @@ public class TestHttpClient implements HttpClient private final Map<URL, SimpleServerManager> servers = new HashMap<>(); private final ObjectMapper objectMapper; + @Nullable + private final ListenableFuture future; public TestHttpClient(ObjectMapper objectMapper) { this.objectMapper = objectMapper; + this.future = null; + } + + public TestHttpClient(ObjectMapper objectMapper, ListenableFuture future) + { + this.objectMapper = objectMapper; + this.future = future; } public void addServerAndRunner(DruidServer server, SimpleServerManager serverManager) @@ -139,7 +148,11 @@ public class TestHttpClient implements HttpClient ); final ClientResponse<Intermediate> intermClientResponse = handler.handleResponse(response, NOOP_TRAFFIC_COP); final ClientResponse<Final> finalClientResponse = handler.done(intermClientResponse); - return Futures.immediateFuture(finalClientResponse.getObj()); + if (future != null) { + return future; + } else { + return Futures.immediateFuture(finalClientResponse.getObj()); + } } catch (IOException e) { throw new RuntimeException(e); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
