jtuglu1 commented on code in PR #18861:
URL: https://github.com/apache/druid/pull/18861#discussion_r2655927332
##########
server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java:
##########
@@ -241,244 +191,199 @@ public void testRun() throws Exception
List<Result> results = s1.toList();
Assert.assertEquals(1, results.size());
Assert.assertEquals(DateTimes.of("2014-01-01T01:02:03Z"),
results.get(0).getTimestamp());
- Assert.assertEquals(3, client.getNumOpenConnections());
-
- client2.run(QueryPlus.wrap(query));
- client2.run(QueryPlus.wrap(query));
+ Assert.assertEquals(3, client1.getNumOpenConnections());
+ client2.run(queryPlus, responseContext);
+ client2.run(queryPlus, responseContext);
Assert.assertEquals(2, client2.getNumOpenConnections());
-
- Assert.assertEquals(serverSelector.pick(null,
CloneQueryMode.EXCLUDECLONES), queryableDruidServer2);
-
- EasyMock.verify(httpClient);
}
@Test
public void testCancel()
{
- Capture<Request> capturedRequest = EasyMock.newCapture();
- ListenableFuture<Object> cancelledFuture =
Futures.immediateCancelledFuture();
- SettableFuture<Object> cancellationFuture = SettableFuture.create();
-
- EasyMock.expect(
- httpClient.go(
- EasyMock.capture(capturedRequest),
- EasyMock.<HttpResponseHandler>anyObject(),
- EasyMock.anyObject(Duration.class)
- )
- )
- .andReturn(cancelledFuture)
- .once();
-
- EasyMock.expect(
- httpClient.go(
- EasyMock.capture(capturedRequest),
- EasyMock.<HttpResponseHandler>anyObject(),
- EasyMock.anyObject(Duration.class)
- )
- )
- .andReturn(cancellationFuture)
- .anyTimes();
+ DirectDruidClient client =
makeDirectDruidClient(initHttpClientWithFuture(Futures.immediateCancelledFuture()));
+ Sequence results = client.run(getQueryPlus(), responseContext);
- EasyMock.replay(httpClient);
-
-
- TimeBoundaryQuery query =
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
- query =
query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME,
Long.MAX_VALUE));
- cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new
StringBuilder("cancelled")));
- Sequence results = client.run(QueryPlus.wrap(query));
- Assert.assertEquals(HttpMethod.POST,
capturedRequest.getValue().getMethod());
Assert.assertEquals(0, client.getNumOpenConnections());
-
-
- QueryInterruptedException exception = null;
- try {
- results.toList();
- }
- catch (QueryInterruptedException e) {
- exception = e;
- }
- Assert.assertNotNull(exception);
-
- EasyMock.verify(httpClient);
+ QueryInterruptedException actualException =
+ Assert.assertThrows(QueryInterruptedException.class, () ->
results.toList());
+ Assert.assertEquals(hostName, actualException.getHost());
+ Assert.assertEquals("Query cancelled", actualException.getErrorCode());
+ Assert.assertEquals("Task was cancelled.",
actualException.getCause().getMessage());
}
@Test
public void testQueryInterruptionExceptionLogMessage()
{
SettableFuture<Object> interruptionFuture = SettableFuture.create();
- Capture<Request> capturedRequest = EasyMock.newCapture();
- final String hostName = "localhost:8080";
- EasyMock
- .expect(
- httpClient.go(
- EasyMock.capture(capturedRequest),
- EasyMock.<HttpResponseHandler>anyObject(),
- EasyMock.anyObject(Duration.class)
- )
- )
- .andReturn(interruptionFuture)
- .anyTimes();
-
- EasyMock.replay(httpClient);
-
- // test error
- TimeBoundaryQuery query =
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
- query =
query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME,
Long.MAX_VALUE));
interruptionFuture.set(
new ByteArrayInputStream(
StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}")
)
);
- Sequence results = client.run(QueryPlus.wrap(query));
+ final DirectDruidClient client =
makeDirectDruidClient(initHttpClientWithFuture(interruptionFuture));
- QueryInterruptedException actualException = null;
- try {
- results.toList();
- }
- catch (QueryInterruptedException e) {
- actualException = e;
- }
- Assert.assertNotNull(actualException);
+ interruptionFuture.set(
+ new
ByteArrayInputStream(StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}"))
+ );
+ Sequence results = client.run(getQueryPlus(), responseContext);
+
+ QueryInterruptedException actualException =
+ Assert.assertThrows(QueryInterruptedException.class, () ->
results.toList());
Assert.assertEquals("testing1", actualException.getErrorCode());
Assert.assertEquals("testing2", actualException.getMessage());
Assert.assertEquals(hostName, actualException.getHost());
- EasyMock.verify(httpClient);
}
@Test
public void testQueryTimeoutBeforeFuture() throws IOException,
InterruptedException
{
SettableFuture<Object> timeoutFuture = SettableFuture.create();
- Capture<Request> capturedRequest = EasyMock.newCapture();
- final String queryId = "timeout-before-future";
-
- EasyMock
- .expect(
- httpClient.go(
- EasyMock.capture(capturedRequest),
- EasyMock.<HttpResponseHandler>anyObject(),
- EasyMock.anyObject(Duration.class)
- )
- )
- .andReturn(timeoutFuture)
- .anyTimes();
-
- EasyMock.replay(httpClient);
-
- TimeBoundaryQuery query =
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
- query = query.withOverriddenContext(
- ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME,
System.currentTimeMillis() + 250, "queryId", queryId)
- );
+ final DirectDruidClient client =
makeDirectDruidClient(initHttpClientWithFuture(timeoutFuture));
- Sequence results = client.run(QueryPlus.wrap(query));
+ QueryPlus queryPlus =
getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME,
System.currentTimeMillis() + 250));
+ Sequence results = client.run(queryPlus, responseContext);
- // incomplete result set
+ // Incomplete result set delivered via a pipe to simulate slow stream
PipedInputStream in = new PipedInputStream();
final PipedOutputStream out = new PipedOutputStream(in);
- timeoutFuture.set(
- in
+ timeoutFuture.set(in);
+
+ QueryTimeoutException actualException = Assert.assertThrows(
+ QueryTimeoutException.class,
+ () -> {
+
out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}"));
+ Thread.sleep(250);
+ out.write(StringUtils.toUtf8("]"));
+ out.close();
+ results.toList();
+ }
);
-
- QueryTimeoutException actualException = null;
- try {
-
out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}"));
- Thread.sleep(250);
- out.write(StringUtils.toUtf8("]"));
- out.close();
- results.toList();
- }
- catch (QueryTimeoutException e) {
- actualException = e;
- }
- Assert.assertNotNull(actualException);
Assert.assertEquals("Query timeout", actualException.getErrorCode());
- Assert.assertEquals("url[http://localhost:8080/druid/v2/] timed out",
actualException.getMessage());
+ Assert.assertEquals(StringUtils.format("url[http://%s/druid/v2/] timed
out", hostName), actualException.getMessage());
Assert.assertEquals(hostName, actualException.getHost());
- EasyMock.verify(httpClient);
}
@Test
public void testQueryTimeoutFromFuture()
{
- SettableFuture<Object> noFuture = SettableFuture.create();
- Capture<Request> capturedRequest = EasyMock.newCapture();
- final String queryId = "never-ending-future";
-
- EasyMock
- .expect(
- httpClient.go(
- EasyMock.capture(capturedRequest),
- EasyMock.<HttpResponseHandler>anyObject(),
- EasyMock.anyObject(Duration.class)
- )
- )
- .andReturn(noFuture)
- .anyTimes();
-
- EasyMock.replay(httpClient);
-
- TimeBoundaryQuery query =
Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
- query = query.withOverriddenContext(
- ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME,
System.currentTimeMillis() + 500, "queryId", queryId)
- );
-
- Sequence results = client.run(QueryPlus.wrap(query));
+ final SettableFuture<Object> timeoutFuture = SettableFuture.create();
+ final DirectDruidClient client =
makeDirectDruidClient(initHttpClientWithFuture(timeoutFuture));
- QueryTimeoutException actualException = null;
- try {
- results.toList();
- }
- catch (QueryTimeoutException e) {
- actualException = e;
- }
- Assert.assertNotNull(actualException);
+ QueryPlus query = getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME,
System.currentTimeMillis() + 500));
+ Sequence results = client.run(query, responseContext);
+ QueryTimeoutException actualException =
Assert.assertThrows(QueryTimeoutException.class, results::toList);
Assert.assertEquals("Query timeout", actualException.getErrorCode());
- Assert.assertEquals(StringUtils.format("Query [%s] timed out!", queryId),
actualException.getMessage());
+ Assert.assertEquals(StringUtils.format("Query [%s] timed out!",
query.getQuery().getId()), actualException.getMessage());
Assert.assertEquals(hostName, actualException.getHost());
- EasyMock.verify(httpClient);
}
@Test
- public void testConnectionCountAfterException() throws
JsonProcessingException
+ public void testConnectionCountAfterException()
{
- ObjectMapper mockObjectMapper = EasyMock.createMock(ObjectMapper.class);
- EasyMock.expect(mockObjectMapper.writeValueAsBytes(Query.class))
- .andThrow(new JsonProcessingException("Error")
- {
- });
+ final DirectDruidClient client =
makeDirectDruidClient(initHttpClientWithQueryError());
- DirectDruidClient client2 = new DirectDruidClient(
+ Assert.assertThrows(RuntimeException.class, () ->
client.run(getQueryPlus(), responseContext));
+ Assert.assertEquals(0, client.getNumOpenConnections());
+ }
+
+ @Test
+ public void testResourceLimitExceededException()
+ {
+ final DirectDruidClient client =
makeDirectDruidClient(initHttpClientWithSuccessfulQuery());
+
+ final QueryPlus queryPlus = getQueryPlus(Map.of(
+ QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 100,
+ DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE
+ ));
+
+ ResourceLimitExceededException actualException = Assert.assertThrows(
+ ResourceLimitExceededException.class,
+ () -> client.run(queryPlus, responseContext)
+ );
+
+ Assert.assertEquals(
+ StringUtils.format(
+ "Query[%s] url[http://localhost:8080/druid/v2/] total bytes
gathered[127] exceeds maxScatterGatherBytes[100]",
+ queryPlus.getQuery().getId()
+ ),
+ actualException.getMessage());
+ }
+
+ private DirectDruidClient makeDirectDruidClient(HttpClient httpClient)
+ {
+ return new DirectDruidClient(
conglomerateRule.getConglomerate(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
- mockObjectMapper,
+ objectMapper,
httpClient,
"http",
hostName,
new NoopServiceEmitter(),
queryCancellationExecutor
);
+ }
- QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
- new DruidServer(
- "test1",
- "localhost",
- null,
- 0,
- ServerType.HISTORICAL,
- DruidServer.DEFAULT_TIER,
- 0
- ),
- client2
+ private HttpClient initHttpClientWithQueryError()
+ {
+ return initHttpClientWithFuture(new TestHttpClient(objectMapper), true);
+ }
+
+ private HttpClient initHttpClientWithSuccessfulQuery()
+ {
+ return initHttpClientWithFuture(new TestHttpClient(objectMapper), false);
+ }
+
+ private HttpClient initHttpClientWithFuture(ListenableFuture future)
+ {
+ return initHttpClientWithFuture(new TestHttpClient(objectMapper, future),
false);
+ }
+
+ private HttpClient initHttpClientWithFuture(TestHttpClient httpClient,
boolean throwQueryError)
Review Comment:
Can we name this something different? It has the same name as the method
above but a different signature. Maybe something like
`initHttpClientFromExistingClient`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]