This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0de44d91f1b Cleanup serialization of TaskReportMap (#16217)
0de44d91f1b is described below

commit 0de44d91f1b8aeded7bcc6c04e3058ae86215bb1
Author: Kashif Faraz <kashif.fa...@gmail.com>
AuthorDate: Tue Apr 2 00:23:24 2024 +0530

    Cleanup serialization of TaskReportMap (#16217)
    
    * Build task reports in AbstractBatchIndexTask
    
    * Minor cleanup
    
    * Apply suggestions from code review by @abhishekrb
    
    Co-authored-by: Abhishek Radhakrishnan <abhishek.r...@gmail.com>
    
    * Cleanup IndexTaskTest
    
    * Fix formatting
    
    * Fix coverage
    
    * Cleanup serialization of TaskReport map
    
    * Replace occurrences of Map<String, TaskReport>
    
    * Return TaskReport.ReportMap for live reports, fix test comparisons
    
    * Address test failures
    
    ---------
    
    Co-authored-by: Abhishek Radhakrishnan <abhishek.r...@gmail.com>
---
 .../java/org/apache/druid/msq/exec/Controller.java |   3 +-
 .../apache/druid/msq/exec/ControllerContext.java   |   4 +-
 .../org/apache/druid/msq/exec/ControllerImpl.java  |   2 +-
 .../msq/indexing/IndexerControllerContext.java     |   4 +-
 .../msq/indexing/client/ControllerChatHandler.java |   3 +-
 .../druid/msq/indexing/WorkerChatHandlerTest.java  |   3 +-
 .../indexing/client/ControllerChatHandlerTest.java |  69 ++++++++++++
 .../msq/indexing/report/MSQTaskReportTest.java     |   5 +-
 .../druid/msq/test/MSQTestControllerContext.java   |   6 +-
 .../msq/test/MSQTestOverlordServiceClient.java     |   4 +-
 .../druid/msq/test/MSQTestWorkerContext.java       |   2 +-
 .../druid/indexing/common/KillTaskReport.java      |  19 ++++
 .../common/MultipleFileTaskReportFileWriter.java   |   2 +-
 .../common/SingleFileTaskReportFileWriter.java     |  21 +---
 .../apache/druid/indexing/common/TaskReport.java   |  29 ++++-
 .../indexing/common/TaskReportFileWriter.java      |   4 +-
 .../common/task/AbstractBatchIndexTask.java        |  26 ++++-
 .../task/AppenderatorDriverRealtimeIndexTask.java  |   2 +-
 .../druid/indexing/common/task/CompactionTask.java |  10 +-
 .../indexing/common/task/HadoopIndexTask.java      |   2 +-
 .../druid/indexing/common/task/IndexTask.java      |  24 ++--
 .../GeneratedPartitionsMetadataReport.java         |   3 +-
 .../batch/parallel/GeneratedPartitionsReport.java  |   7 +-
 .../parallel/ParallelIndexSupervisorTask.java      |  33 ++----
 .../parallel/PartialHashSegmentGenerateTask.java   |   2 +-
 .../parallel/PartialRangeSegmentGenerateTask.java  |   2 +-
 .../batch/parallel/PartialSegmentGenerateTask.java |   6 +-
 .../batch/parallel/PartialSegmentMergeTask.java    |   6 +-
 .../task/batch/parallel/PushedSegmentsReport.java  |   7 +-
 .../task/batch/parallel/SinglePhaseSubTask.java    |  26 ++---
 .../SeekableStreamIndexTaskRunner.java             |   2 +-
 .../AppenderatorDriverRealtimeIndexTaskTest.java   |   4 +-
 .../druid/indexing/common/task/IndexTaskTest.java  |   4 +-
 .../indexing/common/task/IngestionTestBase.java    |  10 +-
 .../common/task/KillUnusedSegmentsTaskTest.java    |   2 +-
 .../common/task/NoopTestTaskReportFileWriter.java  |   4 +-
 .../indexing/common/task/TaskReportSerdeTest.java  |  94 ++++++++++------
 .../AbstractMultiPhaseParallelIndexingTest.java    |   7 +-
 .../AbstractParallelIndexSupervisorTaskTest.java   | 125 ++++++++++++---------
 .../MultiPhaseParallelIndexingRowStatsTest.java    |  14 +--
 .../ParallelIndexSupervisorTaskResourceTest.java   |   9 +-
 .../batch/parallel/PushedSegmentsReportTest.java   |  11 +-
 .../parallel/SinglePhaseParallelIndexingTest.java  |  15 ++-
 .../SeekableStreamIndexTaskTestBase.java           |   4 +-
 44 files changed, 387 insertions(+), 254 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java
index b7a93de5ed0..1fa23e3d1b7 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java
@@ -32,7 +32,6 @@ import 
org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;
 
 import javax.annotation.Nullable;
 import java.util.List;
-import java.util.Map;
 
 /**
  * Interface for the controller of a multi-stage query.
@@ -123,6 +122,6 @@ public interface Controller
   List<String> getTaskIds();
 
   @Nullable
-  Map<String, TaskReport> liveReports();
+  TaskReport.ReportMap liveReports();
 
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
index 0a32158cf9b..35aa1c79ef0 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
@@ -28,8 +28,6 @@ import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.server.DruidNode;
 
-import java.util.Map;
-
 /**
  * Context used by multi-stage query controllers.
  *
@@ -80,5 +78,5 @@ public interface ControllerContext
   /**
    * Writes controller task report.
    */
-  void writeReports(String controllerTaskId, Map<String, TaskReport> reports);
+  void writeReports(String controllerTaskId, TaskReport.ReportMap reports);
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index aeba1be947c..6db998bfef2 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -916,7 +916,7 @@ public class ControllerImpl implements Controller
 
   @Override
   @Nullable
-  public Map<String, TaskReport> liveReports()
+  public TaskReport.ReportMap liveReports()
   {
     final QueryDefinition queryDef = queryDefRef.get();
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 401d6af7072..1426726d592 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -41,8 +41,6 @@ import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.server.DruidNode;
 
-import java.util.Map;
-
 /**
  * Implementation for {@link ControllerContext} required to run multi-stage 
queries as indexing tasks.
  */
@@ -126,7 +124,7 @@ public class IndexerControllerContext implements 
ControllerContext
   }
 
   @Override
-  public void writeReports(String controllerTaskId, Map<String, TaskReport> 
reports)
+  public void writeReports(String controllerTaskId, TaskReport.ReportMap 
reports)
   {
     toolbox.getTaskReportFileWriter().write(controllerTaskId, reports);
   }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java
index 22d3b31cf10..2d979fb8bda 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java
@@ -45,7 +45,6 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.util.List;
-import java.util.Map;
 
 public class ControllerChatHandler implements ChatHandler
 {
@@ -189,7 +188,7 @@ public class ControllerChatHandler implements ChatHandler
   public Response httpGetLiveReports(@Context final HttpServletRequest req)
   {
     ChatHandlers.authorizationCheck(req, Action.WRITE, task.getDataSource(), 
toolbox.getAuthorizerMapper());
-    final Map<String, TaskReport> reports = controller.liveReports();
+    final TaskReport.ReportMap reports = controller.liveReports();
     if (reports == null) {
       return Response.status(Response.Status.NOT_FOUND).build();
     }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java
index c88f22d1b74..70ed5651300 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java
@@ -53,7 +53,6 @@ import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
 import java.io.InputStream;
 import java.util.HashMap;
-import java.util.Map;
 
 public class WorkerChatHandlerTest
 {
@@ -88,7 +87,7 @@ public class WorkerChatHandlerTest
                          new TaskReportFileWriter()
                          {
                            @Override
-                           public void write(String taskId, Map<String, 
TaskReport> reports)
+                           public void write(String taskId, 
TaskReport.ReportMap reports)
                            {
 
                            }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java
new file mode 100644
index 00000000000..81da98d6fa7
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.client;
+
+import org.apache.druid.indexing.common.KillTaskReport;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.msq.exec.Controller;
+import org.apache.druid.msq.indexing.MSQControllerTask;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.Response;
+
+public class ControllerChatHandlerTest
+{
+  @Test
+  public void testHttpGetLiveReports()
+  {
+    final Controller controller = Mockito.mock(Controller.class);
+
+    TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
+    reportMap.put("killUnusedSegments", new KillTaskReport("kill_1", new 
KillTaskReport.Stats(1, 2, 3)));
+
+    Mockito.when(controller.liveReports())
+           .thenReturn(reportMap);
+
+    MSQControllerTask task = Mockito.mock(MSQControllerTask.class);
+    Mockito.when(task.getDataSource())
+           .thenReturn("wiki");
+    Mockito.when(controller.task())
+           .thenReturn(task);
+
+    TaskToolbox toolbox = Mockito.mock(TaskToolbox.class);
+    Mockito.when(toolbox.getAuthorizerMapper())
+           .thenReturn(new AuthorizerMapper(null));
+
+    ControllerChatHandler chatHandler = new ControllerChatHandler(toolbox, 
controller);
+
+    HttpServletRequest httpRequest = Mockito.mock(HttpServletRequest.class);
+    Mockito.when(httpRequest.getAttribute(ArgumentMatchers.anyString()))
+           .thenReturn("allow-all");
+    Response response = chatHandler.httpGetLiveReports(httpRequest);
+
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertEquals(reportMap, response.getEntity());
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
index 4bc3d1075c1..df492a70056 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
@@ -57,7 +57,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 public class MSQTaskReportTest
 {
@@ -242,9 +241,9 @@ public class MSQTaskReportTest
     writer.setObjectMapper(mapper);
     writer.write(TASK_ID, TaskReport.buildTaskReports(report));
 
-    final Map<String, TaskReport> reportMap = mapper.readValue(
+    final TaskReport.ReportMap reportMap = mapper.readValue(
         reportFile,
-        new TypeReference<Map<String, TaskReport>>()
+        new TypeReference<TaskReport.ReportMap>()
         {
         }
     );
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 5ab8932de3e..e0b0a837de4 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -88,7 +88,7 @@ public class MSQTestControllerContext implements 
ControllerContext
   private final ServiceEmitter emitter = new NoopServiceEmitter();
 
   private Controller controller;
-  private Map<String, TaskReport> report = null;
+  private TaskReport.ReportMap report = null;
   private final WorkerMemoryParameters workerMemoryParameters;
 
   public MSQTestControllerContext(
@@ -273,14 +273,14 @@ public class MSQTestControllerContext implements 
ControllerContext
   }
 
   @Override
-  public void writeReports(String controllerTaskId, Map<String, TaskReport> 
taskReport)
+  public void writeReports(String controllerTaskId, TaskReport.ReportMap 
taskReport)
   {
     if (controller != null && controller.id().equals(controllerTaskId)) {
       report = taskReport;
     }
   }
 
-  public Map<String, TaskReport> getAllReports()
+  public TaskReport.ReportMap getAllReports()
   {
     return report;
   }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
index c5f601d875e..db878d70024 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
@@ -55,7 +55,7 @@ public class MSQTestOverlordServiceClient extends 
NoopOverlordClient
   private final WorkerMemoryParameters workerMemoryParameters;
   private final List<ImmutableSegmentLoadInfo> loadedSegmentMetadata;
   private final Map<String, Controller> inMemoryControllers = new HashMap<>();
-  private final Map<String, Map<String, TaskReport>> reports = new HashMap<>();
+  private final Map<String, TaskReport.ReportMap> reports = new HashMap<>();
   private final Map<String, MSQControllerTask> inMemoryControllerTask = new 
HashMap<>();
   private final Map<String, TaskStatus> inMemoryTaskStatus = new HashMap<>();
 
@@ -171,7 +171,7 @@ public class MSQTestOverlordServiceClient extends 
NoopOverlordClient
 
   // hooks to pull stuff out for testing
   @Nullable
-  public Map<String, TaskReport> getReportForTask(String id)
+  public TaskReport.ReportMap getReportForTask(String id)
   {
     return reports.get(id);
   }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
index e80f8e2d31f..eb4976701eb 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
@@ -126,7 +126,7 @@ public class MSQTestWorkerContext implements WorkerContext
     final TaskReportFileWriter reportFileWriter = new TaskReportFileWriter()
     {
       @Override
-      public void write(String taskId, Map<String, TaskReport> reports)
+      public void write(String taskId, TaskReport.ReportMap reports)
       {
 
       }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java
index f97f761166a..f78b3d1d162 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java
@@ -62,6 +62,25 @@ public class KillTaskReport implements TaskReport
     return stats;
   }
 
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    KillTaskReport that = (KillTaskReport) o;
+    return Objects.equals(taskId, that.taskId) && Objects.equals(stats, 
that.stats);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(taskId, stats);
+  }
+
   public static class Stats
   {
     private final int numSegmentsKilled;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java
index 18313ccd79b..e6bc5e67f3a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java
@@ -37,7 +37,7 @@ public class MultipleFileTaskReportFileWriter implements 
TaskReportFileWriter
   private ObjectMapper objectMapper;
 
   @Override
-  public void write(String taskId, Map<String, TaskReport> reports)
+  public void write(String taskId, TaskReport.ReportMap reports)
   {
     final File reportsFile = taskReportFiles.get(taskId);
     if (reportsFile == null) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java
index 4d55dd64963..79f880d3076 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java
@@ -19,17 +19,13 @@
 
 package org.apache.druid.indexing.common;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializerProvider;
 import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.OutputStream;
-import java.util.Map;
 
 public class SingleFileTaskReportFileWriter implements TaskReportFileWriter
 {
@@ -44,7 +40,7 @@ public class SingleFileTaskReportFileWriter implements 
TaskReportFileWriter
   }
 
   @Override
-  public void write(String taskId, Map<String, TaskReport> reports)
+  public void write(String taskId, TaskReport.ReportMap reports)
   {
     try {
       final File reportsFileParent = reportsFile.getParentFile();
@@ -70,20 +66,9 @@ public class SingleFileTaskReportFileWriter implements 
TaskReportFileWriter
   public static void writeReportToStream(
       final ObjectMapper objectMapper,
       final OutputStream outputStream,
-      final Map<String, TaskReport> reports
+      final TaskReport.ReportMap reports
   ) throws Exception
   {
-    final SerializerProvider serializers = 
objectMapper.getSerializerProviderInstance();
-
-    try (final JsonGenerator jg = 
objectMapper.getFactory().createGenerator(outputStream)) {
-      jg.writeStartObject();
-
-      for (final Map.Entry<String, TaskReport> entry : reports.entrySet()) {
-        jg.writeFieldName(entry.getKey());
-        JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, 
entry.getValue());
-      }
-
-      jg.writeEndObject();
-    }
+    objectMapper.writeValue(outputStream, reports);
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
index 2ac260d72fd..86ed562112f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
@@ -21,9 +21,9 @@ package org.apache.druid.indexing.common;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.common.base.Optional;
 
 import java.util.LinkedHashMap;
-import java.util.Map;
 
 /**
  * TaskReport objects contain additional information about an indexing task, 
such as row statistics, errors, and
@@ -31,7 +31,10 @@ import java.util.Map;
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes(value = {
-    @JsonSubTypes.Type(name = "ingestionStatsAndErrors", value = 
IngestionStatsAndErrorsTaskReport.class),
+    @JsonSubTypes.Type(
+        name = IngestionStatsAndErrorsTaskReport.REPORT_KEY,
+        value = IngestionStatsAndErrorsTaskReport.class
+    ),
     @JsonSubTypes.Type(name = KillTaskReport.REPORT_KEY, value = 
KillTaskReport.class)
 })
 public interface TaskReport
@@ -48,13 +51,29 @@ public interface TaskReport
   /**
    * Returns an order-preserving map that is suitable for passing into {@link 
TaskReportFileWriter#write}.
    */
-  static Map<String, TaskReport> buildTaskReports(TaskReport... taskReports)
+  static ReportMap buildTaskReports(TaskReport... taskReports)
   {
-    // Use LinkedHashMap to preserve order of the reports.
-    Map<String, TaskReport> taskReportMap = new LinkedHashMap<>();
+    ReportMap taskReportMap = new ReportMap();
     for (TaskReport taskReport : taskReports) {
       taskReportMap.put(taskReport.getReportKey(), taskReport);
     }
     return taskReportMap;
   }
+
+  /**
+   * Represents an ordered map from report key to a TaskReport that is 
compatible
+   * for writing out reports to files or serving over HTTP.
+   * <p>
+   * This class is needed for Jackson serde to work correctly. Without this 
class,
+   * a TaskReport is serialized without the type information and cannot be
+   * deserialized back into a concrete implementation.
+   */
+  class ReportMap extends LinkedHashMap<String, TaskReport>
+  {
+    @SuppressWarnings("unchecked")
+    public <T extends TaskReport> Optional<T> findReport(String reportKey)
+    {
+      return Optional.fromNullable((T) get(reportKey));
+    }
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java
index 908efe6e1a4..972f0b010f3 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java
@@ -21,11 +21,9 @@ package org.apache.druid.indexing.common;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import java.util.Map;
-
 public interface TaskReportFileWriter
 {
-  void write(String taskId, Map<String, TaskReport> reports);
+  void write(String taskId, TaskReport.ReportMap reports);
 
   void setObjectMapper(ObjectMapper objectMapper);
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index a5acadf704e..4a093cde2ce 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -903,11 +903,35 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
     return null;
   }
 
+  protected TaskReport.ReportMap buildLiveIngestionStatsReport(
+      IngestionState ingestionState,
+      Map<String, Object> unparseableEvents,
+      Map<String, Object> rowStats
+  )
+  {
+    return TaskReport.buildTaskReports(
+        new IngestionStatsAndErrorsTaskReport(
+            getId(),
+            new IngestionStatsAndErrors(
+                ingestionState,
+                unparseableEvents,
+                rowStats,
+                null,
+                false,
+                0L,
+                null,
+                null,
+                null
+            )
+        )
+    );
+  }
+
   /**
    * Builds a singleton map with {@link 
IngestionStatsAndErrorsTaskReport#REPORT_KEY}
    * as key and an {@link IngestionStatsAndErrorsTaskReport} for this task as 
value.
    */
-  protected Map<String, TaskReport> buildIngestionStatsReport(
+  protected TaskReport.ReportMap buildIngestionStatsReport(
       IngestionState ingestionState,
       String errorMessage,
       Long segmentsRead,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index c00f219c777..2e4710d43cd 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -609,7 +609,7 @@ public class AppenderatorDriverRealtimeIndexTask extends 
AbstractTask implements
    *
    * @return Map of reports for the task.
    */
-  private Map<String, TaskReport> getTaskCompletionReports()
+  private TaskReport.ReportMap getTaskCompletionReports()
   {
     return TaskReport.buildTaskReports(
         new IngestionStatsAndErrorsTaskReport(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 59a0a499f91..eabb9e06236 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -502,7 +502,7 @@ public class CompactionTask extends AbstractBatchIndexTask
       log.info("Generated [%d] compaction task specs", totalNumSpecs);
 
       int failCnt = 0;
-      Map<String, TaskReport> completionReports = new HashMap<>();
+      final TaskReport.ReportMap completionReports = new 
TaskReport.ReportMap();
       for (int i = 0; i < indexTaskSpecs.size(); i++) {
         ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i);
         final String json = 
toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
@@ -521,9 +521,11 @@ public class CompactionTask extends AbstractBatchIndexTask
             }
 
             String reportKeySuffix = "_" + i;
-            Optional.ofNullable(eachSpec.getCompletionReports())
-                    .ifPresent(reports -> completionReports.putAll(
-                        CollectionUtils.mapKeys(reports, key -> key + 
reportKeySuffix)));
+            Optional.ofNullable(eachSpec.getCompletionReports()).ifPresent(
+                reports -> completionReports.putAll(
+                    CollectionUtils.mapKeys(reports, key -> key + 
reportKeySuffix)
+                )
+            );
           } else {
             failCnt++;
             log.warn("indexSpec is not ready: [%s].\nTrying the next 
indexSpec.", json);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 5fd3572f9e2..31a4b9c6665 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -680,7 +680,7 @@ public class HadoopIndexTask extends HadoopTask implements 
ChatHandler
     return Response.ok(returnMap).build();
   }
 
-  private Map<String, TaskReport> getTaskCompletionReports()
+  private TaskReport.ReportMap getTaskCompletionReports()
   {
     return buildIngestionStatsReport(ingestionState, errorMsg, null, null);
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 5c538d4a0fc..532529ecfc1 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -187,7 +187,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
   @Nullable
   private String errorMsg;
 
-  private Map<String, TaskReport> completionReports;
+  private TaskReport.ReportMap completionReports;
 
   @JsonCreator
   public IndexTask(
@@ -320,7 +320,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
 
   @Nullable
   @JsonIgnore
-  public Map<String, TaskReport> getCompletionReports()
+  public TaskReport.ReportMap getCompletionReports()
   {
     return completionReports;
   }
@@ -415,21 +415,13 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
   )
   {
     IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
-    Map<String, Object> returnMap = new HashMap<>();
-    Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
-    Map<String, Object> payload = new HashMap<>();
-    Map<String, Object> events = getTaskCompletionUnparseableEvents();
-
-    payload.put("ingestionState", ingestionState);
-    payload.put("unparseableEvents", events);
-    payload.put("rowStats", doGetRowStats(full != null));
 
-    ingestionStatsAndErrors.put("taskId", getId());
-    ingestionStatsAndErrors.put("payload", payload);
-    ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
-
-    returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
-    return Response.ok(returnMap).build();
+    final TaskReport.ReportMap liveReports = buildLiveIngestionStatsReport(
+        ingestionState,
+        getTaskCompletionUnparseableEvents(),
+        doGetRowStats(full != null)
+    );
+    return Response.ok(liveReports).build();
   }
 
   @JsonProperty("spec")
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java
index a83c3cc7412..7c98b3ee579 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexing.common.TaskReport;
 
 import java.util.List;
-import java.util.Map;
 
 /**
  * Report containing the {@link GenericPartitionStat}s created by a {@link 
PartialSegmentGenerateTask}. This report is
@@ -38,7 +37,7 @@ class GeneratedPartitionsMetadataReport extends 
GeneratedPartitionsReport
   GeneratedPartitionsMetadataReport(
       @JsonProperty("taskId") String taskId,
       @JsonProperty("partitionStats") List<PartitionStat> partitionStats,
-      @JsonProperty("taskReport") Map<String, TaskReport> taskReport
+      @JsonProperty("taskReport") TaskReport.ReportMap taskReport
   )
   {
     super(taskId, partitionStats, taskReport);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java
index 1fa025d1c91..8fedae5a4f1 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexing.common.TaskReport;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -35,9 +34,9 @@ public class GeneratedPartitionsReport implements 
SubTaskReport
 {
   private final String taskId;
   private final List<PartitionStat> partitionStats;
-  private final Map<String, TaskReport> taskReport;
+  private final TaskReport.ReportMap taskReport;
 
-  GeneratedPartitionsReport(String taskId, List<PartitionStat> partitionStats, 
Map<String, TaskReport> taskReport)
+  GeneratedPartitionsReport(String taskId, List<PartitionStat> partitionStats, 
TaskReport.ReportMap taskReport)
   {
     this.taskId = taskId;
     this.partitionStats = partitionStats;
@@ -52,7 +51,7 @@ public class GeneratedPartitionsReport implements 
SubTaskReport
   }
 
   @JsonProperty
-  public Map<String, TaskReport> getTaskReport()
+  public TaskReport.ReportMap getTaskReport()
   {
     return taskReport;
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 4d5b9717ca9..27694360ed9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -202,7 +202,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   private volatile Pair<Map<String, Object>, Map<String, Object>> 
indexGenerateRowStats;
 
   private IngestionState ingestionState;
-  private Map<String, TaskReport> completionReports;
+  private TaskReport.ReportMap completionReports;
   private Long segmentsRead;
   private Long segmentsPublished;
   private final boolean isCompactionTask;
@@ -300,7 +300,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
 
   @Nullable
   @JsonIgnore
-  public Map<String, TaskReport> getCompletionReports()
+  public TaskReport.ReportMap getCompletionReports()
   {
     return completionReports;
   }
@@ -1238,7 +1238,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   /**
    * Generate an IngestionStatsAndErrorsTaskReport for the task.
    */
-  private Map<String, TaskReport> getTaskCompletionReports(TaskStatus 
taskStatus)
+  private TaskReport.ReportMap getTaskCompletionReports(TaskStatus taskStatus)
   {
     return buildIngestionStatsReport(
         IngestionState.COMPLETED,
@@ -1602,7 +1602,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
     // Get stats from completed tasks
     Map<String, PushedSegmentsReport> completedSubtaskReports = 
parallelSinglePhaseRunner.getReports();
     for (PushedSegmentsReport pushedSegmentsReport : 
completedSubtaskReports.values()) {
-      Map<String, TaskReport> taskReport = 
pushedSegmentsReport.getTaskReport();
+      TaskReport.ReportMap taskReport = pushedSegmentsReport.getTaskReport();
       if (taskReport == null || taskReport.isEmpty()) {
         LOG.warn("Received an empty report from subtask[%s]" + 
pushedSegmentsReport.getTaskId());
         continue;
@@ -1642,7 +1642,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
       long totalSegmentsRead = 0L;
       for (GeneratedPartitionsReport generatedPartitionsReport : 
completedSubtaskReports.values()) {
-        Map<String, TaskReport> taskReport = 
generatedPartitionsReport.getTaskReport();
+        TaskReport.ReportMap taskReport = 
generatedPartitionsReport.getTaskReport();
         if (taskReport == null || taskReport.isEmpty()) {
           LOG.warn("Received an empty report from subtask[%s]", 
generatedPartitionsReport.getTaskId());
           continue;
@@ -1726,7 +1726,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   }
 
   private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
-      Map<String, TaskReport> taskReport,
+      TaskReport.ReportMap taskReport,
       List<ParseExceptionReport> unparseableEvents
   )
   {
@@ -1804,12 +1804,8 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   }
 
   @VisibleForTesting
-  public Map<String, Object> doGetLiveReports(boolean isFullReport)
+  public TaskReport.ReportMap doGetLiveReports(boolean isFullReport)
   {
-    Map<String, Object> returnMap = new HashMap<>();
-    Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
-    Map<String, Object> payload = new HashMap<>();
-
     Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparsebleEvents 
=
         doGetRowStatsAndUnparseableEvents(isFullReport, true);
 
@@ -1824,16 +1820,11 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
                                 : currentSequentialTask.getIngestionState();
     }
 
-    payload.put("ingestionState", ingestionStateForReport);
-    payload.put("unparseableEvents", rowStatsAndUnparsebleEvents.rhs);
-    payload.put("rowStats", rowStatsAndUnparsebleEvents.lhs);
-
-    ingestionStatsAndErrors.put("taskId", getId());
-    ingestionStatsAndErrors.put("payload", payload);
-    ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
-
-    returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
-    return returnMap;
+    return buildLiveIngestionStatsReport(
+        ingestionStateForReport,
+        rowStatsAndUnparsebleEvents.rhs,
+        rowStatsAndUnparsebleEvents.lhs
+    );
   }
 
   @GET
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index 49e3591ff18..b6d382472b3 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -177,7 +177,7 @@ public class PartialHashSegmentGenerateTask extends 
PartialSegmentGenerateTask<G
   }
 
   @Override
-  GeneratedPartitionsMetadataReport 
createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> 
segments, Map<String, TaskReport> taskReport)
+  GeneratedPartitionsMetadataReport 
createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> 
segments, TaskReport.ReportMap taskReport)
   {
     List<PartitionStat> partitionStats = segments.stream()
                                                         .map(segment -> 
toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index 27604eb7e77..9f36ed63541 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -192,7 +192,7 @@ public class PartialRangeSegmentGenerateTask extends 
PartialSegmentGenerateTask<
   }
 
   @Override
-  GeneratedPartitionsMetadataReport 
createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> 
segments, Map<String, TaskReport> taskReport)
+  GeneratedPartitionsMetadataReport 
createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> 
segments, TaskReport.ReportMap taskReport)
   {
     List<PartitionStat> partitionStats = segments.stream()
                                                         .map(segment -> 
toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index bbeb00aa845..0fd6b0916b8 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -125,7 +125,7 @@ abstract class PartialSegmentGenerateTask<T extends 
GeneratedPartitionsReport> e
         toolbox.getIndexingTmpDir()
     );
 
-    Map<String, TaskReport> taskReport = 
getTaskCompletionReports(getNumSegmentsRead(inputSource));
+    TaskReport.ReportMap taskReport = 
getTaskCompletionReports(getNumSegmentsRead(inputSource));
 
     taskClient.report(createGeneratedPartitionsReport(toolbox, segments, 
taskReport));
 
@@ -146,7 +146,7 @@ abstract class PartialSegmentGenerateTask<T extends 
GeneratedPartitionsReport> e
   abstract T createGeneratedPartitionsReport(
       TaskToolbox toolbox,
       List<DataSegment> segments,
-      Map<String, TaskReport> taskReport
+      TaskReport.ReportMap taskReport
   );
 
   private Long getNumSegmentsRead(InputSource inputSource)
@@ -249,7 +249,7 @@ abstract class PartialSegmentGenerateTask<T extends 
GeneratedPartitionsReport> e
   /**
    * Generate an IngestionStatsAndErrorsTaskReport for the task.
    */
-  private Map<String, TaskReport> getTaskCompletionReports(Long segmentsRead)
+  private TaskReport.ReportMap getTaskCompletionReports(Long segmentsRead)
   {
     return buildIngestionStatsReport(
         IngestionState.COMPLETED,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
index b3467bb6f20..d4c13ca3370 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
@@ -22,12 +22,12 @@ package 
org.apache.druid.indexing.common.task.batch.parallel;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.SurrogateAction;
@@ -189,7 +189,9 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> 
extends PerfectRollu
         intervalToUnzippedFiles
     );
 
-    taskClient.report(new PushedSegmentsReport(getId(), 
Collections.emptySet(), pushedSegments, ImmutableMap.of()));
+    taskClient.report(
+        new PushedSegmentsReport(getId(), Collections.emptySet(), 
pushedSegments, new TaskReport.ReportMap())
+    );
 
     return TaskStatus.success(getId());
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
index 8ed373c7d95..9fad997ef78 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
@@ -25,7 +25,6 @@ import com.google.common.base.Preconditions;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.timeline.DataSegment;
 
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
@@ -41,14 +40,14 @@ public class PushedSegmentsReport implements SubTaskReport
   private final String taskId;
   private final Set<DataSegment> oldSegments;
   private final Set<DataSegment> newSegments;
-  private final Map<String, TaskReport> taskReport;
+  private final TaskReport.ReportMap taskReport;
 
   @JsonCreator
   public PushedSegmentsReport(
       @JsonProperty("taskId") String taskId,
       @JsonProperty("oldSegments") Set<DataSegment> oldSegments,
       @JsonProperty("segments") Set<DataSegment> newSegments,
-      @JsonProperty("taskReport") Map<String, TaskReport> taskReport
+      @JsonProperty("taskReport") TaskReport.ReportMap taskReport
   )
   {
     this.taskId = Preconditions.checkNotNull(taskId, "taskId");
@@ -77,7 +76,7 @@ public class PushedSegmentsReport implements SubTaskReport
   }
 
   @JsonProperty("taskReport")
-  public Map<String, TaskReport> getTaskReport()
+  public TaskReport.ReportMap getTaskReport()
   {
     return taskReport;
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 465750bad03..6af09461c40 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -282,7 +282,7 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
                                                          
.transform(PartitionChunk::getObject)
                                                          .toSet();
 
-      Map<String, TaskReport> taskReport = getTaskCompletionReports();
+      TaskReport.ReportMap taskReport = getTaskCompletionReports();
       taskClient.report(new PushedSegmentsReport(getId(), oldSegments, 
pushedSegments, taskReport));
 
       toolbox.getTaskReportFileWriter().write(getId(), taskReport);
@@ -542,23 +542,13 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
     return Response.ok(doGetRowStats(full != null)).build();
   }
 
-  private Map<String, Object> doGetLiveReports(boolean isFullReport)
+  private TaskReport.ReportMap doGetLiveReports(boolean isFullReport)
   {
-    Map<String, Object> returnMap = new HashMap<>();
-    Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
-    Map<String, Object> payload = new HashMap<>();
-    Map<String, Object> events = getTaskCompletionUnparseableEvents();
-
-    payload.put("ingestionState", ingestionState);
-    payload.put("unparseableEvents", events);
-    payload.put("rowStats", doGetRowStats(isFullReport));
-
-    ingestionStatsAndErrors.put("taskId", getId());
-    ingestionStatsAndErrors.put("payload", payload);
-    ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
-
-    returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
-    return returnMap;
+    return buildLiveIngestionStatsReport(
+        ingestionState,
+        getTaskCompletionUnparseableEvents(),
+        doGetRowStats(isFullReport)
+    );
   }
 
   @GET
@@ -585,7 +575,7 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
   /**
    * Generate an IngestionStatsAndErrorsTaskReport for the task.
    */
-  private Map<String, TaskReport> getTaskCompletionReports()
+  private TaskReport.ReportMap getTaskCompletionReports()
   {
     return buildIngestionStatsReport(IngestionState.COMPLETED, errorMsg, null, 
null);
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index f3e1e4a06d2..e5a1958a1fa 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1118,7 +1118,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
    * @param handoffWaitMs Milliseconds waited for segments to be handed off.
    * @return Map of reports for the task.
    */
-  private Map<String, TaskReport> getTaskCompletionReports(@Nullable String 
errorMsg, long handoffWaitMs)
+  private TaskReport.ReportMap getTaskCompletionReports(@Nullable String 
errorMsg, long handoffWaitMs)
   {
     return TaskReport.buildTaskReports(
         new IngestionStatsAndErrorsTaskReport(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index e91d25c0b03..fe6c39b17ed 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -1678,9 +1678,9 @@ public class AppenderatorDriverRealtimeIndexTaskTest 
extends InitializedNullHand
 
   private IngestionStatsAndErrors getTaskReportData() throws IOException
   {
-    Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
+    TaskReport.ReportMap taskReports = OBJECT_MAPPER.readValue(
         reportsFile,
-        new TypeReference<Map<String, TaskReport>>()
+        new TypeReference<TaskReport.ReportMap>()
         {
         }
     );
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 2619a23ba33..4e5dadf4dbf 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -2548,9 +2548,9 @@ public class IndexTaskTest extends IngestionTestBase
 
   private IngestionStatsAndErrors getTaskReportData() throws IOException
   {
-    Map<String, TaskReport> taskReports = jsonMapper.readValue(
+    TaskReport.ReportMap taskReports = jsonMapper.readValue(
         taskRunner.getTaskReportsFile(),
-        new TypeReference<Map<String, TaskReport>>()
+        new TypeReference<TaskReport.ReportMap>()
         {
         }
     );
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 2fe8790fb9c..f0f7e329992 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.common.task;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
@@ -270,9 +269,6 @@ public abstract class IngestionTestBase extends 
InitializedNullHandlingTest
   /**
    * Converts ParseSpec to InputFormat for indexing tests. To be used until 
{@link FirehoseFactory}
    * & {@link InputRowParser} is deprecated and removed.
-   *
-   * @param parseSpec
-   * @return
    */
   public static InputFormat createInputFormatFromParseSpec(ParseSpec parseSpec)
   {
@@ -510,11 +506,9 @@ public abstract class IngestionTestBase extends 
InitializedNullHandlingTest
     }
   }
 
-  public Map<String, TaskReport> getReports() throws IOException
+  public TaskReport.ReportMap getReports() throws IOException
   {
-    return objectMapper.readValue(reportsFile, new TypeReference<Map<String, 
TaskReport>>()
-    {
-    });
+    return objectMapper.readValue(reportsFile, TaskReport.ReportMap.class);
   }
 
   public List<IngestionStatsAndErrors> getIngestionReports() throws IOException
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index 20de427508f..488f8ce98dd 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -1134,7 +1134,7 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
     try {
       Object payload = getObjectMapper().readValue(
           taskRunner.getTaskReportsFile(),
-          new TypeReference<Map<String, TaskReport>>()
+          new TypeReference<TaskReport.ReportMap>()
           {
           }
       ).get(KillTaskReport.REPORT_KEY).getPayload();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java
index 0398a219516..dddbe1bd7c3 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java
@@ -23,12 +23,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskReportFileWriter;
 
-import java.util.Map;
-
 public class NoopTestTaskReportFileWriter implements TaskReportFileWriter
 {
   @Override
-  public void write(String id, Map<String, TaskReport> reports)
+  public void write(String id, TaskReport.ReportMap reports)
   {
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
index aece6edb3f4..b0fced552b3 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
@@ -28,6 +28,7 @@ import com.google.common.io.Files;
 import org.apache.druid.indexer.IngestionState;
 import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.KillTaskReport;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TestUtils;
@@ -38,7 +39,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
-import java.util.Map;
+import java.util.Collections;
 
 public class TaskReportSerdeTest
 {
@@ -55,47 +56,56 @@ public class TaskReportSerdeTest
   }
 
   @Test
-  public void testSerde() throws Exception
+  public void testSerdeOfIngestionReport() throws Exception
   {
-    IngestionStatsAndErrorsTaskReport report1 = new 
IngestionStatsAndErrorsTaskReport(
-        "testID",
-        new IngestionStatsAndErrors(
-            IngestionState.BUILD_SEGMENTS,
-            ImmutableMap.of(
-                "hello", "world"
-            ),
-            ImmutableMap.of(
-                "number", 1234
-            ),
-            "an error message",
-            true,
-            1000L,
-            ImmutableMap.of("PartitionA", 5000L),
-            5L,
-            10L
-        )
-    );
-    String report1serialized = jsonMapper.writeValueAsString(report1);
-    IngestionStatsAndErrorsTaskReport report2 = 
(IngestionStatsAndErrorsTaskReport) jsonMapper.readValue(
-        report1serialized,
-        TaskReport.class
-    );
-    Assert.assertEquals(report1, report2);
-    Assert.assertEquals(report1.hashCode(), report2.hashCode());
+    IngestionStatsAndErrorsTaskReport originalReport = 
buildTestIngestionReport();
+    String reportJson = jsonMapper.writeValueAsString(originalReport);
+    TaskReport deserialized = jsonMapper.readValue(reportJson, 
TaskReport.class);
+
+    Assert.assertTrue(deserialized instanceof 
IngestionStatsAndErrorsTaskReport);
+
+    IngestionStatsAndErrorsTaskReport deserializedReport = 
(IngestionStatsAndErrorsTaskReport) deserialized;
+    Assert.assertEquals(originalReport, deserializedReport);
+  }
+
+  @Test
+  public void testSerdeOfKillTaskReport() throws Exception
+  {
+    KillTaskReport originalReport = new KillTaskReport("taskId", new 
KillTaskReport.Stats(1, 2, 3));
+    String reportJson = jsonMapper.writeValueAsString(originalReport);
+    TaskReport deserialized = jsonMapper.readValue(reportJson, 
TaskReport.class);
+
+    Assert.assertTrue(deserialized instanceof KillTaskReport);
 
+    KillTaskReport deserializedReport = (KillTaskReport) deserialized;
+    Assert.assertEquals(originalReport, deserializedReport);
+  }
+
+  @Test
+  public void testWriteReportMapToFileAndRead() throws Exception
+  {
+    IngestionStatsAndErrorsTaskReport report1 = buildTestIngestionReport();
     final File reportFile = temporaryFolder.newFile();
     final SingleFileTaskReportFileWriter writer = new 
SingleFileTaskReportFileWriter(reportFile);
     writer.setObjectMapper(jsonMapper);
-    Map<String, TaskReport> reportMap1 = TaskReport.buildTaskReports(report1);
+    TaskReport.ReportMap reportMap1 = TaskReport.buildTaskReports(report1);
     writer.write("testID", reportMap1);
 
-    Map<String, TaskReport> reportMap2 = jsonMapper.readValue(
-        reportFile,
-        new TypeReference<Map<String, TaskReport>>() {}
-    );
+    TaskReport.ReportMap reportMap2 = jsonMapper.readValue(reportFile, 
TaskReport.ReportMap.class);
     Assert.assertEquals(reportMap1, reportMap2);
   }
 
+  @Test
+  public void testWriteReportMapToStringAndRead() throws Exception
+  {
+    IngestionStatsAndErrorsTaskReport ingestionReport = 
buildTestIngestionReport();
+    TaskReport.ReportMap reportMap = 
TaskReport.buildTaskReports(ingestionReport);
+    String json = jsonMapper.writeValueAsString(reportMap);
+
+    TaskReport.ReportMap deserializedReportMap = jsonMapper.readValue(json, 
TaskReport.ReportMap.class);
+    Assert.assertEquals(reportMap, deserializedReportMap);
+  }
+
   @Test
   public void testSerializationOnMissingPartitionStats() throws Exception
   {
@@ -150,7 +160,7 @@ public class TaskReportSerdeTest
     final File reportFile = temporaryFolder.newFile();
     final SingleFileTaskReportFileWriter writer = new 
SingleFileTaskReportFileWriter(reportFile);
     writer.setObjectMapper(jsonMapper);
-    writer.write("theTask", ImmutableMap.of("report", new 
ExceptionalTaskReport()));
+    writer.write("theTask", TaskReport.buildTaskReports(new 
ExceptionalTaskReport()));
 
     // Read the file, ensure it's incomplete and not valid JSON. This allows 
callers to determine the report was
     // not complete when written.
@@ -160,6 +170,24 @@ public class TaskReportSerdeTest
     );
   }
 
+  private IngestionStatsAndErrorsTaskReport buildTestIngestionReport()
+  {
+    return new IngestionStatsAndErrorsTaskReport(
+        "testID",
+        new IngestionStatsAndErrors(
+            IngestionState.BUILD_SEGMENTS,
+            Collections.singletonMap("hello", "world"),
+            Collections.singletonMap("number", 1234),
+            "an error message",
+            true,
+            1000L,
+            Collections.singletonMap("PartitionA", 5000L),
+            5L,
+            10L
+        )
+    );
+  }
+
   /**
    * Task report that throws an exception while being serialized.
    */
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index db971e1ed86..cb6aa39d5c6 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -20,7 +20,6 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.google.common.base.Preconditions;
-import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.LocalInputSource;
@@ -31,6 +30,7 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.input.DruidInputSource;
@@ -66,7 +66,6 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 @SuppressWarnings("SameParameterValue")
@@ -183,10 +182,10 @@ abstract class AbstractMultiPhaseParallelIndexingTest 
extends AbstractParallelIn
     return getIndexingServiceClient().getPublishedSegments(task);
   }
 
-  Map<String, Object> runTaskAndGetReports(Task task, TaskState 
expectedTaskStatus)
+  TaskReport.ReportMap runTaskAndGetReports(Task task, TaskState 
expectedTaskStatus)
   {
     runTaskAndVerifyStatus(task, expectedTaskStatus);
-    return 
FutureUtils.getUnchecked(getIndexingServiceClient().taskReportAsMap(task.getId()),
 true);
+    return getIndexingServiceClient().getLiveReportsForTask(task.getId());
   }
 
   protected ParallelIndexSupervisorTask createTask(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index a6a9f6ccdd7..84fac9b82d8 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -49,10 +49,13 @@ import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -118,7 +121,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -546,12 +548,17 @@ public class AbstractParallelIndexSupervisorTaskTest 
extends IngestionTestBase
 
     @Override
     public ListenableFuture<Map<String, Object>> taskReportAsMap(String taskId)
+    {
+      return Futures.immediateFuture(null);
+    }
+
+    public TaskReport.ReportMap getLiveReportsForTask(String taskId)
     {
       final Optional<Task> task = getTaskStorage().getTask(taskId);
       if (!task.isPresent()) {
         return null;
       }
-      return Futures.immediateFuture(((ParallelIndexSupervisorTask) 
task.get()).doGetLiveReports(true));
+      return ((ParallelIndexSupervisorTask) task.get()).doGetLiveReports(true);
     }
 
     public TaskContainer getTaskContainer(String taskId)
@@ -773,20 +780,16 @@ public class AbstractParallelIndexSupervisorTaskTest 
extends IngestionTestBase
     }
   }
 
-  protected Map<String, Object> buildExpectedTaskReportSequential(
+  protected TaskReport.ReportMap buildExpectedTaskReportSequential(
       String taskId,
       List<ParseExceptionReport> expectedUnparseableEvents,
       RowIngestionMetersTotals expectedDeterminePartitions,
       RowIngestionMetersTotals expectedTotals
   )
   {
-    final Map<String, Object> payload = new HashMap<>();
+    final Map<String, Object> unparseableEvents =
+        ImmutableMap.of("determinePartitions", ImmutableList.of(), 
"buildSegments", expectedUnparseableEvents);
 
-    payload.put("ingestionState", IngestionState.COMPLETED);
-    payload.put(
-        "unparseableEvents",
-        ImmutableMap.of("determinePartitions", ImmutableList.of(), 
"buildSegments", expectedUnparseableEvents)
-    );
     Map<String, Object> emptyAverageMinuteMap = ImmutableMap.of(
         "processed", 0.0,
         "processedBytes", 0.0,
@@ -801,72 +804,90 @@ public class AbstractParallelIndexSupervisorTaskTest 
extends IngestionTestBase
         "15m", emptyAverageMinuteMap
     );
 
-    payload.put(
-        "rowStats",
-        ImmutableMap.of(
-            "movingAverages",
-            ImmutableMap.of("determinePartitions", emptyAverages, 
"buildSegments", emptyAverages),
-            "totals",
-            ImmutableMap.of("determinePartitions", 
expectedDeterminePartitions, "buildSegments", expectedTotals)
-        )
+    final Map<String, Object> rowStats = ImmutableMap.of(
+        "movingAverages",
+        ImmutableMap.of("determinePartitions", emptyAverages, "buildSegments", 
emptyAverages),
+        "totals",
+        ImmutableMap.of("determinePartitions", expectedDeterminePartitions, 
"buildSegments", expectedTotals)
     );
 
-    final Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
-    ingestionStatsAndErrors.put("taskId", taskId);
-    ingestionStatsAndErrors.put("payload", payload);
-    ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
-
-    return Collections.singletonMap("ingestionStatsAndErrors", 
ingestionStatsAndErrors);
+    return TaskReport.buildTaskReports(
+        new IngestionStatsAndErrorsTaskReport(
+            taskId,
+            new IngestionStatsAndErrors(
+                IngestionState.COMPLETED,
+                unparseableEvents,
+                rowStats,
+                null,
+                false,
+                0L,
+                null,
+                null,
+                null
+            )
+        )
+    );
   }
 
-  protected Map<String, Object> buildExpectedTaskReportParallel(
+  protected TaskReport.ReportMap buildExpectedTaskReportParallel(
       String taskId,
       List<ParseExceptionReport> expectedUnparseableEvents,
       RowIngestionMetersTotals expectedTotals
   )
   {
-    Map<String, Object> returnMap = new HashMap<>();
-    Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
-    Map<String, Object> payload = new HashMap<>();
-
-    payload.put("ingestionState", IngestionState.COMPLETED);
-    payload.put("unparseableEvents", ImmutableMap.of("buildSegments", 
expectedUnparseableEvents));
-    payload.put("rowStats", ImmutableMap.of("totals", 
ImmutableMap.of("buildSegments", expectedTotals)));
+    Map<String, Object> unparseableEvents = ImmutableMap.of("buildSegments", 
expectedUnparseableEvents);
+    Map<String, Object> rowStats = ImmutableMap.of("totals", 
ImmutableMap.of("buildSegments", expectedTotals));
 
-    ingestionStatsAndErrors.put("taskId", taskId);
-    ingestionStatsAndErrors.put("payload", payload);
-    ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
-
-    returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
-    return returnMap;
+    return TaskReport.buildTaskReports(
+        new IngestionStatsAndErrorsTaskReport(
+            taskId,
+            new IngestionStatsAndErrors(
+                IngestionState.COMPLETED,
+                unparseableEvents,
+                rowStats,
+                null,
+                false,
+                0L,
+                null,
+                null,
+                null
+            )
+        )
+    );
   }
 
   protected void compareTaskReports(
-      Map<String, Object> expectedReports,
-      Map<String, Object> actualReports
+      TaskReport.ReportMap expectedReports,
+      TaskReport.ReportMap actualReports
   )
   {
-    expectedReports = (Map<String, Object>) 
expectedReports.get("ingestionStatsAndErrors");
-    actualReports = (Map<String, Object>) 
actualReports.get("ingestionStatsAndErrors");
+    final Optional<IngestionStatsAndErrorsTaskReport> expectedReportOptional
+        = expectedReports.findReport("ingestionStatsAndErrors");
+    final Optional<IngestionStatsAndErrorsTaskReport> actualReportOptional
+        = actualReports.findReport("ingestionStatsAndErrors");
+
+    Assert.assertTrue(expectedReportOptional.isPresent());
+    Assert.assertTrue(actualReportOptional.isPresent());
+
+    final IngestionStatsAndErrorsTaskReport expectedReport = 
expectedReportOptional.get();
+    final IngestionStatsAndErrorsTaskReport actualReport = 
actualReportOptional.get();
 
-    Assert.assertEquals(expectedReports.get("taskId"), 
actualReports.get("taskId"));
-    Assert.assertEquals(expectedReports.get("type"), 
actualReports.get("type"));
+    Assert.assertEquals(expectedReport.getTaskId(), actualReport.getTaskId());
+    Assert.assertEquals(expectedReport.getReportKey(), 
actualReport.getReportKey());
 
-    Map<String, Object> expectedPayload = (Map<String, Object>) 
expectedReports.get("payload");
-    Map<String, Object> actualPayload = (Map<String, Object>) 
actualReports.get("payload");
-    Assert.assertEquals(expectedPayload.get("ingestionState"), 
actualPayload.get("ingestionState"));
+    final IngestionStatsAndErrors expectedPayload = 
expectedReport.getPayload();
+    final IngestionStatsAndErrors actualPayload = actualReport.getPayload();
+    Assert.assertEquals(expectedPayload.getIngestionState(), 
actualPayload.getIngestionState());
 
-    Map<String, Object> expectedTotals = (Map<String, Object>) 
expectedPayload.get("totals");
-    Map<String, Object> actualTotals = (Map<String, Object>) 
actualReports.get("totals");
+    Map<String, Object> expectedTotals = expectedPayload.getRowStats();
+    Map<String, Object> actualTotals = actualPayload.getRowStats();
     Assert.assertEquals(expectedTotals, actualTotals);
 
     List<ParseExceptionReport> expectedParseExceptionReports =
-        (List<ParseExceptionReport>) ((Map<String, Object>)
-            expectedPayload.get("unparseableEvents")).get("buildSegments");
+        (List<ParseExceptionReport>) 
(expectedPayload.getUnparseableEvents()).get("buildSegments");
 
     List<ParseExceptionReport> actualParseExceptionReports =
-        (List<ParseExceptionReport>) ((Map<String, Object>)
-            actualPayload.get("unparseableEvents")).get("buildSegments");
+        (List<ParseExceptionReport>) 
(actualPayload.getUnparseableEvents()).get("buildSegments");
 
     List<String> expectedMessages = expectedParseExceptionReports
         .stream().map(r -> r.getDetails().get(0)).collect(Collectors.toList());
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java
index 0cf7fdd147b..b020c360ba8 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingRowStatsTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
@@ -43,7 +44,6 @@ import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.Arrays;
-import java.util.Map;
 
 public class MultiPhaseParallelIndexingRowStatsTest extends 
AbstractMultiPhaseParallelIndexingTest
 {
@@ -133,8 +133,8 @@ public class MultiPhaseParallelIndexingRowStatsTest extends 
AbstractMultiPhasePa
         false
     );
 
-    final RowIngestionMetersTotals expectedTotals = 
RowMeters.with().totalProcessed(200);
-    final Map<String, Object> expectedReports =
+    final RowIngestionMetersTotals expectedTotals = 
RowMeters.with().bytes(5630).totalProcessed(200);
+    final TaskReport.ReportMap expectedReports =
         maxNumConcurrentSubTasks <= 1
         ? buildExpectedTaskReportSequential(
             task.getId(),
@@ -148,7 +148,7 @@ public class MultiPhaseParallelIndexingRowStatsTest extends 
AbstractMultiPhasePa
             expectedTotals
         );
 
-    Map<String, Object> actualReports = runTaskAndGetReports(task, 
TaskState.SUCCESS);
+    TaskReport.ReportMap actualReports = runTaskAndGetReports(task, 
TaskState.SUCCESS);
     compareTaskReports(expectedReports, actualReports);
   }
 
@@ -169,12 +169,12 @@ public class MultiPhaseParallelIndexingRowStatsTest 
extends AbstractMultiPhasePa
         false,
         false
     );
-    Map<String, Object> expectedReports = buildExpectedTaskReportParallel(
+    TaskReport.ReportMap expectedReports = buildExpectedTaskReportParallel(
         task.getId(),
         ImmutableList.of(),
-        new RowIngestionMetersTotals(200, 0, 0, 0, 0)
+        new RowIngestionMetersTotals(200, 5630, 0, 0, 0)
     );
-    Map<String, Object> actualReports = runTaskAndGetReports(task, 
TaskState.SUCCESS);
+    TaskReport.ReportMap actualReports = runTaskAndGetReports(task, 
TaskState.SUCCESS);
     compareTaskReports(expectedReports, actualReports);
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index 266604a8542..6a4e5f90c64 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.AbstractInputSource;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSplit;
@@ -35,6 +34,7 @@ import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.SegmentAllocators;
@@ -713,7 +713,12 @@ public class ParallelIndexSupervisorTaskResourceTest 
extends AbstractParallelInd
       );
 
       taskClient.report(
-          new PushedSegmentsReport(getId(), Collections.emptySet(), 
Collections.singleton(segment), ImmutableMap.of())
+          new PushedSegmentsReport(
+              getId(),
+              Collections.emptySet(),
+              Collections.singleton(segment),
+              new TaskReport.ReportMap()
+          )
       );
       return TaskStatus.fromCode(getId(), state);
     }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
index 05d0a1fe9de..3edce650688 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
@@ -20,6 +20,8 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.indexing.common.KillTaskReport;
+import org.apache.druid.indexing.common.TaskReport;
 import org.junit.Test;
 
 public class PushedSegmentsReportTest
@@ -27,6 +29,13 @@ public class PushedSegmentsReportTest
   @Test
   public void testEquals()
   {
-    
EqualsVerifier.forClass(PushedSegmentsReport.class).usingGetClass().verify();
+    TaskReport.ReportMap map1 = new TaskReport.ReportMap();
+    TaskReport.ReportMap map2 = new TaskReport.ReportMap();
+    map2.put("killTaskReport", new KillTaskReport("taskId", new 
KillTaskReport.Stats(1, 2, 3)));
+
+    EqualsVerifier.forClass(PushedSegmentsReport.class)
+                  .usingGetClass()
+                  .withPrefabValues(TaskReport.ReportMap.class, map1, map2)
+                  .verify();
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 7ad9f3c9464..bc2e38f798e 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.LocalInputSource;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.Tasks;
@@ -446,9 +447,8 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
         false,
         Collections.emptyList()
     );
-    Map<String, Object> actualReports = task.doGetLiveReports(true);
-    final long processedBytes = useInputFormatApi ? 335 : 0;
-    Map<String, Object> expectedReports = buildExpectedTaskReportParallel(
+    TaskReport.ReportMap actualReports = task.doGetLiveReports(true);
+    TaskReport.ReportMap expectedReports = buildExpectedTaskReportParallel(
         task.getId(),
         ImmutableList.of(
             new ParseExceptionReport(
@@ -464,7 +464,7 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
                 1L
             )
         ),
-        new RowIngestionMetersTotals(10, processedBytes, 1, 1, 1)
+        new RowIngestionMetersTotals(10, 335, 1, 1, 1)
     );
     compareTaskReports(expectedReports, actualReports);
   }
@@ -497,10 +497,9 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
 
     TaskContainer taskContainer = 
getIndexingServiceClient().getTaskContainer(task.getId());
     final ParallelIndexSupervisorTask executedTask = 
(ParallelIndexSupervisorTask) taskContainer.getTask();
-    Map<String, Object> actualReports = executedTask.doGetLiveReports(true);
+    TaskReport.ReportMap actualReports = executedTask.doGetLiveReports(true);
 
-    final long processedBytes = useInputFormatApi ? 335 : 0;
-    RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10, 
processedBytes, 1, 1, 1);
+    final RowIngestionMetersTotals expectedTotals = new 
RowIngestionMetersTotals(10, 335, 1, 1, 1);
     List<ParseExceptionReport> expectedUnparseableEvents = ImmutableList.of(
         new ParseExceptionReport(
             "{ts=2017unparseable}",
@@ -516,7 +515,7 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
         )
     );
 
-    Map<String, Object> expectedReports;
+    TaskReport.ReportMap expectedReports;
     if (useInputFormatApi) {
       expectedReports = buildExpectedTaskReportSequential(
           task.getId(),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index fb454acafec..1f7bd929d8a 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -461,9 +461,9 @@ public abstract class SeekableStreamIndexTaskTestBase 
extends EasyMockSupport
 
   protected IngestionStatsAndErrors getTaskReportData() throws IOException
   {
-    Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
+    TaskReport.ReportMap taskReports = OBJECT_MAPPER.readValue(
         reportsFile,
-        new TypeReference<Map<String, TaskReport>>()
+        new TypeReference<TaskReport.ReportMap>()
         {
         }
     );


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to