This is an automated email from the ASF dual-hosted git repository.

gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e5e31b7d962 fix: chat handler response parsing (#19495)
e5e31b7d962 is described below

commit e5e31b7d96282f7f448497fd9b164aa5ac644934
Author: jtuglu1 <[email protected]>
AuthorDate: Thu May 21 21:52:11 2026 -0700

    fix: chat handler response parsing (#19495)
    
    Overlord task report requests can sometimes be too eager during
    ingestion and ping a task before the HTTP server servicing the chat
    requests has spun up. This causes 4xx/5xx responses to be returned,
    which are not correctly parsed by the chat client. While this doesn't
    explicitly fail the ingestion, it spams the logs and causes confusion.
---
 .../druid/k8s/overlord/KubernetesTaskRunner.java   |  19 +--
 .../k8s/overlord/KubernetesTaskRunnerTest.java     |  76 ++++++++---
 .../druid/indexing/overlord/RemoteTaskRunner.java  |  15 +--
 .../druid/indexing/overlord/TaskRunnerUtils.java   |  43 +++++++
 .../overlord/hrtr/HttpRemoteTaskRunner.java        |  15 +--
 .../indexing/overlord/RemoteTaskRunnerTest.java    |  57 ++++++++-
 .../indexing/overlord/TaskRunnerUtilsTest.java     | 100 +++++++++++++++
 .../overlord/hrtr/HttpRemoteTaskRunnerTest.java    | 142 ++++++++++++++++++++-
 8 files changed, 401 insertions(+), 66 deletions(-)

diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 947815a6782..fa7148a6151 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -21,7 +21,6 @@ package org.apache.druid.k8s.overlord;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -53,15 +52,12 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.http.client.HttpClient;
-import org.apache.druid.java.util.http.client.Request;
-import 
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
 import org.apache.druid.k8s.overlord.common.K8sTaskId;
 import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
 import 
org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
 import 
org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
 import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
 import org.apache.druid.tasklogs.TaskLogStreamer;
-import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 
@@ -389,20 +385,7 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
         taskid
     );
 
-    try {
-      return Optional.of(httpClient.go(
-          new Request(HttpMethod.GET, url),
-          new InputStreamResponseHandler()
-      ).get());
-    }
-    catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-    catch (ExecutionException e) {
-      // Unwrap if possible
-      Throwables.propagateIfPossible(e.getCause(), IOException.class);
-      throw new RuntimeException(e);
-    }
+    return TaskRunnerUtils.streamTaskReportsFromTaskLocation(httpClient, url);
   }
 
   @Override
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 426feb33aed..edf77961b8c 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -38,7 +38,8 @@ import 
org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
-import 
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
+import 
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler;
+import 
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
 import org.apache.druid.k8s.overlord.common.K8sTestUtils;
 import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
 import 
org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig;
@@ -50,6 +51,9 @@ import org.easymock.EasyMockSupport;
 import org.easymock.Mock;
 import org.hamcrest.MatcherAssert;
 import org.hamcrest.Matchers;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -586,7 +590,7 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
   @Test
   public void test_streamTaskReports_withExistingTask() throws Exception
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, 
kubernetesPeonLifecycle) {
+    final KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, 
kubernetesPeonLifecycle) {
       @Override
       public TaskLocation getLocation()
       {
@@ -598,12 +602,12 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
 
     EasyMock.expect(httpClient.go(
         EasyMock.anyObject(Request.class),
-        EasyMock.anyObject(InputStreamResponseHandler.class))
-    ).andReturn(Futures.immediateFuture(IOUtils.toInputStream("{}", 
StandardCharsets.UTF_8)));
+        EasyMock.anyObject(InputStreamFullResponseHandler.class))
+    
).andReturn(Futures.immediateFuture(taskReportResponse(HttpResponseStatus.OK, 
"{}")));
 
     replayAll();
 
-    Optional<InputStream> maybeInputStream = 
runner.streamTaskReports(task.getId());
+    final Optional<InputStream> maybeInputStream = 
runner.streamTaskReports(task.getId());
 
     verifyAll();
 
@@ -611,17 +615,44 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     Assertions.assertEquals("{}", IOUtils.toString(maybeInputStream.get(), 
StandardCharsets.UTF_8));
   }
 
+  @Test
+  public void 
test_streamTaskReports_whenTaskReportsUnavailable_returnsEmptyOptional() throws 
Exception
+  {
+    final KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, 
kubernetesPeonLifecycle) {
+      @Override
+      public TaskLocation getLocation()
+      {
+        return TaskLocation.create("host", 0, 1, false);
+      }
+    };
+
+    runner.tasks.put(task.getId(), workItem);
+
+    EasyMock.expect(httpClient.go(
+        EasyMock.anyObject(Request.class),
+        EasyMock.anyObject(InputStreamFullResponseHandler.class))
+    
).andReturn(Futures.immediateFuture(taskReportResponse(HttpResponseStatus.SERVICE_UNAVAILABLE,
 "error")));
+
+    replayAll();
+
+    final Optional<InputStream> maybeInputStream = 
runner.streamTaskReports(task.getId());
+
+    verifyAll();
+
+    Assertions.assertFalse(maybeInputStream.isPresent());
+  }
+
   @Test
   public void 
test_streamTaskReports_withoutExistingTask_returnsEmptyOptional() throws 
Exception
   {
-    Optional<InputStream> maybeInputStream = 
runner.streamTaskReports(task.getId());
+    final Optional<InputStream> maybeInputStream = 
runner.streamTaskReports(task.getId());
     Assertions.assertFalse(maybeInputStream.isPresent());
   }
 
   @Test
   public void 
test_streamTaskReports_withUnknownTaskLocation_returnsEmptyOptional() throws 
Exception
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, 
kubernetesPeonLifecycle) {
+    final KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, 
kubernetesPeonLifecycle) {
       @Override
       public TaskLocation getLocation()
       {
@@ -631,14 +662,14 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
 
     runner.tasks.put(task.getId(), workItem);
 
-    Optional<InputStream> maybeInputStream = 
runner.streamTaskReports(task.getId());
+    final Optional<InputStream> maybeInputStream = 
runner.streamTaskReports(task.getId());
     Assertions.assertFalse(maybeInputStream.isPresent());
   }
 
   @Test
   public void 
test_streamTaskReports_whenInterruptedExceptionThrown_throwsRuntimeException()
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, 
kubernetesPeonLifecycle) {
+    final KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, 
kubernetesPeonLifecycle) {
       @Override
       public TaskLocation getLocation()
       {
@@ -648,7 +679,7 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
 
     runner.tasks.put(task.getId(), workItem);
 
-    ListenableFuture<InputStream> future = new ListenableFuture<>()
+    final ListenableFuture<InputStreamFullResponseHolder> future = new 
ListenableFuture<>()
     {
       @Override
       public void addListener(Runnable runnable, Executor executor)
@@ -674,13 +705,13 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
       }
 
       @Override
-      public InputStream get() throws InterruptedException
+      public InputStreamFullResponseHolder get() throws InterruptedException
       {
         throw new InterruptedException();
       }
 
       @Override
-      public InputStream get(long timeout, TimeUnit unit) throws 
InterruptedException
+      public InputStreamFullResponseHolder get(long timeout, TimeUnit unit) 
throws InterruptedException
       {
         throw new InterruptedException();
       }
@@ -688,12 +719,12 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
 
     EasyMock.expect(httpClient.go(
         EasyMock.anyObject(Request.class),
-        EasyMock.anyObject(InputStreamResponseHandler.class))
+        EasyMock.anyObject(InputStreamFullResponseHandler.class))
     ).andReturn(future);
 
     replayAll();
 
-    Exception e = Assertions.assertThrows(RuntimeException.class, () -> 
runner.streamTaskReports(task.getId()));
+    final Exception e = Assertions.assertThrows(RuntimeException.class, () -> 
runner.streamTaskReports(task.getId()));
     Assertions.assertTrue(e.getCause() instanceof InterruptedException);
 
     verifyAll();
@@ -702,7 +733,7 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
   @Test
   public void 
test_streamTaskReports_whenExecutionExceptionThrown_throwsRuntimeException()
   {
-    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, 
kubernetesPeonLifecycle) {
+    final KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, 
kubernetesPeonLifecycle) {
       @Override
       public TaskLocation getLocation()
       {
@@ -714,7 +745,7 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
 
     EasyMock.expect(httpClient.go(
         EasyMock.anyObject(Request.class),
-        EasyMock.anyObject(InputStreamResponseHandler.class))
+        EasyMock.anyObject(InputStreamFullResponseHandler.class))
     ).andReturn(Futures.immediateFailedFuture(new Exception()));
 
     replayAll();
@@ -886,4 +917,17 @@ public class KubernetesTaskRunnerTest extends 
EasyMockSupport
     Assertions.assertEquals(1, executor.getMaximumPoolSize());
     Assertions.assertEquals(1, runner.getTotalCapacity());
   }
+
+  private static InputStreamFullResponseHolder taskReportResponse(
+      final HttpResponseStatus status,
+      final String content
+  )
+  {
+    final InputStreamFullResponseHolder response = new 
InputStreamFullResponseHolder(
+        new DefaultHttpResponse(HttpVersion.HTTP_1_1, status)
+    );
+    response.addChunk(content.getBytes(StandardCharsets.UTF_8));
+    response.done();
+    return response;
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 4a4f0a08464..4018701d447 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -701,20 +701,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, 
TaskLogStreamer
         taskId
     );
 
-    try {
-      return Optional.of(httpClient.go(
-          new Request(HttpMethod.GET, url),
-          new InputStreamResponseHandler()
-      ).get());
-    }
-    catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-    catch (ExecutionException e) {
-      // Unwrap if possible
-      Throwables.propagateIfPossible(e.getCause(), IOException.class);
-      throw new RuntimeException(e);
-    }
+    return TaskRunnerUtils.streamTaskReportsFromTaskLocation(httpClient, url);
   }
 
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java
index e53ac175769..46d1b351217 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerUtils.java
@@ -19,19 +19,30 @@
 
 package org.apache.druid.indexing.overlord;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import 
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler;
+import 
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 
 public class TaskRunnerUtils
@@ -111,4 +122,36 @@ public class TaskRunnerUtils
       throw new RuntimeException(e);
     }
   }
+
+  public static Optional<InputStream> streamTaskReportsFromTaskLocation(
+      final HttpClient httpClient,
+      final URL url
+  ) throws IOException
+  {
+    try {
+      final InputStreamFullResponseHolder response = httpClient.go(
+          new Request(HttpMethod.GET, url),
+          new InputStreamFullResponseHandler()
+      ).get();
+      final HttpResponseStatus responseStatus = 
response.getResponse().getStatus();
+
+      if (HttpResponseStatus.OK.equals(responseStatus)) {
+        return Optional.of(response.getContent());
+      } else if (HttpResponseStatus.NOT_FOUND.equals(responseStatus)
+                 || 
HttpResponseStatus.SERVICE_UNAVAILABLE.equals(responseStatus)) {
+        return Optional.absent();
+      } else {
+        throw new IOException(
+            StringUtils.format("Failed to stream task reports from [%s]. 
Response status [%s].", url, responseStatus)
+        );
+      }
+    }
+    catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+    catch (ExecutionException e) {
+      Throwables.propagateIfPossible(e.getCause(), IOException.class);
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 62f6b499cb3..53bd0dbbf94 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -1074,20 +1074,7 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer,
           taskId
       );
 
-      try {
-        return Optional.of(httpClient.go(
-            new Request(HttpMethod.GET, url),
-            new InputStreamResponseHandler()
-        ).get());
-      }
-      catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-      catch (ExecutionException e) {
-        // Unwrap if possible
-        Throwables.propagateIfPossible(e.getCause(), IOException.class);
-        throw new RuntimeException(e);
-      }
+      return TaskRunnerUtils.streamTaskReportsFromTaskLocation(httpClient, 
url);
     }
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index d0c2b29f7e6..2a94d4f21ed 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -59,11 +59,15 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
+import 
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.testing.DeadlockDetectingTimeout;
 import org.apache.zookeeper.Watcher;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
 import org.joda.time.Period;
 import org.junit.After;
 import org.junit.Assert;
@@ -75,7 +79,6 @@ import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 import org.mockito.Mockito;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -1167,7 +1170,7 @@ public class RemoteTaskRunnerTest
     doSetup();
     final Capture<Request> capturedRequest = Capture.newInstance();
     final String reportString = "my report!";
-    final ByteArrayInputStream reportResponse = new 
ByteArrayInputStream(StringUtils.toUtf8(reportString));
+    final InputStreamFullResponseHolder reportResponse = 
taskReportResponse(HttpResponseStatus.OK, reportString);
     EasyMock.expect(httpClient.go(EasyMock.capture(capturedRequest), 
EasyMock.anyObject()))
             .andReturn(Futures.immediateFuture(reportResponse));
     EasyMock.replay(httpClient);
@@ -1206,6 +1209,43 @@ public class RemoteTaskRunnerTest
     );
   }
 
+  @Test
+  public void testStreamTaskReportsUnavailableFromWorker() throws Exception
+  {
+    doSetup();
+    final Capture<Request> capturedRequest = Capture.newInstance();
+    final InputStreamFullResponseHolder reportResponse = taskReportResponse(
+        HttpResponseStatus.SERVICE_UNAVAILABLE,
+        "{\"error\":\"Can't find chatHandler for handler[task]\"}"
+    );
+    EasyMock.expect(httpClient.go(EasyMock.capture(capturedRequest), 
EasyMock.anyObject()))
+            .andReturn(Futures.immediateFuture(reportResponse));
+    EasyMock.replay(httpClient);
+
+    remoteTaskRunner.run(task);
+    Assert.assertTrue(taskAnnounced(task.getId()));
+    mockWorkerRunningTask(task);
+
+    // Wait for the task to have a known location.
+    Assert.assertTrue(
+        TestUtils.conditionValid(
+            () ->
+                !remoteTaskRunner.getRunningTasks().isEmpty()
+                && 
!Iterables.getOnlyElement(remoteTaskRunner.getRunningTasks())
+                             .getLocation()
+                             .equals(TaskLocation.unknown())
+        )
+    );
+
+    Assert.assertEquals(Optional.absent(), 
remoteTaskRunner.streamTaskReports(task.getId()));
+
+    EasyMock.verify(httpClient);
+    Assert.assertEquals(
+        "http://dummy:9000/druid/worker/v1/chat/task%20id%20with%20spaces/liveReports",
+        capturedRequest.getValue().getUrl().toString()
+    );
+  }
+
   @Test
   public void testBuildPublishAction()
   {
@@ -1247,4 +1287,17 @@ public class RemoteTaskRunnerTest
         ).getClass()
     );
   }
+
+  private static InputStreamFullResponseHolder taskReportResponse(
+      final HttpResponseStatus status,
+      final String content
+  )
+  {
+    final InputStreamFullResponseHolder response = new 
InputStreamFullResponseHolder(
+        new DefaultHttpResponse(HttpVersion.HTTP_1_1, status)
+    );
+    response.addChunk(StringUtils.toUtf8(content));
+    response.done();
+    return response;
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
index 820b2e893cf..0bc4176facd 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java
@@ -19,12 +19,27 @@
 
 package org.apache.druid.indexing.overlord;
 
+import com.google.common.base.Optional;
+import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.Futures;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import 
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler;
+import 
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
+import org.easymock.EasyMock;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
 import java.net.URL;
 
 public class TaskRunnerUtilsTest
@@ -52,4 +67,89 @@ public class TaskRunnerUtilsTest
     );
     
Assert.assertEquals("https://1.2.3.4:8290/druid/worker/v1/task/foo%20bar%26/log",
 url.toString());
   }
+
+  @Test
+  public void testStreamTaskReportsFromTaskLocationOk() throws Exception
+  {
+    final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
+    final String report = "my report";
+    EasyMock.expect(httpClient.go(EasyMock.anyObject(Request.class), 
EasyMock.anyObject(InputStreamFullResponseHandler.class)))
+            .andReturn(Futures.immediateFuture(response(HttpResponseStatus.OK, 
report)));
+    EasyMock.replay(httpClient);
+
+    final Optional<InputStream> stream = 
TaskRunnerUtils.streamTaskReportsFromTaskLocation(
+        httpClient,
+        url("http://example.com/liveReports")
+    );
+
+    Assert.assertTrue(stream.isPresent());
+    Assert.assertEquals(report, 
StringUtils.fromUtf8(ByteStreams.toByteArray(stream.get())));
+    EasyMock.verify(httpClient);
+  }
+
+  @Test
+  public void testStreamTaskReportsFromTaskLocationNotFound() throws Exception
+  {
+    
assertStreamTaskReportsFromTaskLocationUnavailable(HttpResponseStatus.NOT_FOUND);
+  }
+
+  @Test
+  public void testStreamTaskReportsFromTaskLocationServiceUnavailable() throws 
Exception
+  {
+    
assertStreamTaskReportsFromTaskLocationUnavailable(HttpResponseStatus.SERVICE_UNAVAILABLE);
+  }
+
+  @Test
+  public void testStreamTaskReportsFromTaskLocationUnexpectedStatus() throws 
Exception
+  {
+    final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
+    EasyMock.expect(httpClient.go(EasyMock.anyObject(Request.class), 
EasyMock.anyObject(InputStreamFullResponseHandler.class)))
+            
.andReturn(Futures.immediateFuture(response(HttpResponseStatus.INTERNAL_SERVER_ERROR,
 "error")));
+    EasyMock.replay(httpClient);
+
+    final IOException e = Assert.assertThrows(
+        IOException.class,
+        () -> TaskRunnerUtils.streamTaskReportsFromTaskLocation(
+            httpClient,
+            url("http://example.com/liveReports")
+        )
+    );
+
+    Assert.assertTrue(e.getMessage().contains("500 Internal Server Error"));
+    EasyMock.verify(httpClient);
+  }
+
+  private static void assertStreamTaskReportsFromTaskLocationUnavailable(
+      final HttpResponseStatus status
+  ) throws Exception
+  {
+    final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
+    EasyMock.expect(httpClient.go(EasyMock.anyObject(Request.class), 
EasyMock.anyObject(InputStreamFullResponseHandler.class)))
+            .andReturn(Futures.immediateFuture(response(status, "error")));
+    EasyMock.replay(httpClient);
+
+    Assert.assertEquals(
+        Optional.absent(),
+        TaskRunnerUtils.streamTaskReportsFromTaskLocation(httpClient, 
url("http://example.com/liveReports"))
+    );
+    EasyMock.verify(httpClient);
+  }
+
+  private static URL url(final String value) throws IOException
+  {
+    return URI.create(value).toURL();
+  }
+
+  private static InputStreamFullResponseHolder response(
+      final HttpResponseStatus status,
+      final String content
+  )
+  {
+    final InputStreamFullResponseHolder response = new 
InputStreamFullResponseHolder(
+        new DefaultHttpResponse(HttpVersion.HTTP_1_1, status)
+    );
+    response.addChunk(StringUtils.toUtf8(content));
+    response.done();
+    return response;
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 3a3dcc3a7a1..8e4d1d3aacd 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -26,6 +26,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.Futures;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.druid.common.guava.DSuppliers;
 import org.apache.druid.concurrent.LifecycleLock;
@@ -37,6 +39,7 @@ import org.apache.druid.discovery.WorkerNodeService;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.TaskRunnerListener;
@@ -58,18 +61,26 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import 
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHandler;
+import 
org.apache.druid.java.util.http.client.response.InputStreamFullResponseHolder;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.ChangeRequestHttpSyncer;
 import org.apache.druid.server.initialization.IndexerZkConfig;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
 import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -1185,6 +1196,18 @@ public class HttpRemoteTaskRunnerTest
     
Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(additionalWorkerCategory));
   }
 
+  @Test(timeout = 60_000L)
+  public void testStreamTaskReportsFromWorker() throws Exception
+  {
+    assertStreamTaskReportsFromWorker(HttpResponseStatus.OK, Optional.of("my 
report!"));
+  }
+
+  @Test(timeout = 60_000L)
+  public void testStreamTaskReportsUnavailableFromWorker() throws Exception
+  {
+    assertStreamTaskReportsFromWorker(HttpResponseStatus.SERVICE_UNAVAILABLE, 
Optional.absent());
+  }
+
   /*
    * Task goes PENDING -> RUNNING -> SUCCESS and few more useless 
notifications in between.
    */
@@ -1939,6 +1962,107 @@ public class HttpRemoteTaskRunnerTest
     return workerHolder;
   }
 
+  private static void assertStreamTaskReportsFromWorker(
+      final HttpResponseStatus responseStatus,
+      final Optional<String> expectedReport
+  ) throws Exception
+  {
+    final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
+    final Capture<Request> capturedRequest = Capture.newInstance();
+    EasyMock.expect(httpClient.go(
+        EasyMock.capture(capturedRequest),
+        EasyMock.anyObject(InputStreamFullResponseHandler.class))
+    ).andReturn(Futures.immediateFuture(taskReportResponse(responseStatus, 
expectedReport.or("error"))));
+    EasyMock.replay(httpClient);
+
+    final TestDruidNodeDiscovery druidNodeDiscovery = new 
TestDruidNodeDiscovery();
+    final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = 
EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery)
+            .times(2);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    final Task task = new NoopTask("task id with spaces", null, null, 0, 0, 
null);
+    final HttpRemoteTaskRunner taskRunner = newHttpTaskRunnerInstance(
+        druidNodeDiscoveryProvider,
+        new NoopProvisioningStrategy<>(),
+        httpClient,
+        ImmutableMap.of(
+            task,
+            ImmutableList.of(
+                TaskAnnouncement.create(
+                    task,
+                    TaskStatus.running(task.getId()),
+                    TaskLocation.unknown()
+                ),
+                TaskAnnouncement.create(
+                    task,
+                    TaskStatus.running(task.getId()),
+                    TaskLocation.create("host", 1234, -1)
+                )
+            )
+        )
+    );
+
+    try {
+      taskRunner.start();
+      druidNodeDiscovery.getListeners().get(0).nodesAdded(
+          ImmutableList.of(
+              new DiscoveryDruidNode(
+                  new DruidNode("service", "host", false, 1234, null, true, 
false),
+                  NodeRole.MIDDLE_MANAGER,
+                  ImmutableMap.of(
+                      WorkerNodeService.DISCOVERY_SERVICE_KEY,
+                      new WorkerNodeService("ip1", 1, "0", 
WorkerConfig.DEFAULT_CATEGORY)
+                  )
+              )
+          )
+      );
+      taskRunner.run(task);
+
+      Assert.assertTrue(
+          TestUtils.conditionValid(
+              () ->
+                  !taskRunner.getRunningTasks().isEmpty()
+                  && !Iterables.getOnlyElement(taskRunner.getRunningTasks())
+                               .getLocation()
+                               .equals(TaskLocation.unknown())
+          )
+      );
+
+      final Optional<InputStream> stream = 
taskRunner.streamTaskReports(task.getId());
+      if (expectedReport.isPresent()) {
+        Assert.assertTrue(stream.isPresent());
+        Assert.assertEquals(expectedReport.get(), 
StringUtils.fromUtf8(ByteStreams.toByteArray(stream.get())));
+      } else {
+        Assert.assertEquals(Optional.absent(), stream);
+      }
+
+      Assert.assertEquals(
+          "http://host:1234/druid/worker/v1/chat/task%20id%20with%20spaces/liveReports",
+          capturedRequest.getValue().getUrl().toString()
+      );
+    }
+    finally {
+      taskRunner.stop();
+    }
+
+    EasyMock.verify(druidNodeDiscoveryProvider, httpClient);
+  }
+
+  private static InputStreamFullResponseHolder taskReportResponse(
+      final HttpResponseStatus status,
+      final String content
+  )
+  {
+    final InputStreamFullResponseHolder response = new 
InputStreamFullResponseHolder(
+        new DefaultHttpResponse(HttpVersion.HTTP_1_1, status)
+    );
+    response.addChunk(StringUtils.toUtf8(content));
+    response.done();
+    return response;
+  }
+
   private static WorkerHolder createWorkerHolder(
       ObjectMapper smileMapper,
       HttpClient httpClient,
@@ -2171,6 +2295,20 @@ public class HttpRemoteTaskRunnerTest
   private static HttpRemoteTaskRunner newHttpTaskRunnerInstance(
       DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
       ProvisioningStrategy provisioningStrategy)
+  {
+    return newHttpTaskRunnerInstance(
+        druidNodeDiscoveryProvider,
+        provisioningStrategy,
+        EasyMock.createNiceMock(HttpClient.class),
+        ImmutableMap.of()
+    );
+  }
+
+  private static HttpRemoteTaskRunner newHttpTaskRunnerInstance(
+      DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
+      ProvisioningStrategy provisioningStrategy,
+      HttpClient httpClient,
+      Map<Task, List<TaskAnnouncement>> toBeAssignedTasks)
   {
     return new HttpRemoteTaskRunner(
         TestHelper.makeJsonMapper(),
@@ -2182,7 +2320,7 @@ public class HttpRemoteTaskRunnerTest
             return 3;
           }
         },
-        EasyMock.createNiceMock(HttpClient.class),
+        httpClient,
         DSuppliers.of(new 
AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
         provisioningStrategy,
         druidNodeDiscoveryProvider,
@@ -2212,7 +2350,7 @@ public class HttpRemoteTaskRunnerTest
             worker,
             ImmutableList.of(),
             ImmutableList.of(),
-            ImmutableMap.of(),
+            toBeAssignedTasks,
             new AtomicInteger(),
             ImmutableSet.of()
         );


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to