This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d4cc501f7 return task status reported by peon (#14040)
9d4cc501f7 is described below

commit 9d4cc501f73a03543e19081c332331fccac67562
Author: Nicholas Lippis <[email protected]>
AuthorDate: Mon Apr 24 15:05:39 2023 -0400

    return task status reported by peon (#14040)
    
    * return task status reported by peon
    
    * Write TaskStatus to file in AbstractTask.cleanUp
    
    * Get TaskStatus from task log
    
    * Fix merge conflicts in AbstractTaskTest
    
    * Add unit tests for TaskLogPusher, TaskLogStreamer, NoopTaskLogs to 
satisfy code coverage
    
    * Add license headerss
    
    * Fix style
    
    * Remove unknown exception declarations
---
 .../druid/k8s/overlord/K8sOverlordModule.java      |   2 +
 .../druid/k8s/overlord/KubernetesTaskRunner.java   |  62 +++---
 .../k8s/overlord/KubernetesTaskRunnerFactory.java  |  11 +-
 .../overlord/KubernetesTaskRunnerFactoryTest.java  |  22 +-
 .../k8s/overlord/KubernetesTaskRunnerTest.java     | 227 ++++++++++++++++++---
 .../apache/druid/storage/azure/AzureTaskLogs.java  |  19 ++
 .../druid/storage/azure/AzureTaskLogsTest.java     |  95 +++++++++
 .../druid/storage/google/GoogleTaskLogs.java       |  20 ++
 .../druid/storage/google/GoogleTaskLogsTest.java   |  50 +++++
 .../druid/storage/hdfs/tasklog/HdfsTaskLogs.java   |  25 +++
 .../indexing/common/tasklogs/HdfsTaskLogsTest.java |  17 ++
 .../org/apache/druid/storage/s3/S3TaskLogs.java    |  15 ++
 .../apache/druid/storage/s3/S3TaskLogsTest.java    |  52 +++++
 .../druid/indexing/common/task/AbstractTask.java   |  77 ++++---
 .../indexing/common/tasklogs/FileTaskLogs.java     |  20 ++
 .../indexing/common/task/AbstractTaskTest.java     |  18 +-
 .../task/batch/parallel/TaskMonitorTest.java       |   2 +-
 .../indexing/common/tasklogs/FileTaskLogsTest.java |  23 +++
 .../overlord/SingleTaskBackgroundRunnerTest.java   |  14 +-
 .../druid/indexing/overlord/TaskQueueTest.java     |  12 +-
 .../org/apache/druid/tasklogs/NoopTaskLogs.java    |   6 +
 .../org/apache/druid/tasklogs/TaskLogPusher.java   |   4 +
 .../org/apache/druid/tasklogs/TaskLogStreamer.java |   5 +
 .../apache/druid/tasklogs/NoopTaskLogsTest.java}   |  17 +-
 .../apache/druid/tasklogs/TaskLogPusherTest.java}  |  25 ++-
 .../druid/tasklogs/TaskLogStreamerTest.java}       |  29 +--
 26 files changed, 720 insertions(+), 149 deletions(-)

diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/K8sOverlordModule.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/K8sOverlordModule.java
index c176330b15..b68b4d07a2 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/K8sOverlordModule.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/K8sOverlordModule.java
@@ -37,6 +37,7 @@ import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.tasklogs.NoopTaskLogs;
 import org.apache.druid.tasklogs.TaskLogKiller;
 import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
 import org.apache.druid.tasklogs.TaskLogs;
 
 @LoadScope(roles = NodeRole.OVERLORD_JSON_NAME)
@@ -78,6 +79,7 @@ public class K8sOverlordModule implements DruidModule
     binder.bind(FileTaskLogs.class).in(LazySingleton.class);
 
     binder.bind(TaskLogPusher.class).to(TaskLogs.class);
+    binder.bind(TaskLogStreamer.class).to(TaskLogs.class);
     binder.bind(TaskLogKiller.class).to(TaskLogs.class);
   }
 
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 2569b40b2a..a360dc09f0 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.k8s.overlord;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -31,6 +32,7 @@ import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.netty.util.SuppressForbidden;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
@@ -44,6 +46,7 @@ import 
org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.HttpClient;
@@ -57,8 +60,8 @@ import 
org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
 import 
org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
 import org.apache.druid.k8s.overlord.common.PeonPhase;
 import org.apache.druid.k8s.overlord.common.TaskAdapter;
-import org.apache.druid.tasklogs.TaskLogPusher;
 import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.apache.druid.tasklogs.TaskLogs;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.DateTime;
 
@@ -66,6 +69,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -73,6 +77,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
@@ -101,28 +106,31 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
 
   protected final ConcurrentHashMap<String, K8sWorkItem> tasks = new 
ConcurrentHashMap<>();
   protected final TaskAdapter adapter;
+  protected final KubernetesPeonClient client;
 
+  private final ObjectMapper mapper;
   private final KubernetesTaskRunnerConfig k8sConfig;
   private final TaskQueueConfig taskQueueConfig;
-  private final TaskLogPusher taskLogPusher;
+  private final TaskLogs taskLogs;
   private final ListeningExecutorService exec;
-  private final KubernetesPeonClient client;
   private final HttpClient httpClient;
 
 
   public KubernetesTaskRunner(
+      ObjectMapper mapper,
       TaskAdapter adapter,
       KubernetesTaskRunnerConfig k8sConfig,
       TaskQueueConfig taskQueueConfig,
-      TaskLogPusher taskLogPusher,
+      TaskLogs taskLogs,
       KubernetesPeonClient client,
       HttpClient httpClient
   )
   {
+    this.mapper = mapper;
     this.adapter = adapter;
     this.k8sConfig = k8sConfig;
     this.taskQueueConfig = taskQueueConfig;
-    this.taskLogPusher = taskLogPusher;
+    this.taskLogs = taskLogs;
     this.client = client;
     this.httpClient = httpClient;
     this.cleanupExecutor = Executors.newScheduledThreadPool(1);
@@ -178,20 +186,7 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
                       completedPhase = monitorJob(k8sTaskId);
                     }
                   }
-                  TaskStatus status;
-                  if (PeonPhase.SUCCEEDED.equals(completedPhase.getPhase())) {
-                    status = TaskStatus.success(task.getId());
-                  } else if (completedPhase.getJob() == null) {
-                    status = TaskStatus.failure(
-                        task.getId(),
-                        "K8s Job for task disappeared before completion: " + 
k8sTaskId
-                    );
-                  } else {
-                    status = TaskStatus.failure(
-                        task.getId(),
-                        "Task failed: " + k8sTaskId
-                    );
-                  }
+                  TaskStatus status = getTaskStatus(k8sTaskId, completedPhase);
                   if (completedPhase.getJobDuration().isPresent()) {
                     status = 
status.withDuration(completedPhase.getJobDuration().get());
                   }
@@ -210,7 +205,7 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
                     if (logStream.isPresent()) {
                       FileUtils.copyInputStreamToFile(logStream.get(), 
log.toFile());
                     }
-                    taskLogPusher.pushTaskLog(task.getId(), log.toFile());
+                    taskLogs.pushTaskLog(task.getId(), log.toFile());
                   }
                   finally {
                     Files.deleteIfExists(log);
@@ -243,10 +238,31 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
     );
   }
 
+  private TaskStatus getTaskStatus(K8sTaskId task, JobResponse jobResponse) 
throws IOException
+  {
+    Optional<InputStream> maybeTaskStatusStream = 
taskLogs.streamTaskStatus(task.getOriginalTaskId());
+    if (maybeTaskStatusStream.isPresent()) {
+      String taskStatus = IOUtils.toString(maybeTaskStatusStream.get(), 
StandardCharsets.UTF_8);
+      return mapper.readValue(taskStatus, TaskStatus.class);
+    } else if (PeonPhase.SUCCEEDED.equals(jobResponse.getPhase())) {
+      // fallback to behavior before the introduction of task status streaming 
for backwards compatibility
+      return TaskStatus.success(task.getOriginalTaskId());
+    } else if (Objects.isNull(jobResponse.getJob())) {
+      return TaskStatus.failure(
+          task.getOriginalTaskId(),
+          StringUtils.format("Task [%s] failed kubernetes job disappeared 
before completion", task.getOriginalTaskId())
+      );
+    } else {
+      return TaskStatus.failure(
+          task.getOriginalTaskId(),
+          StringUtils.format("Task [%s] failed", task.getOriginalTaskId())
+      );
+    }
+  }
+
   @Override
   public void updateStatus(Task task, TaskStatus status)
   {
-    log.info("Updating task: %s with status %s", task.getId(), status);
     TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
   }
 
@@ -508,8 +524,8 @@ public class KubernetesTaskRunner implements 
TaskLogStreamer, TaskRunner
         }
         boolean tlsEnabled = Boolean.parseBoolean(
             mainPod.getMetadata()
-                   .getAnnotations()
-                   .getOrDefault(DruidK8sConstants.TLS_ENABLED, "false"));
+                .getAnnotations()
+                .getOrDefault(DruidK8sConstants.TLS_ENABLED, "false"));
         return TaskLocation.create(
             mainPod.getStatus().getPodIP(),
             DruidK8sConstants.PORT,
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
index 4b9873499c..fdee5702c5 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
@@ -41,7 +41,7 @@ import 
org.apache.druid.k8s.overlord.common.SingleContainerTaskAdapter;
 import org.apache.druid.k8s.overlord.common.TaskAdapter;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
-import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogs;
 
 import java.util.Locale;
 import java.util.Properties;
@@ -54,7 +54,7 @@ public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<Kubernetes
   private final KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
   private final StartupLoggingConfig startupLoggingConfig;
   private final TaskQueueConfig taskQueueConfig;
-  private final TaskLogPusher taskLogPusher;
+  private final TaskLogs taskLogs;
   private final DruidNode druidNode;
   private final TaskConfig taskConfig;
   private final Properties properties;
@@ -68,7 +68,7 @@ public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<Kubernetes
       KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
       StartupLoggingConfig startupLoggingConfig,
       @JacksonInject TaskQueueConfig taskQueueConfig,
-      TaskLogPusher taskLogPusher,
+      TaskLogs taskLogs,
       @Self DruidNode druidNode,
       TaskConfig taskConfig,
       Properties properties
@@ -80,7 +80,7 @@ public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<Kubernetes
     this.kubernetesTaskRunnerConfig = kubernetesTaskRunnerConfig;
     this.startupLoggingConfig = startupLoggingConfig;
     this.taskQueueConfig = taskQueueConfig;
-    this.taskLogPusher = taskLogPusher;
+    this.taskLogs = taskLogs;
     this.druidNode = druidNode;
     this.taskConfig = taskConfig;
     this.properties = properties;
@@ -100,10 +100,11 @@ public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<Kubernetes
     }
 
     runner = new KubernetesTaskRunner(
+        smileMapper,
         buildTaskAdapter(client),
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         new DruidKubernetesPeonClient(client, 
kubernetesTaskRunnerConfig.namespace, kubernetesTaskRunnerConfig.debugJobs),
         httpClient
     );
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
index d00c766db8..8badaafd3a 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
@@ -31,7 +31,7 @@ import 
org.apache.druid.k8s.overlord.common.SingleContainerTaskAdapter;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
 import org.apache.druid.tasklogs.NoopTaskLogs;
-import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogs;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,7 +45,7 @@ public class KubernetesTaskRunnerFactoryTest
   private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
   private StartupLoggingConfig startupLoggingConfig;
   private TaskQueueConfig taskQueueConfig;
-  private TaskLogPusher taskLogPusher;
+  private TaskLogs taskLogs;
   private DruidNode druidNode;
   private TaskConfig taskConfig;
   private Properties properties;
@@ -62,7 +62,7 @@ public class KubernetesTaskRunnerFactoryTest
         null,
         null
     );
-    taskLogPusher = new NoopTaskLogs();
+    taskLogs = new NoopTaskLogs();
     druidNode = new DruidNode(
         "test",
         "",
@@ -85,7 +85,7 @@ public class KubernetesTaskRunnerFactoryTest
         kubernetesTaskRunnerConfig,
         startupLoggingConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         druidNode,
         taskConfig,
         properties
@@ -106,7 +106,7 @@ public class KubernetesTaskRunnerFactoryTest
         kubernetesTaskRunnerConfig,
         startupLoggingConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         druidNode,
         taskConfig,
         properties
@@ -129,7 +129,7 @@ public class KubernetesTaskRunnerFactoryTest
         kubernetesTaskRunnerConfig,
         startupLoggingConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         druidNode,
         taskConfig,
         properties
@@ -153,7 +153,7 @@ public class KubernetesTaskRunnerFactoryTest
         kubernetesTaskRunnerConfig,
         startupLoggingConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         druidNode,
         taskConfig,
         props
@@ -179,7 +179,7 @@ public class KubernetesTaskRunnerFactoryTest
         kubernetesTaskRunnerConfig,
         startupLoggingConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         druidNode,
         taskConfig,
         props
@@ -206,7 +206,7 @@ public class KubernetesTaskRunnerFactoryTest
         kubernetesTaskRunnerConfig,
         startupLoggingConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         druidNode,
         taskConfig,
         props
@@ -230,7 +230,7 @@ public class KubernetesTaskRunnerFactoryTest
         kubernetesTaskRunnerConfig,
         startupLoggingConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         druidNode,
         taskConfig,
         props
@@ -257,7 +257,7 @@ public class KubernetesTaskRunnerFactoryTest
         kubernetesTaskRunnerConfig,
         startupLoggingConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         druidNode,
         taskConfig,
         props
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 6c651f0cbd..c31a2a42f8 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -49,6 +49,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import 
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
@@ -62,7 +63,7 @@ import 
org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
 import org.apache.druid.k8s.overlord.common.PeonPhase;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
-import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogs;
 import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Before;
@@ -96,7 +97,7 @@ public class KubernetesTaskRunnerTest
   private StartupLoggingConfig startupLoggingConfig;
   private ObjectMapper jsonMapper;
   private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig;
-  private TaskLogPusher taskLogPusher;
+  private TaskLogs taskLogs;
   private DruidNode druidNode;
 
   @Before
@@ -116,7 +117,7 @@ public class KubernetesTaskRunnerTest
     kubernetesTaskRunnerConfig.javaOptsArray = 
Collections.singletonList("-Xmx2g");
     taskQueueConfig = new TaskQueueConfig(1, Period.millis(1), 
Period.millis(1), Period.millis(1));
     startupLoggingConfig = new StartupLoggingConfig();
-    taskLogPusher = mock(TaskLogPusher.class);
+    taskLogs = mock(TaskLogs.class);
     druidNode = mock(DruidNode.class);
     when(druidNode.isEnableTlsPort()).thenReturn(false);
   }
@@ -156,22 +157,30 @@ public class KubernetesTaskRunnerTest
     when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
     when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
 
+    TaskStatus taskStatus = TaskStatus.success(task.getId());
+    
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.of(IOUtils.toInputStream(
+        jsonMapper.writeValueAsString(taskStatus),
+        StandardCharsets.UTF_8))
+    );
+
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+        jsonMapper,
         adapter,
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         peonClient,
         null
     );
     KubernetesTaskRunner spyRunner = spy(taskRunner);
 
     ListenableFuture<TaskStatus> future = spyRunner.run(task);
-    future.get();
+    TaskStatus actualTaskStatus = future.get();
+    Assert.assertEquals(taskStatus, actualTaskStatus);
     // we should never launch the job here, one exists
     verify(peonClient, never()).launchJobAndWaitForStart(isA(Job.class), 
anyLong(), isA(TimeUnit.class));
     verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId));
-    verify(spyRunner, times(1)).updateStatus(eq(task), 
eq(TaskStatus.success(task.getId())));
+    verify(spyRunner, times(1)).updateStatus(eq(task), eq(taskStatus));
   }
 
   @Test
@@ -208,22 +217,156 @@ public class KubernetesTaskRunnerTest
         job,
         PeonPhase.SUCCEEDED
     ));
+
+    TaskStatus taskStatus = TaskStatus.success(task.getId());
+    
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.of(IOUtils.toInputStream(
+        jsonMapper.writeValueAsString(taskStatus),
+        StandardCharsets.UTF_8))
+    );
+
     when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
     when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
 
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+        jsonMapper,
         adapter,
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         peonClient,
         null
     );
     KubernetesTaskRunner spyRunner = spy(taskRunner);
 
+    ListenableFuture<TaskStatus> future = spyRunner.run(task);
+    TaskStatus actualTaskStatus = future.get();
+    Assert.assertEquals(taskStatus, actualTaskStatus);
+    // we should never launch the job here, one exists
+    verify(peonClient, times(1)).launchJobAndWaitForStart(isA(Job.class), 
anyLong(), isA(TimeUnit.class));
+    verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId));
+    verify(spyRunner, times(1)).updateStatus(eq(task), eq(taskStatus));
+  }
+
+  @Test
+  public void 
test_run_withSuccessfulJobAndWithoutStatusFile_returnsSucessfulTask() throws 
Exception
+  {
+    Task task = makeTask();
+    K8sTaskId k8sTaskId = new K8sTaskId(task.getId());
+
+    Job job = mock(Job.class);
+    ObjectMeta jobMetadata = mock(ObjectMeta.class);
+    when(jobMetadata.getName()).thenReturn(k8sTaskId.getK8sTaskId());
+    JobStatus status = mock(JobStatus.class);
+    when(status.getActive()).thenReturn(1).thenReturn(null);
+    when(job.getStatus()).thenReturn(status);
+    when(job.getMetadata()).thenReturn(jobMetadata);
+
+    Pod peonPod = mock(Pod.class);
+    ObjectMeta metadata = mock(ObjectMeta.class);
+    when(metadata.getName()).thenReturn("peonPodName");
+    when(peonPod.getMetadata()).thenReturn(metadata);
+    PodStatus podStatus = mock(PodStatus.class);
+    when(podStatus.getPodIP()).thenReturn("SomeIP");
+    when(peonPod.getStatus()).thenReturn(podStatus);
+
+    K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
+    when(adapter.fromTask(eq(task))).thenReturn(job);
+
+    DruidKubernetesPeonClient peonClient = 
mock(DruidKubernetesPeonClient.class);
+
+    
when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.fromNullable(null));
+    when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), 
isA(TimeUnit.class))).thenReturn(peonPod);
+    when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
+    when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), 
isA(TimeUnit.class))).thenReturn(new JobResponse(
+        job,
+        PeonPhase.SUCCEEDED
+    ));
+
+    
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.absent());
+
+    when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
+    when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
+
+    KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+        jsonMapper,
+        adapter,
+        kubernetesTaskRunnerConfig,
+        taskQueueConfig,
+        taskLogs,
+        peonClient,
+        null
+    );
+    KubernetesTaskRunner spyRunner = spy(taskRunner);
+
+    ListenableFuture<TaskStatus> future = spyRunner.run(task);
+    TaskStatus actualTaskStatus = future.get();
+    Assert.assertTrue(actualTaskStatus.isSuccess());
+
+    // we should never launch the job here, one exists
+    verify(peonClient, times(1)).launchJobAndWaitForStart(isA(Job.class), 
anyLong(), isA(TimeUnit.class));
+    verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId));
+    verify(spyRunner, times(1)).updateStatus(eq(task), eq(actualTaskStatus));
+  }
+
+  @Test
+  public void test_run_withFailedJob_returnsFailedTask() throws Exception
+  {
+    Task task = makeTask();
+    K8sTaskId k8sTaskId = new K8sTaskId(task.getId());
+
+    Job job = mock(Job.class);
+    ObjectMeta jobMetadata = mock(ObjectMeta.class);
+    when(jobMetadata.getName()).thenReturn(k8sTaskId.getK8sTaskId());
+    JobStatus status = mock(JobStatus.class);
+    when(status.getActive()).thenReturn(1).thenReturn(null);
+    when(job.getStatus()).thenReturn(status);
+    when(job.getMetadata()).thenReturn(jobMetadata);
+
+    Pod peonPod = mock(Pod.class);
+    ObjectMeta metadata = mock(ObjectMeta.class);
+    when(metadata.getName()).thenReturn("peonPodName");
+    when(peonPod.getMetadata()).thenReturn(metadata);
+    PodStatus podStatus = mock(PodStatus.class);
+    when(podStatus.getPodIP()).thenReturn("SomeIP");
+    when(peonPod.getStatus()).thenReturn(podStatus);
+
+    K8sTaskAdapter adapter = mock(K8sTaskAdapter.class);
+    when(adapter.fromTask(eq(task))).thenReturn(job);
+
+    DruidKubernetesPeonClient peonClient = 
mock(DruidKubernetesPeonClient.class);
+
+    
when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.fromNullable(null));
+    when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), 
isA(TimeUnit.class))).thenReturn(peonPod);
+    when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
+    when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), 
isA(TimeUnit.class))).thenReturn(new JobResponse(
+        job,
+        PeonPhase.FAILED
+    ));
+
+    
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.absent());
+
+    when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
+    when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
+
+    KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+        jsonMapper,
+        adapter,
+        kubernetesTaskRunnerConfig,
+        taskQueueConfig,
+        taskLogs,
+        peonClient,
+        null
+    );
+    KubernetesTaskRunner spyRunner = spy(taskRunner);
 
     ListenableFuture<TaskStatus> future = spyRunner.run(task);
-    future.get();
+    TaskStatus actualTaskStatus = future.get();
+    Assert.assertTrue(actualTaskStatus.isFailure());
+    Assert.assertEquals(
+        StringUtils.format("Task [%s] failed", task.getId()),
+        actualTaskStatus.getErrorMsg()
+    );
+
     // we should never launch the job here, one exists
     verify(peonClient, times(1)).launchJobAndWaitForStart(isA(Job.class), 
anyLong(), isA(TimeUnit.class));
     verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId));
@@ -233,7 +376,7 @@ public class KubernetesTaskRunnerTest
         DruidK8sConstants.TLS_PORT,
         druidNode.isEnableTlsPort()
     );
-    verify(spyRunner, times(1)).updateStatus(eq(task), 
eq(TaskStatus.success(task.getId(), expectedTaskLocation)));
+    verify(spyRunner, times(1)).updateStatus(eq(task), eq(actualTaskStatus));
   }
 
   @Test
@@ -274,14 +417,22 @@ public class KubernetesTaskRunnerTest
         job,
         PeonPhase.SUCCEEDED
     ));
+
+    TaskStatus taskStatus = TaskStatus.success(task.getId());
+    
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.of(IOUtils.toInputStream(
+        jsonMapper.writeValueAsString(taskStatus),
+        StandardCharsets.UTF_8))
+    );
+
     when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
     when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
 
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+        jsonMapper,
         adapter,
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         peonClient,
         null
     );
@@ -337,14 +488,22 @@ public class KubernetesTaskRunnerTest
         job,
         PeonPhase.SUCCEEDED
     ));
+
+    TaskStatus taskStatus = TaskStatus.success(task.getId());
+    
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.of(IOUtils.toInputStream(
+        jsonMapper.writeValueAsString(taskStatus),
+        StandardCharsets.UTF_8))
+    );
+
     when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent());
     when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
 
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+        jsonMapper,
         adapter,
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         peonClient,
         null
     );
@@ -411,10 +570,11 @@ public class KubernetesTaskRunnerTest
     );
 
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
-        adapter,
+        jsonMapper,
+        null,
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         peonClient,
         httpClient
     );
@@ -432,10 +592,11 @@ public class KubernetesTaskRunnerTest
     Task task = makeTask();
 
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+        jsonMapper,
         mock(K8sTaskAdapter.class),
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         mock(DruidKubernetesPeonClient.class),
         mock(HttpClient.class)
     );
@@ -484,10 +645,11 @@ public class KubernetesTaskRunnerTest
     when(future.get()).thenThrow(InterruptedException.class);
 
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
-        adapter,
+        jsonMapper,
+        null,
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         peonClient,
         httpClient
     );
@@ -539,10 +701,11 @@ public class KubernetesTaskRunnerTest
     when(future.get()).thenThrow(InterruptedException.class);
 
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
-        adapter,
+        jsonMapper,
+        null,
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         peonClient,
         httpClient
     );
@@ -594,10 +757,11 @@ public class KubernetesTaskRunnerTest
     when(future.get()).thenThrow(ExecutionException.class);
 
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
-        adapter,
+        jsonMapper,
+        null,
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         peonClient,
         httpClient
     );
@@ -619,10 +783,11 @@ public class KubernetesTaskRunnerTest
     when(peonClient.getMainJobPod(any())).thenReturn(null).thenReturn(pod);
 
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+        jsonMapper,
         mock(K8sTaskAdapter.class),
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         peonClient,
         null
     );
@@ -647,10 +812,11 @@ public class KubernetesTaskRunnerTest
         Period.millis(1)
     );
     assertThrows(IllegalArgumentException.class, () -> new 
KubernetesTaskRunner(
+        jsonMapper,
         mock(K8sTaskAdapter.class),
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         mock(DruidKubernetesPeonClient.class),
         null
     ));
@@ -724,6 +890,8 @@ public class KubernetesTaskRunnerTest
 
     DruidKubernetesPeonClient peonClient = 
mock(DruidKubernetesPeonClient.class);
 
+    
when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.absent());
+
     when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job));
     when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod);
 
@@ -736,18 +904,19 @@ public class KubernetesTaskRunnerTest
     when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true);
 
     KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
+        jsonMapper,
         adapter,
         kubernetesTaskRunnerConfig,
         taskQueueConfig,
-        taskLogPusher,
+        taskLogs,
         peonClient,
         null
     );
     KubernetesTaskRunner spyRunner = spy(taskRunner);
     ListenableFuture<TaskStatus> future = spyRunner.run(task);
-    TaskStatus taskStatus = future.get();
-    Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
-    Assert.assertEquals("K8s Job for task disappeared before completion: [ 
k8sTaskId, k8staskid]", taskStatus.getErrorMsg());
+    TaskStatus taskStatusResponse = future.get();
+    Assert.assertEquals(TaskState.FAILED, taskStatusResponse.getStatusCode());
+    Assert.assertEquals("Task [k8sTaskId] failed kubernetes job disappeared 
before completion", taskStatusResponse.getErrorMsg());
 
   }
 
@@ -762,9 +931,9 @@ public class KubernetesTaskRunnerTest
         null,
         null,
         ImmutableMap.of("druid.indexer.runner.javaOpts", "abc",
-                        
"druid.indexer.fork.property.druid.processing.buffer.sizeBytes", "2048",
-                        "druid.peon.pod.cpu", "1",
-                        "druid.peon.pod.memory", "2G"
+            "druid.indexer.fork.property.druid.processing.buffer.sizeBytes", 
"2048",
+            "druid.peon.pod.cpu", "1",
+            "druid.peon.pod.memory", "2G"
         )
     );
   }
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
index 678a43e5db..9bfda5ab34 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java
@@ -82,6 +82,14 @@ public class AzureTaskLogs implements TaskLogs
     pushTaskFile(reportFile, taskKey);
   }
 
+  @Override
+  public void pushTaskStatus(String taskid, File statusFile)
+  {
+    final String taskKey = getTaskStatusKey(taskid);
+    log.info("Pushing task status %s to: %s", statusFile, taskKey);
+    pushTaskFile(statusFile, taskKey);
+  }
+
   private void pushTaskFile(final File logFile, String taskKey)
   {
     try {
@@ -110,6 +118,12 @@ public class AzureTaskLogs implements TaskLogs
     return streamTaskFile(taskid, 0, getTaskReportsKey(taskid));
   }
 
+  @Override
+  public Optional<InputStream> streamTaskStatus(String taskid) throws 
IOException
+  {
+    return streamTaskFile(taskid, 0, getTaskStatusKey(taskid));
+  }
+
   private Optional<InputStream> streamTaskFile(final String taskid, final long 
offset, String taskKey)
       throws IOException
   {
@@ -154,6 +168,11 @@ public class AzureTaskLogs implements TaskLogs
     return StringUtils.format("%s/%s/report.json", config.getPrefix(), taskid);
   }
 
+  private String getTaskStatusKey(String taskid)
+  {
+    return StringUtils.format("%s/%s/status.json", config.getPrefix(), taskid);
+  }
+
   @Override
   public void killAll() throws IOException
   {
diff --git 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
index 5313c1d5f5..297545e4ca 100644
--- 
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
+++ 
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java
@@ -155,6 +155,28 @@ public class AzureTaskLogsTest extends EasyMockSupport
     }
   }
 
+  @Test
+  public void test_PushTaskStatus_uploadsBlob() throws Exception
+  {
+    final File tmpDir = FileUtils.createTempDir();
+
+    try {
+      final File logFile = new File(tmpDir, "status.json");
+
+      azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + 
"/status.json");
+      EasyMock.expectLastCall();
+
+      replayAll();
+
+      azureTaskLogs.pushTaskStatus(TASK_ID, logFile);
+
+      verifyAll();
+    }
+    finally {
+      FileUtils.deleteDirectory(tmpDir);
+    }
+  }
+
   @Test(expected = RuntimeException.class)
   public void test_PushTaskReports_exception_rethrowsException() throws 
Exception
   {
@@ -323,6 +345,79 @@ public class AzureTaskLogsTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void test_streamTaskStatus_blobExists_succeeds() throws Exception
+  {
+    final String taskStatus = "{}";
+
+    final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
+    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, 
blobPath)).andReturn(true);
+    EasyMock.expect(azureStorage.getBlobLength(CONTAINER, 
blobPath)).andReturn((long) taskStatus.length());
+    EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, 
blobPath)).andReturn(
+        new ByteArrayInputStream(taskStatus.getBytes(StandardCharsets.UTF_8)));
+
+
+    replayAll();
+
+    final Optional<InputStream> stream = 
azureTaskLogs.streamTaskStatus(TASK_ID);
+
+    final StringWriter writer = new StringWriter();
+    IOUtils.copy(stream.get(), writer, "UTF-8");
+    Assert.assertEquals(writer.toString(), taskStatus);
+
+    verifyAll();
+  }
+
+  @Test
+  public void test_streamTaskStatus_blobDoesNotExist_returnsAbsent() throws 
Exception
+  {
+    final String blobPath = PREFIX + "/" + TASK_ID_NOT_FOUND + "/status.json";
+    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, 
blobPath)).andReturn(false);
+
+    replayAll();
+
+    final Optional<InputStream> stream = 
azureTaskLogs.streamTaskStatus(TASK_ID_NOT_FOUND);
+
+
+    Assert.assertFalse(stream.isPresent());
+
+    verifyAll();
+  }
+
+  @Test(expected = IOException.class)
+  public void 
test_streamTaskStatus_exceptionWhenGettingStream_throwsException() throws 
Exception
+  {
+    final String taskStatus = "{}";
+
+    final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
+    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, 
blobPath)).andReturn(true);
+    EasyMock.expect(azureStorage.getBlobLength(CONTAINER, 
blobPath)).andReturn((long) taskStatus.length());
+    EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, 
blobPath)).andThrow(
+        new URISyntaxException("", ""));
+
+
+    replayAll();
+
+    final Optional<InputStream> stream = 
azureTaskLogs.streamTaskStatus(TASK_ID);
+
+    final StringWriter writer = new StringWriter();
+    IOUtils.copy(stream.get(), writer, "UTF-8");
+    verifyAll();
+  }
+
+  @Test(expected = IOException.class)
+  public void 
test_streamTaskStatus_exceptionWhenCheckingBlobExistence_throwsException() 
throws Exception
+  {
+    final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
+    EasyMock.expect(azureStorage.getBlobExists(CONTAINER, 
blobPath)).andThrow(new URISyntaxException("", ""));
+
+    replayAll();
+
+    azureTaskLogs.streamTaskStatus(TASK_ID);
+
+    verifyAll();
+  }
+
   @Test
   public void test_killAll_noException_deletesAllTaskLogs() throws Exception
   {
diff --git 
a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
 
b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
index fcdee4039b..ae4024172a 100644
--- 
a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
+++ 
b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java
@@ -74,6 +74,14 @@ public class GoogleTaskLogs implements TaskLogs
     pushTaskFile(reportFile, taskKey);
   }
 
+  @Override
+  public void pushTaskStatus(String taskid, File statusFile) throws IOException
+  {
+    final String taskKey = getTaskStatusKey(taskid);
+    LOG.info("Pushing task status %s to: %s", statusFile, taskKey);
+    pushTaskFile(statusFile, taskKey);
+  }
+
   private void pushTaskFile(final File logFile, final String taskKey) throws 
IOException
   {
     try (final InputStream fileStream = 
Files.newInputStream(logFile.toPath())) {
@@ -115,6 +123,13 @@ public class GoogleTaskLogs implements TaskLogs
     return streamTaskFile(taskid, 0, taskKey);
   }
 
+  @Override
+  public Optional<InputStream> streamTaskStatus(String taskid) throws 
IOException
+  {
+    final String taskKey = getTaskStatusKey(taskid);
+    return streamTaskFile(taskid, 0, taskKey);
+  }
+
   private Optional<InputStream> streamTaskFile(final String taskid, final long 
offset, String taskKey)
       throws IOException
   {
@@ -156,6 +171,11 @@ public class GoogleTaskLogs implements TaskLogs
     return config.getPrefix() + "/" + taskid.replace(':', '_') + 
".report.json";
   }
 
+  private String getTaskStatusKey(String taskid)
+  {
+    return config.getPrefix() + "/" + taskid.replace(':', '_') + 
".status.json";
+  }
+
   @Override
   public void killAll() throws IOException
   {
diff --git 
a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
 
b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
index d8b7c61cfc..9bfe2706f8 100644
--- 
a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
+++ 
b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java
@@ -109,6 +109,35 @@ public class GoogleTaskLogsTest extends EasyMockSupport
     }
   }
 
+  @Test
+  public void testPushTaskStatus() throws Exception
+  {
+    final File tmpDir = FileUtils.createTempDir();
+
+    try {
+      final File statusFile = new File(tmpDir, "status.json");
+      BufferedWriter output = Files.newBufferedWriter(statusFile.toPath(), 
StandardCharsets.UTF_8);
+      output.write("{}");
+      output.close();
+
+      storage.insert(
+          EasyMock.eq(BUCKET),
+          EasyMock.eq(PREFIX + "/" + TASKID),
+          EasyMock.anyObject(InputStreamContent.class)
+      );
+      EasyMock.expectLastCall();
+
+      replayAll();
+
+      googleTaskLogs.pushTaskLog(TASKID, statusFile);
+
+      verifyAll();
+    }
+    finally {
+      FileUtils.deleteDirectory(tmpDir);
+    }
+  }
+
   @Test
   public void testStreamTaskLogWithoutOffset() throws Exception
   {
@@ -177,6 +206,27 @@ public class GoogleTaskLogsTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testStreamTaskStatus() throws Exception
+  {
+    final String taskStatus = "{}";
+
+    final String logPath = PREFIX + "/" + TASKID + ".status.json";
+    EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true);
+    EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) 
taskStatus.length());
+    EasyMock.expect(storage.get(BUCKET, logPath, 0)).andReturn(new 
ByteArrayInputStream(StringUtils.toUtf8(taskStatus)));
+
+    replayAll();
+
+    final Optional<InputStream> stream = 
googleTaskLogs.streamTaskStatus(TASKID);
+
+    final StringWriter writer = new StringWriter();
+    IOUtils.copy(stream.get(), writer, "UTF-8");
+    Assert.assertEquals(writer.toString(), taskStatus);
+
+    verifyAll();
+  }
+
   @Test
   public void test_killAll_noException_deletesAllTaskLogs() throws IOException
   {
diff --git 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
index 6026a3e3cb..f89b0fafde 100644
--- 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
+++ 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
@@ -75,6 +75,15 @@ public class HdfsTaskLogs implements TaskLogs
     log.info("Wrote task reports to: %s", path);
   }
 
+  @Override
+  public void pushTaskStatus(String taskId, File statusFile) throws IOException
+  {
+    final Path path = getTaskStatusFileFromId(taskId);
+    log.info("Writing task status to: %s", path);
+    pushTaskFile(path, statusFile);
+    log.info("Wrote task status to: %s", path);
+  }
+
   private void pushTaskFile(Path path, File logFile) throws IOException
   {
     final FileSystem fs = path.getFileSystem(hadoopConfig);
@@ -100,6 +109,13 @@ public class HdfsTaskLogs implements TaskLogs
     return streamTaskFile(path, 0);
   }
 
+  @Override
+  public Optional<InputStream> streamTaskStatus(String taskId) throws 
IOException
+  {
+    final Path path = getTaskStatusFileFromId(taskId);
+    return streamTaskFile(path, 0);
+  }
+
   private Optional<InputStream> streamTaskFile(final Path path, final long 
offset) throws IOException
   {
     final FileSystem fs = path.getFileSystem(hadoopConfig);
@@ -139,6 +155,15 @@ public class HdfsTaskLogs implements TaskLogs
     return new Path(mergePaths(config.getDirectory(), taskId.replace(':', '_') 
+ ".reports.json"));
   }
 
+  /**
+   * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed 
in
+   * path names. So we format paths differently for HDFS.
+   */
+  private Path getTaskStatusFileFromId(String taskId)
+  {
+    return new Path(mergePaths(config.getDirectory(), taskId.replace(':', '_') 
+ ".status.json"));
+  }
+
   // some hadoop version Path.mergePaths does not exist
   private static String mergePaths(String path1, String path2)
   {
diff --git 
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
 
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
index 9724cba694..9d0273d1a9 100644
--- 
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
+++ 
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
@@ -78,6 +78,23 @@ public class HdfsTaskLogsTest
     Assert.assertEquals("blah blah", readLog(taskLogs, "foo", 0));
   }
 
+  @Test
+  public void test_taskStatus() throws Exception
+  {
+    final File tmpDir = tempFolder.newFolder();
+    final File logDir = new File(tmpDir, "logs");
+    final File statusFile = new File(tmpDir, "status.json");
+    final TaskLogs taskLogs = new HdfsTaskLogs(new 
HdfsTaskLogsConfig(logDir.toString()), new Configuration());
+
+
+    Files.write("{}", statusFile, StandardCharsets.UTF_8);
+    taskLogs.pushTaskStatus("id", statusFile);
+    Assert.assertEquals(
+        "{}",
+        
StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskStatus("id").get()))
+    );
+  }
+
   @Test
   public void testKill() throws Exception
   {
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
index 27544e1eec..6868ba3c99 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
@@ -77,6 +77,13 @@ public class S3TaskLogs implements TaskLogs
     return streamTaskFile(0, taskKey);
   }
 
+  @Override
+  public Optional<InputStream> streamTaskStatus(String taskid) throws 
IOException
+  {
+    final String taskKey = getTaskLogKey(taskid, "status.json");
+    return streamTaskFile(0, taskKey);
+  }
+
   private Optional<InputStream> streamTaskFile(final long offset, String 
taskKey) throws IOException
   {
     try {
@@ -141,6 +148,14 @@ public class S3TaskLogs implements TaskLogs
     pushTaskFile(reportFile, taskKey);
   }
 
+  @Override
+  public void pushTaskStatus(String taskid, File statusFile) throws IOException
+  {
+    final String taskKey = getTaskLogKey(taskid, "status.json");
+    log.info("Pushing task status %s to: %s", statusFile, taskKey);
+    pushTaskFile(statusFile, taskKey);
+  }
+
   private void pushTaskFile(final File logFile, String taskKey) throws 
IOException
   {
     try {
diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
index 7b82ca102b..13a6e07cc5 100644
--- 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
+++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
@@ -76,6 +76,7 @@ public class S3TaskLogsTest extends EasyMockSupport
   private static final Exception NON_RECOVERABLE_EXCEPTION = new 
SdkClientException(new NullPointerException());
   private static final String LOG_CONTENTS = "log_contents";
   private static final String REPORT_CONTENTS = "report_contents";
+  private static final String STATUS_CONTENTS = "status_contents";
 
   @Mock
   private CurrentTimeMillisSupplier timeSupplier;
@@ -115,6 +116,31 @@ public class S3TaskLogsTest extends EasyMockSupport
     );
     Assert.assertEquals("The Grant should have full control permission", 
Permission.FullControl, grant.getPermission());
   }
+  
+  @Test
+  public void test_pushTaskStatus() throws IOException 
+  {
+    
EasyMock.expect(s3Client.putObject(EasyMock.anyObject(PutObjectRequest.class)))
+        .andReturn(new PutObjectResult())
+        .once();
+
+    EasyMock.replay(s3Client);
+
+    S3TaskLogsConfig config = new S3TaskLogsConfig();
+    config.setS3Bucket(TEST_BUCKET);
+    config.setDisableAcl(true);
+    
+    CurrentTimeMillisSupplier timeSupplier = new CurrentTimeMillisSupplier();
+    S3InputDataConfig inputDataConfig = new S3InputDataConfig();
+    S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, 
timeSupplier);
+
+    String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z";
+    File logFile = tempFolder.newFile("status.json");
+
+    s3TaskLogs.pushTaskLog(taskId, logFile);
+
+    EasyMock.verify(s3Client);
+  }
 
   @Test
   public void test_killAll_noException_deletesAllTaskLogs() throws IOException
@@ -434,6 +460,32 @@ public class S3TaskLogsTest extends EasyMockSupport
     Assert.assertEquals(REPORT_CONTENTS, report);
   }
 
+  @Test
+  public void test_status_fetch() throws IOException
+  {
+    EasyMock.reset(s3Client);
+    String logPath = TEST_PREFIX + "/" + KEY_1 + "/status.json";
+    ObjectMetadata objectMetadata = new ObjectMetadata();
+    objectMetadata.setContentLength(STATUS_CONTENTS.length());
+    EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, 
logPath)).andReturn(objectMetadata);
+    S3Object s3Object = new S3Object();
+    s3Object.setObjectContent(new 
ByteArrayInputStream(STATUS_CONTENTS.getBytes(StandardCharsets.UTF_8)));
+    GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, 
logPath);
+    getObjectRequest.setRange(0, STATUS_CONTENTS.length() - 1);
+    getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
+    EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
+    EasyMock.replay(s3Client);
+
+    S3TaskLogs s3TaskLogs = getS3TaskLogs();
+
+    Optional<InputStream> inputStreamOptional = 
s3TaskLogs.streamTaskStatus(KEY_1);
+    String report = new BufferedReader(
+        new InputStreamReader(inputStreamOptional.get(), 
StandardCharsets.UTF_8))
+        .lines()
+        .collect(Collectors.joining("\n"));
+
+    Assert.assertEquals(STATUS_CONTENTS, report);
+  }
 
   @Nonnull
   private S3TaskLogs getS3TaskLogs()
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index 72bff3784b..3ad7b45285 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -50,6 +50,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.List;
@@ -94,7 +95,9 @@ public abstract class AbstractTask implements Task
   private final String dataSource;
 
   private final Map<String, Object> context;
+
   private File reportsFile;
+  private File statusFile;
 
   private final ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder();
 
@@ -147,6 +150,7 @@ public abstract class AbstractTask implements Task
       File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", 
toolbox.getAttemptId()).toFile();
       FileUtils.mkdirp(attemptDir);
       reportsFile = new File(attemptDir, "report.json");
+      statusFile = new File(attemptDir, "status.json");
       InetAddress hostName = InetAddress.getLocalHost();
       DruidNode node = toolbox.getTaskExecutorNode();
       toolbox.getTaskActionClient().submit(new 
UpdateLocationAction(TaskLocation.create(
@@ -160,48 +164,55 @@ public abstract class AbstractTask implements Task
   @Override
   public final TaskStatus run(TaskToolbox taskToolbox) throws Exception
   {
-    boolean failure = false;
+    TaskStatus taskStatus = TaskStatus.running(getId());
     try {
       String errorMessage = setup(taskToolbox);
       if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) {
         return TaskStatus.failure(getId(), errorMessage);
       }
-      TaskStatus taskStatus = runTask(taskToolbox);
-      if (taskStatus.isFailure()) {
-        failure = true;
-      }
+      taskStatus = runTask(taskToolbox);
       return taskStatus;
     }
     catch (Exception e) {
-      failure = true;
+      taskStatus = TaskStatus.failure(getId(), e.toString());
       throw e;
     }
     finally {
-      cleanUp(taskToolbox, failure);
+      cleanUp(taskToolbox, taskStatus);
     }
   }
 
   public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception;
 
-  public void cleanUp(TaskToolbox toolbox, boolean failure) throws Exception
+  public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws 
Exception
   {
-    if (toolbox.getConfig().isEncapsulatedTask()) {
-      // report back to the overlord
-      UpdateStatusAction status = new UpdateStatusAction("successful");
-      if (failure) {
-        status = new UpdateStatusAction("failure");
-      }
-      toolbox.getTaskActionClient().submit(status);
-      toolbox.getTaskActionClient().submit(new 
UpdateLocationAction(TaskLocation.unknown()));
-
-      if (reportsFile != null && reportsFile.exists()) {
-        toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile);
-        log.debug("Pushed task reports");
-      } else {
-        log.debug("No task reports file exists to push");
-      }
-    } else {
+    if (!toolbox.getConfig().isEncapsulatedTask()) {
       log.debug("Not pushing task logs and reports from task.");
+      return;
+    }
+
+    // report back to the overlord
+    UpdateStatusAction status = new UpdateStatusAction("successful");
+    if (taskStatus.isFailure()) {
+      status = new UpdateStatusAction("failure");
+    }
+    toolbox.getTaskActionClient().submit(status);
+    toolbox.getTaskActionClient().submit(new 
UpdateLocationAction(TaskLocation.unknown()));
+
+    if (reportsFile != null && reportsFile.exists()) {
+      toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile);
+      log.debug("Pushed task reports");
+    } else {
+      log.debug("No task reports file exists to push");
+    }
+
+    if (statusFile != null) {
+      toolbox.getJsonMapper().writeValue(statusFile, taskStatus);
+      toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile);
+      Files.deleteIfExists(statusFile.toPath());
+      log.debug("Pushed task status");
+    } else {
+      log.debug("No task status file exists to push");
     }
   }
 
@@ -281,12 +292,12 @@ public abstract class AbstractTask implements Task
   public String toString()
   {
     return "AbstractTask{" +
-           "id='" + id + '\'' +
-           ", groupId='" + groupId + '\'' +
-           ", taskResource=" + taskResource +
-           ", dataSource='" + dataSource + '\'' +
-           ", context=" + context +
-           '}';
+        "id='" + id + '\'' +
+        ", groupId='" + groupId + '\'' +
+        ", taskResource=" + taskResource +
+        ", dataSource='" + dataSource + '\'' +
+        ", context=" + context +
+        '}';
   }
 
   public TaskStatus success()
@@ -372,8 +383,8 @@ public abstract class AbstractTask implements Task
   protected static IngestionMode computeBatchIngestionMode(@Nullable 
BatchIOConfig ioConfig)
   {
     final boolean isAppendToExisting = ioConfig == null
-                                       ? BatchIOConfig.DEFAULT_APPEND_EXISTING
-                                       : ioConfig.isAppendToExisting();
+        ? BatchIOConfig.DEFAULT_APPEND_EXISTING
+        : ioConfig.isAppendToExisting();
     final boolean isDropExisting = ioConfig == null ? 
BatchIOConfig.DEFAULT_DROP_EXISTING : ioConfig.isDropExisting();
     return computeIngestionMode(isAppendToExisting, isDropExisting);
   }
@@ -388,7 +399,7 @@ public abstract class AbstractTask implements Task
       return IngestionMode.REPLACE_LEGACY;
     }
     throw new IAE("Cannot simultaneously replace and append to existing 
segments. "
-                  + "Either dropExisting or appendToExisting should be set to 
false");
+        + "Either dropExisting or appendToExisting should be set to false");
   }
 
   public void emitMetric(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
index 5d546da494..a8e5b17245 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java
@@ -65,6 +65,15 @@ public class FileTaskLogs implements TaskLogs
     log.info("Wrote task report to: %s", outputFile);
   }
 
+  @Override
+  public void pushTaskStatus(String taskid, File statusFile) throws IOException
+  {
+    FileUtils.mkdirp(config.getDirectory());
+    final File outputFile = fileForTask(taskid, statusFile.getName());
+    Files.copy(statusFile, outputFile);
+    log.info("Wrote task status to: %s", outputFile);
+  }
+
   @Override
   public Optional<InputStream> streamTaskLog(final String taskid, final long 
offset) throws IOException
   {
@@ -87,6 +96,17 @@ public class FileTaskLogs implements TaskLogs
     }
   }
 
+  @Override
+  public Optional<InputStream> streamTaskStatus(final String taskid) throws 
IOException
+  {
+    final File file = fileForTask(taskid, "status.json");
+    if (file.exists()) {
+      return Optional.of(LogUtils.streamFile(file, 0));
+    } else {
+      return Optional.absent();
+    }
+  }
+
   private File fileForTask(final String taskid, String filename)
   {
     return new File(config.getDirectory(), StringUtils.format("%s.%s", taskid, 
filename));
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
index ab0a06b812..cf03134c4a 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
@@ -19,15 +19,18 @@
 
 package org.apache.druid.indexing.common.task;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.UpdateStatusAction;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.tasklogs.TaskLogPusher;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -48,10 +51,17 @@ import static org.mockito.Mockito.when;
 
 public class AbstractTaskTest
 {
+  private ObjectMapper objectMapper;
 
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+  @Before
+  public void setup()
+  {
+    objectMapper = new TestUtils().getTestObjectMapper();
+  }
+
   @Test
   public void testSetupAndCleanupIsCalledWtihParameter() throws Exception
   {
@@ -73,6 +83,7 @@ public class AbstractTaskTest
     File folder = temporaryFolder.newFolder();
     when(config.getTaskDir(eq("myID"))).thenReturn(folder);
     when(toolbox.getConfig()).thenReturn(config);
+    when(toolbox.getJsonMapper()).thenReturn(objectMapper);
 
     TaskActionClient taskActionClient = mock(TaskActionClient.class);
     when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
@@ -89,7 +100,9 @@ public class AbstractTaskTest
         String result = super.setup(toolbox);
         File attemptDir = Paths.get(folder.getAbsolutePath(), "attempt", 
toolbox.getAttemptId()).toFile();
         File reportsDir = new File(attemptDir, "report.json");
+        File statusDir = new File(attemptDir, "status.json");
         FileUtils.write(reportsDir, "foo", StandardCharsets.UTF_8);
+        FileUtils.write(statusDir, "{}", StandardCharsets.UTF_8);
         return result;
       }
     };
@@ -98,6 +111,7 @@ public class AbstractTaskTest
     // call it 3 times, once to update location in setup, then one for status 
and location in cleanup
     Mockito.verify(taskActionClient, times(3)).submit(any());
     verify(pusher, times(1)).pushTaskReports(eq("myID"), any());
+    verify(pusher, times(1)).pushTaskStatus(eq("myID"), any());
   }
 
   @Test
@@ -117,6 +131,7 @@ public class AbstractTaskTest
     File folder = temporaryFolder.newFolder();
     when(config.getTaskDir(eq("myID"))).thenReturn(folder);
     when(toolbox.getConfig()).thenReturn(config);
+    when(toolbox.getJsonMapper()).thenReturn(objectMapper);
 
     TaskActionClient taskActionClient = mock(TaskActionClient.class);
     when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
@@ -161,6 +176,7 @@ public class AbstractTaskTest
     File folder = temporaryFolder.newFolder();
     when(config.getTaskDir(eq("myID"))).thenReturn(folder);
     when(toolbox.getConfig()).thenReturn(config);
+    when(toolbox.getJsonMapper()).thenReturn(objectMapper);
 
     TaskActionClient taskActionClient = mock(TaskActionClient.class);
     when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
@@ -169,7 +185,7 @@ public class AbstractTaskTest
     AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, 
null)
     {
       @Override
-      public TaskStatus runTask(TaskToolbox toolbox) 
+      public TaskStatus runTask(TaskToolbox toolbox)
       {
         return TaskStatus.failure("myId", "failed");
       }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
index 510d0883d3..a09644ea6f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
@@ -244,7 +244,7 @@ public class TaskMonitorTest
     }
 
     @Override
-    public void cleanUp(TaskToolbox toolbox, boolean failure)
+    public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
     {
       // do nothing
     }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java
index 22c13e3464..b88b31443e 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
+import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
 import org.apache.druid.java.util.common.FileUtils;
@@ -95,6 +96,28 @@ public class FileTaskLogsTest
     );
   }
 
+  @Test
+  public void testSimpleStatus() throws Exception
+  {
+    final ObjectMapper mapper = TestHelper.makeJsonMapper();
+    final File tmpDir = temporaryFolder.newFolder();
+    final File logDir = new File(tmpDir, "druid/myTask");
+    final File statusFile = new File(tmpDir, "status.json");
+
+    final String taskId = "myTask";
+    final TaskStatus taskStatus = TaskStatus.success(taskId);
+    final String taskStatusString = mapper.writeValueAsString(taskStatus);
+    Files.write(taskStatusString, statusFile, StandardCharsets.UTF_8);
+
+    final TaskLogs taskLogs = new FileTaskLogs(new FileTaskLogsConfig(logDir));
+    taskLogs.pushTaskStatus(taskId, statusFile);
+
+    Assert.assertEquals(
+        taskStatusString,
+        
StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskStatus(taskId).get()))
+    );
+  }
+
   @Test
   public void testPushTaskLogDirCreationFails() throws Exception
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 7ea2325f12..a185b583c2 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -163,7 +163,7 @@ public class SingleTaskBackgroundRunnerTest
       }
 
       @Override
-      public void cleanUp(TaskToolbox toolbox, boolean failure)
+      public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
       {
         // do nothing
       }
@@ -181,10 +181,10 @@ public class SingleTaskBackgroundRunnerTest
 
     final QueryRunner<ScanResultValue> queryRunner =
         Druids.newScanQueryBuilder()
-              .dataSource("foo")
-              .intervals(new 
MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
-              .build()
-              .getRunner(runner);
+            .dataSource("foo")
+            .intervals(new 
MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
+            .build()
+            .getRunner(runner);
 
     Assert.assertThat(queryRunner, 
CoreMatchers.instanceOf(SetAndVerifyContextQueryRunner.class));
   }
@@ -262,7 +262,7 @@ public class SingleTaskBackgroundRunnerTest
           }
 
           @Override
-          public void cleanUp(TaskToolbox toolbox, boolean failure)
+          public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
           {
             // do nothing
           }
@@ -384,7 +384,7 @@ public class SingleTaskBackgroundRunnerTest
     }
 
     @Override
-    public void cleanUp(TaskToolbox toolbox, boolean failure)
+    public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
     {
       // do nothing
     }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index 616d2ca8ba..6388cdd573 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -412,10 +412,10 @@ public class TaskQueueTest extends IngestionTestBase
         TaskLocation.create("worker", 1, 2)
     ), workerHolder);
     while (!taskRunner.getRunningTasks()
-                      .stream()
-                      .map(TaskRunnerWorkItem::getTaskId)
-                      .collect(Collectors.toList())
-                      .contains(task.getId())) {
+        .stream()
+        .map(TaskRunnerWorkItem::getTaskId)
+        .collect(Collectors.toList())
+        .contains(task.getId())) {
       Thread.sleep(100);
     }
     taskQueue.shutdown(task.getId(), "shutdown");
@@ -435,7 +435,7 @@ public class TaskQueueTest extends IngestionTestBase
     HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new 
HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery();
     DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = 
EasyMock.createMock(DruidNodeDiscoveryProvider.class);
     
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
-            .andReturn(druidNodeDiscovery);
+        .andReturn(druidNodeDiscovery);
     EasyMock.replay(druidNodeDiscoveryProvider);
     TaskStorage taskStorageMock = EasyMock.createStrictMock(TaskStorage.class);
     for (String taskId : runningTasks) {
@@ -520,7 +520,7 @@ public class TaskQueueTest extends IngestionTestBase
     }
 
     @Override
-    public void cleanUp(TaskToolbox toolbox, boolean failure)
+    public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
     {
       // do nothing
     }
diff --git 
a/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java 
b/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
index 13962d7c4d..287b2f6fcc 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
+++ b/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java
@@ -47,6 +47,12 @@ public class NoopTaskLogs implements TaskLogs
     log.info("Not pushing reports for task: %s", taskid);
   }
 
+  @Override
+  public void pushTaskStatus(String taskid, File statusFile)
+  {
+    log.info("Not pushing status for task: %s", taskid);
+  }
+
   @Override
   public void killAll()
   {
diff --git 
a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java 
b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
index 0a06237e7f..a1cb317de3 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
+++ b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
@@ -35,4 +35,8 @@ public interface TaskLogPusher
   default void pushTaskReports(String taskid, File reportFile) throws 
IOException
   {
   }
+
+  default void pushTaskStatus(String taskid, File reportFile) throws 
IOException
+  {
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java 
b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
index 04add17ea5..1ec6a94df1 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
+++ b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
@@ -44,4 +44,9 @@ public interface TaskLogStreamer
   {
     return Optional.absent();
   }
+
+  default Optional<InputStream> streamTaskStatus(final String taskid) throws 
IOException
+  {
+    return Optional.absent();
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java 
b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
similarity index 70%
copy from processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
copy to processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
index 0a06237e7f..30192932a9 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
+++ b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java
@@ -19,20 +19,17 @@
 
 package org.apache.druid.tasklogs;
 
-import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.junit.Assert;
+import org.junit.Test;
 
-import java.io.File;
 import java.io.IOException;
 
-/**
- * Something that knows how to persist local task logs to some form of 
long-term storage.
- */
-@ExtensionPoint
-public interface TaskLogPusher
+public class NoopTaskLogsTest
 {
-  void pushTaskLog(String taskid, File logFile) throws IOException;
-
-  default void pushTaskReports(String taskid, File reportFile) throws 
IOException
+  @Test
+  public void test_streamTaskStatus() throws IOException
   {
+    TaskLogs taskLogs = new NoopTaskLogs();
+    Assert.assertFalse(taskLogs.streamTaskStatus("id").isPresent());
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java 
b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java
similarity index 66%
copy from processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
copy to 
processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java
index 0a06237e7f..15b67409b3 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java
+++ b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java
@@ -19,20 +19,27 @@
 
 package org.apache.druid.tasklogs;
 
-import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
 
-/**
- * Something that knows how to persist local task logs to some form of 
long-term storage.
- */
-@ExtensionPoint
-public interface TaskLogPusher
+public class TaskLogPusherTest
 {
-  void pushTaskLog(String taskid, File logFile) throws IOException;
-
-  default void pushTaskReports(String taskid, File reportFile) throws 
IOException
+  /**
+   * Test default implementation of pushTaskStatus in TaskLogPusher interface 
for code coverage
+   *
+   * @throws IOException
+   */
+  @Test
+  public void test_pushTaskStatus() throws IOException
   {
+    TaskLogPusher taskLogPusher = new TaskLogPusher() {
+      @Override
+      public void pushTaskLog(String taskid, File logFile)
+      {
+      }
+    };
+    taskLogPusher.pushTaskStatus("id", new File(""));
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java 
b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java
similarity index 58%
copy from 
processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
copy to 
processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java
index 04add17ea5..e61b9f0a1a 100644
--- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java
+++ 
b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java
@@ -20,28 +20,29 @@
 package org.apache.druid.tasklogs;
 
 import com.google.common.base.Optional;
-import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.junit.Assert;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.io.InputStream;
 
-/**
- * Something that knows how to stream logs for tasks.
- */
-@ExtensionPoint
-public interface TaskLogStreamer
+public class TaskLogStreamerTest
 {
   /**
-   * Stream log for a task.
+   * Test default implemenation of streamTaskStatus in TaskLogStreamer 
interface for code coverage
    *
-   * @param offset If zero, stream the entire log. If positive, attempt to 
read from this position onwards. If
-   *               negative, attempt to read this many bytes from the end of 
the file (like <tt>tail -n</tt>).
-   * @return inputStream for this log, if available
+   * @throws IOException
    */
-  Optional<InputStream> streamTaskLog(String taskid, long offset) throws 
IOException;
-
-  default Optional<InputStream> streamTaskReports(final String taskid) throws 
IOException
+  @Test
+  public void test_streamTaskStatus() throws IOException
   {
-    return Optional.absent();
+    TaskLogStreamer taskLogStreamer = new TaskLogStreamer() {
+      @Override
+      public Optional<InputStream> streamTaskLog(String taskid, long offset)
+      {
+        return Optional.absent();
+      }
+    };
+    Assert.assertFalse(taskLogStreamer.streamTaskStatus("id").isPresent());
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to