This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e5e31b7d962 fix: chat handler response parsing (#19495)
e5e31b7d962 is described below
commit e5e31b7d96282f7f448497fd9b164aa5ac644934
Author: jtuglu1 <[email protected]>
AuthorDate: Thu May 21 21:52:11 2026 -0700
fix: chat handler response parsing (#19495)
Overlord task report requests can sometimes be too eager during
ingestion and ping a task before the HTTP server servicing the chat
requests has spun up. This causes 4xx/5xx responses to be returned, which
are not correctly parsed by the chat client. While this doesn't
explicitly fail the ingestion, it spams the logs and causes confusion.
---
.../druid/k8s/overlord/KubernetesTaskRunner.java | 19 +--
.../k8s/overlord/KubernetesTaskRunnerTest.java | 76 ++++++++---
.../druid/indexing/overlord/RemoteTaskRunner.java | 15 +--
.../druid/indexing/overlord/TaskRunnerUtils.java | 43 +++++++
.../overlord/hrtr/HttpRemoteTaskRunner.java | 15 +--
.../indexing/overlord/RemoteTaskRunnerTest.java | 57 ++++++++-
.../indexing/overlord/TaskRunnerUtilsTest.java | 100 +++++++++++++++
.../overlord/hrtr/HttpRemoteTaskRunnerTest.java | 142 ++++++++++++++++++++-
8 files changed, 401 insertions(+), 66 deletions(-)
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 947815a6782..fa7148a6151 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -21,7 +21,6 @@ package org.apache.druid.k8s.overlord;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -53,15 +52,12 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.http.client.HttpClient;
-import org.apache.druid.java.util.http.client.Request;
-import
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import
org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
import
org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogStreamer;
-import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@@ -389,20 +385,7 @@ public class KubernetesTaskRunner implements
TaskLogStreamer, TaskRunner
taskid
);
- try {
- return Optional.of(httpClient.go(
- new Request(HttpMethod.GET, url),
- new InputStreamResponseHandler()
- ).get());
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e) {
- // Unwrap if possible
- Throwables.propagateIfPossible(e.getCause(), IOException.class);
- throw new RuntimeException(e);
- }
+ return TaskRunnerUtils.streamTaskReportsFromTaskLocation(httpClient, url);
}
@Override
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 426feb33aed..edf77961b8c 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -38,7 +38,8 @@ import
org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
-import
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
+import
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler;
+import
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import
org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig;
@@ -50,6 +51,9 @@ import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -586,7 +590,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void test_streamTaskReports_withExistingTask() throws Exception
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
+ final KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
@Override
public TaskLocation getLocation()
{
@@ -598,12 +602,12 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
EasyMock.expect(httpClient.go(
EasyMock.anyObject(Request.class),
- EasyMock.anyObject(InputStreamResponseHandler.class))
- ).andReturn(Futures.immediateFuture(IOUtils.toInputStream("{}",
StandardCharsets.UTF_8)));
+ EasyMock.anyObject(InputStreamFullResponseHandler.class))
+
).andReturn(Futures.immediateFuture(taskReportResponse(HttpResponseStatus.OK,
"{}")));
replayAll();
- Optional<InputStream> maybeInputStream =
runner.streamTaskReports(task.getId());
+ final Optional<InputStream> maybeInputStream =
runner.streamTaskReports(task.getId());
verifyAll();
@@ -611,17 +615,44 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
Assertions.assertEquals("{}", IOUtils.toString(maybeInputStream.get(),
StandardCharsets.UTF_8));
}
+ @Test
+ public void
test_streamTaskReports_whenTaskReportsUnavailable_returnsEmptyOptional() throws
Exception
+ {
+ final KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
+ @Override
+ public TaskLocation getLocation()
+ {
+ return TaskLocation.create("host", 0, 1, false);
+ }
+ };
+
+ runner.tasks.put(task.getId(), workItem);
+
+ EasyMock.expect(httpClient.go(
+ EasyMock.anyObject(Request.class),
+ EasyMock.anyObject(InputStreamFullResponseHandler.class))
+
).andReturn(Futures.immediateFuture(taskReportResponse(HttpResponseStatus.SERVICE_UNAVAILABLE,
"error")));
+
+ replayAll();
+
+ final Optional<InputStream> maybeInputStream =
runner.streamTaskReports(task.getId());
+
+ verifyAll();
+
+ Assertions.assertFalse(maybeInputStream.isPresent());
+ }
+
@Test
public void
test_streamTaskReports_withoutExistingTask_returnsEmptyOptional() throws
Exception
{
- Optional<InputStream> maybeInputStream =
runner.streamTaskReports(task.getId());
+ final Optional<InputStream> maybeInputStream =
runner.streamTaskReports(task.getId());
Assertions.assertFalse(maybeInputStream.isPresent());
}
@Test
public void
test_streamTaskReports_withUnknownTaskLocation_returnsEmptyOptional() throws
Exception
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
+ final KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
@Override
public TaskLocation getLocation()
{
@@ -631,14 +662,14 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
runner.tasks.put(task.getId(), workItem);
- Optional<InputStream> maybeInputStream =
runner.streamTaskReports(task.getId());
+ final Optional<InputStream> maybeInputStream =
runner.streamTaskReports(task.getId());
Assertions.assertFalse(maybeInputStream.isPresent());
}
@Test
public void
test_streamTaskReports_whenInterruptedExceptionThrown_throwsRuntimeException()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
+ final KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
@Override
public TaskLocation getLocation()
{
@@ -648,7 +679,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
runner.tasks.put(task.getId(), workItem);
- ListenableFuture<InputStream> future = new ListenableFuture<>()
+ final ListenableFuture<InputStreamFullResponseHolder> future = new
ListenableFuture<>()
{
@Override
public void addListener(Runnable runnable, Executor executor)
@@ -674,13 +705,13 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
}
@Override
- public InputStream get() throws InterruptedException
+ public InputStreamFullResponseHolder get() throws InterruptedException
{
throw new InterruptedException();
}
@Override
- public InputStream get(long timeout, TimeUnit unit) throws
InterruptedException
+ public InputStreamFullResponseHolder get(long timeout, TimeUnit unit)
throws InterruptedException
{
throw new InterruptedException();
}
@@ -688,12 +719,12 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
EasyMock.expect(httpClient.go(
EasyMock.anyObject(Request.class),
- EasyMock.anyObject(InputStreamResponseHandler.class))
+ EasyMock.anyObject(InputStreamFullResponseHandler.class))
).andReturn(future);
replayAll();
- Exception e = Assertions.assertThrows(RuntimeException.class, () ->
runner.streamTaskReports(task.getId()));
+ final Exception e = Assertions.assertThrows(RuntimeException.class, () ->
runner.streamTaskReports(task.getId()));
Assertions.assertTrue(e.getCause() instanceof InterruptedException);
verifyAll();
@@ -702,7 +733,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
@Test
public void
test_streamTaskReports_whenExecutionExceptionThrown_throwsRuntimeException()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
+ final KubernetesWorkItem workItem = new KubernetesWorkItem(task, null,
kubernetesPeonLifecycle) {
@Override
public TaskLocation getLocation()
{
@@ -714,7 +745,7 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
EasyMock.expect(httpClient.go(
EasyMock.anyObject(Request.class),
- EasyMock.anyObject(InputStreamResponseHandler.class))
+ EasyMock.anyObject(InputStreamFullResponseHandler.class))
).andReturn(Futures.immediateFailedFuture(new Exception()));
replayAll();
@@ -886,4 +917,17 @@ public class KubernetesTaskRunnerTest extends
EasyMockSupport
Assertions.assertEquals(1, executor.getMaximumPoolSize());
Assertions.assertEquals(1, runner.getTotalCapacity());
}
+
+ private static InputStreamFullResponseHolder taskReportResponse(
+ final HttpResponseStatus status,
+ final String content
+ )
+ {
+ final InputStreamFullResponseHolder response = new
InputStreamFullResponseHolder(
+ new DefaultHttpResponse(HttpVersion.HTTP_1_1, status)
+ );
+ response.addChunk(content.getBytes(StandardCharsets.UTF_8));
+ response.done();
+ return response;
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 4a4f0a08464..4018701d447 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -701,20 +701,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner,
TaskLogStreamer
taskId
);
- try {
- return Optional.of(httpClient.go(
- new Request(HttpMethod.GET, url),
- new InputStreamResponseHandler()
- ).get());
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e) {
- // Unwrap if possible
- Throwables.propagateIfPossible(e.getCause(), IOException.class);
- throw new RuntimeException(e);
- }
+ return TaskRunnerUtils.streamTaskReportsFromTaskLocation(httpClient, url);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java
index e53ac175769..46d1b351217 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java
@@ -19,19 +19,30 @@
package org.apache.druid.indexing.overlord;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler;
+import
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
public class TaskRunnerUtils
@@ -111,4 +122,36 @@ public class TaskRunnerUtils
throw new RuntimeException(e);
}
}
+
+ public static Optional<InputStream> streamTaskReportsFromTaskLocation(
+ final HttpClient httpClient,
+ final URL url
+ ) throws IOException
+ {
+ try {
+ final InputStreamFullResponseHolder response = httpClient.go(
+ new Request(HttpMethod.GET, url),
+ new InputStreamFullResponseHandler()
+ ).get();
+ final HttpResponseStatus responseStatus =
response.getResponse().getStatus();
+
+ if (HttpResponseStatus.OK.equals(responseStatus)) {
+ return Optional.of(response.getContent());
+ } else if (HttpResponseStatus.NOT_FOUND.equals(responseStatus)
+ ||
HttpResponseStatus.SERVICE_UNAVAILABLE.equals(responseStatus)) {
+ return Optional.absent();
+ } else {
+ throw new IOException(
+ StringUtils.format("Failed to stream task reports from [%s].
Response status [%s].", url, responseStatus)
+ );
+ }
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e) {
+ Throwables.propagateIfPossible(e.getCause(), IOException.class);
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 62f6b499cb3..53bd0dbbf94 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -1074,20 +1074,7 @@ public class HttpRemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer,
taskId
);
- try {
- return Optional.of(httpClient.go(
- new Request(HttpMethod.GET, url),
- new InputStreamResponseHandler()
- ).get());
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e) {
- // Unwrap if possible
- Throwables.propagateIfPossible(e.getCause(), IOException.class);
- throw new RuntimeException(e);
- }
+ return TaskRunnerUtils.streamTaskReportsFromTaskLocation(httpClient,
url);
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index d0c2b29f7e6..2a94d4f21ed 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -59,11 +59,15 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
+import
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.testing.DeadlockDetectingTimeout;
import org.apache.zookeeper.Watcher;
import org.easymock.Capture;
import org.easymock.EasyMock;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
@@ -75,7 +79,6 @@ import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.mockito.Mockito;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
@@ -1167,7 +1170,7 @@ public class RemoteTaskRunnerTest
doSetup();
final Capture<Request> capturedRequest = Capture.newInstance();
final String reportString = "my report!";
- final ByteArrayInputStream reportResponse = new
ByteArrayInputStream(StringUtils.toUtf8(reportString));
+ final InputStreamFullResponseHolder reportResponse =
taskReportResponse(HttpResponseStatus.OK, reportString);
EasyMock.expect(httpClient.go(EasyMock.capture(capturedRequest),
EasyMock.anyObject()))
.andReturn(Futures.immediateFuture(reportResponse));
EasyMock.replay(httpClient);
@@ -1206,6 +1209,43 @@ public class RemoteTaskRunnerTest
);
}
+ @Test
+ public void testStreamTaskReportsUnavailableFromWorker() throws Exception
+ {
+ doSetup();
+ final Capture<Request> capturedRequest = Capture.newInstance();
+ final InputStreamFullResponseHolder reportResponse = taskReportResponse(
+ HttpResponseStatus.SERVICE_UNAVAILABLE,
+ "{\"error\":\"Can't find chatHandler for handler[task]\"}"
+ );
+ EasyMock.expect(httpClient.go(EasyMock.capture(capturedRequest),
EasyMock.anyObject()))
+ .andReturn(Futures.immediateFuture(reportResponse));
+ EasyMock.replay(httpClient);
+
+ remoteTaskRunner.run(task);
+ Assert.assertTrue(taskAnnounced(task.getId()));
+ mockWorkerRunningTask(task);
+
+ // Wait for the task to have a known location.
+ Assert.assertTrue(
+ TestUtils.conditionValid(
+ () ->
+ !remoteTaskRunner.getRunningTasks().isEmpty()
+ &&
!Iterables.getOnlyElement(remoteTaskRunner.getRunningTasks())
+ .getLocation()
+ .equals(TaskLocation.unknown())
+ )
+ );
+
+ Assert.assertEquals(Optional.absent(),
remoteTaskRunner.streamTaskReports(task.getId()));
+
+ EasyMock.verify(httpClient);
+ Assert.assertEquals(
+
"http://dummy:9000/druid/worker/v1/chat/task%20id%20with%20spaces/liveReports",
+ capturedRequest.getValue().getUrl().toString()
+ );
+ }
+
@Test
public void testBuildPublishAction()
{
@@ -1247,4 +1287,17 @@ public class RemoteTaskRunnerTest
).getClass()
);
}
+
+ private static InputStreamFullResponseHolder taskReportResponse(
+ final HttpResponseStatus status,
+ final String content
+ )
+ {
+ final InputStreamFullResponseHolder response = new
InputStreamFullResponseHolder(
+ new DefaultHttpResponse(HttpVersion.HTTP_1_1, status)
+ );
+ response.addChunk(StringUtils.toUtf8(content));
+ response.done();
+ return response;
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
index 820b2e893cf..0bc4176facd 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
@@ -19,12 +19,27 @@
package org.apache.druid.indexing.overlord;
+import com.google.common.base.Optional;
+import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.Futures;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler;
+import
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
+import org.easymock.EasyMock;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
import java.net.URL;
public class TaskRunnerUtilsTest
@@ -52,4 +67,89 @@ public class TaskRunnerUtilsTest
);
Assert.assertEquals("https://1.2.3.4:8290/druid/worker/v1/task/foo%20bar%26/log",
url.toString());
}
+
+ @Test
+ public void testStreamTaskReportsFromTaskLocationOk() throws Exception
+ {
+ final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
+ final String report = "my report";
+ EasyMock.expect(httpClient.go(EasyMock.anyObject(Request.class),
EasyMock.anyObject(InputStreamFullResponseHandler.class)))
+ .andReturn(Futures.immediateFuture(response(HttpResponseStatus.OK,
report)));
+ EasyMock.replay(httpClient);
+
+ final Optional<InputStream> stream =
TaskRunnerUtils.streamTaskReportsFromTaskLocation(
+ httpClient,
+ url("http://example.com/liveReports")
+ );
+
+ Assert.assertTrue(stream.isPresent());
+ Assert.assertEquals(report,
StringUtils.fromUtf8(ByteStreams.toByteArray(stream.get())));
+ EasyMock.verify(httpClient);
+ }
+
+ @Test
+ public void testStreamTaskReportsFromTaskLocationNotFound() throws Exception
+ {
+
assertStreamTaskReportsFromTaskLocationUnavailable(HttpResponseStatus.NOT_FOUND);
+ }
+
+ @Test
+ public void testStreamTaskReportsFromTaskLocationServiceUnavailable() throws
Exception
+ {
+
assertStreamTaskReportsFromTaskLocationUnavailable(HttpResponseStatus.SERVICE_UNAVAILABLE);
+ }
+
+ @Test
+ public void testStreamTaskReportsFromTaskLocationUnexpectedStatus() throws
Exception
+ {
+ final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
+ EasyMock.expect(httpClient.go(EasyMock.anyObject(Request.class),
EasyMock.anyObject(InputStreamFullResponseHandler.class)))
+
.andReturn(Futures.immediateFuture(response(HttpResponseStatus.INTERNAL_SERVER_ERROR,
"error")));
+ EasyMock.replay(httpClient);
+
+ final IOException e = Assert.assertThrows(
+ IOException.class,
+ () -> TaskRunnerUtils.streamTaskReportsFromTaskLocation(
+ httpClient,
+ url("http://example.com/liveReports")
+ )
+ );
+
+ Assert.assertTrue(e.getMessage().contains("500 Internal Server Error"));
+ EasyMock.verify(httpClient);
+ }
+
+ private static void assertStreamTaskReportsFromTaskLocationUnavailable(
+ final HttpResponseStatus status
+ ) throws Exception
+ {
+ final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
+ EasyMock.expect(httpClient.go(EasyMock.anyObject(Request.class),
EasyMock.anyObject(InputStreamFullResponseHandler.class)))
+ .andReturn(Futures.immediateFuture(response(status, "error")));
+ EasyMock.replay(httpClient);
+
+ Assert.assertEquals(
+ Optional.absent(),
+ TaskRunnerUtils.streamTaskReportsFromTaskLocation(httpClient,
url("http://example.com/liveReports"))
+ );
+ EasyMock.verify(httpClient);
+ }
+
+ private static URL url(final String value) throws IOException
+ {
+ return URI.create(value).toURL();
+ }
+
+ private static InputStreamFullResponseHolder response(
+ final HttpResponseStatus status,
+ final String content
+ )
+ {
+ final InputStreamFullResponseHolder response = new
InputStreamFullResponseHolder(
+ new DefaultHttpResponse(HttpVersion.HTTP_1_1, status)
+ );
+ response.addChunk(StringUtils.toUtf8(content));
+ response.done();
+ return response;
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 3a3dcc3a7a1..8e4d1d3aacd 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -26,6 +26,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.Futures;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.common.guava.DSuppliers;
import org.apache.druid.concurrent.LifecycleLock;
@@ -37,6 +39,7 @@ import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
@@ -58,18 +61,26 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler;
+import
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.ChangeRequestHttpSyncer;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.easymock.Capture;
import org.easymock.EasyMock;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -1185,6 +1196,18 @@ public class HttpRemoteTaskRunnerTest
Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(additionalWorkerCategory));
}
+ @Test(timeout = 60_000L)
+ public void testStreamTaskReportsFromWorker() throws Exception
+ {
+ assertStreamTaskReportsFromWorker(HttpResponseStatus.OK, Optional.of("my
report!"));
+ }
+
+ @Test(timeout = 60_000L)
+ public void testStreamTaskReportsUnavailableFromWorker() throws Exception
+ {
+ assertStreamTaskReportsFromWorker(HttpResponseStatus.SERVICE_UNAVAILABLE,
Optional.absent());
+ }
+
/*
* Task goes PENDING -> RUNNING -> SUCCESS and few more useless
notifications in between.
*/
@@ -1939,6 +1962,107 @@ public class HttpRemoteTaskRunnerTest
return workerHolder;
}
+ private static void assertStreamTaskReportsFromWorker(
+ final HttpResponseStatus responseStatus,
+ final Optional<String> expectedReport
+ ) throws Exception
+ {
+ final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
+ final Capture<Request> capturedRequest = Capture.newInstance();
+ EasyMock.expect(httpClient.go(
+ EasyMock.capture(capturedRequest),
+ EasyMock.anyObject(InputStreamFullResponseHandler.class))
+ ).andReturn(Futures.immediateFuture(taskReportResponse(responseStatus,
expectedReport.or("error"))));
+ EasyMock.replay(httpClient);
+
+ final TestDruidNodeDiscovery druidNodeDiscovery = new
TestDruidNodeDiscovery();
+ final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider =
EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+ .andReturn(druidNodeDiscovery)
+ .times(2);
+ EasyMock.replay(druidNodeDiscoveryProvider);
+
+ final Task task = new NoopTask("task id with spaces", null, null, 0, 0,
null);
+ final HttpRemoteTaskRunner taskRunner = newHttpTaskRunnerInstance(
+ druidNodeDiscoveryProvider,
+ new NoopProvisioningStrategy<>(),
+ httpClient,
+ ImmutableMap.of(
+ task,
+ ImmutableList.of(
+ TaskAnnouncement.create(
+ task,
+ TaskStatus.running(task.getId()),
+ TaskLocation.unknown()
+ ),
+ TaskAnnouncement.create(
+ task,
+ TaskStatus.running(task.getId()),
+ TaskLocation.create("host", 1234, -1)
+ )
+ )
+ )
+ );
+
+ try {
+ taskRunner.start();
+ druidNodeDiscovery.getListeners().get(0).nodesAdded(
+ ImmutableList.of(
+ new DiscoveryDruidNode(
+ new DruidNode("service", "host", false, 1234, null, true,
false),
+ NodeRole.MIDDLE_MANAGER,
+ ImmutableMap.of(
+ WorkerNodeService.DISCOVERY_SERVICE_KEY,
+ new WorkerNodeService("ip1", 1, "0",
WorkerConfig.DEFAULT_CATEGORY)
+ )
+ )
+ )
+ );
+ taskRunner.run(task);
+
+ Assert.assertTrue(
+ TestUtils.conditionValid(
+ () ->
+ !taskRunner.getRunningTasks().isEmpty()
+ && !Iterables.getOnlyElement(taskRunner.getRunningTasks())
+ .getLocation()
+ .equals(TaskLocation.unknown())
+ )
+ );
+
+ final Optional<InputStream> stream =
taskRunner.streamTaskReports(task.getId());
+ if (expectedReport.isPresent()) {
+ Assert.assertTrue(stream.isPresent());
+ Assert.assertEquals(expectedReport.get(),
StringUtils.fromUtf8(ByteStreams.toByteArray(stream.get())));
+ } else {
+ Assert.assertEquals(Optional.absent(), stream);
+ }
+
+ Assert.assertEquals(
+
"http://host:1234/druid/worker/v1/chat/task%20id%20with%20spaces/liveReports",
+ capturedRequest.getValue().getUrl().toString()
+ );
+ }
+ finally {
+ taskRunner.stop();
+ }
+
+ EasyMock.verify(druidNodeDiscoveryProvider, httpClient);
+ }
+
+ private static InputStreamFullResponseHolder taskReportResponse(
+ final HttpResponseStatus status,
+ final String content
+ )
+ {
+ final InputStreamFullResponseHolder response = new
InputStreamFullResponseHolder(
+ new DefaultHttpResponse(HttpVersion.HTTP_1_1, status)
+ );
+ response.addChunk(StringUtils.toUtf8(content));
+ response.done();
+ return response;
+ }
+
private static WorkerHolder createWorkerHolder(
ObjectMapper smileMapper,
HttpClient httpClient,
@@ -2171,6 +2295,20 @@ public class HttpRemoteTaskRunnerTest
private static HttpRemoteTaskRunner newHttpTaskRunnerInstance(
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
ProvisioningStrategy provisioningStrategy)
+ {
+ return newHttpTaskRunnerInstance(
+ druidNodeDiscoveryProvider,
+ provisioningStrategy,
+ EasyMock.createNiceMock(HttpClient.class),
+ ImmutableMap.of()
+ );
+ }
+
+ private static HttpRemoteTaskRunner newHttpTaskRunnerInstance(
+ DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
+ ProvisioningStrategy provisioningStrategy,
+ HttpClient httpClient,
+ Map<Task, List<TaskAnnouncement>> toBeAssignedTasks)
{
return new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
@@ -2182,7 +2320,7 @@ public class HttpRemoteTaskRunnerTest
return 3;
}
},
- EasyMock.createNiceMock(HttpClient.class),
+ httpClient,
DSuppliers.of(new
AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
provisioningStrategy,
druidNodeDiscoveryProvider,
@@ -2212,7 +2350,7 @@ public class HttpRemoteTaskRunnerTest
worker,
ImmutableList.of(),
ImmutableList.of(),
- ImmutableMap.of(),
+ toBeAssignedTasks,
new AtomicInteger(),
ImmutableSet.of()
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]