This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 22b41dd  Task reports for parallel task: single phase and sequential 
mode (#11688)
22b41dd is described below

commit 22b41ddbbfe2b07b085e295ba171bcdc07e04900
Author: Jonathan Wei <[email protected]>
AuthorDate: Thu Sep 16 13:58:11 2021 -0500

    Task reports for parallel task: single phase and sequential mode (#11688)
    
    * Task reports for parallel task: single phase and sequential mode
    
    * Address comments
    
    * Add null check for currentSubTaskHolder
---
 .../druid/indexing/common/task/IndexTask.java      |  17 +-
 .../parallel/ParallelIndexSupervisorTask.java      | 222 ++++++++++++++-
 .../batch/parallel/PartialSegmentMergeTask.java    |   6 +-
 .../task/batch/parallel/PushedSegmentsReport.java  |  22 +-
 .../task/batch/parallel/SinglePhaseSubTask.java    | 315 ++++++++++++++++++---
 .../AbstractParallelIndexSupervisorTaskTest.java   |  19 +-
 .../ParallelIndexSupervisorTaskResourceTest.java   |   3 +-
 .../batch/parallel/PushedSegmentsReportTest.java   |  32 +++
 .../parallel/SinglePhaseParallelIndexingTest.java  | 169 ++++++++++-
 .../incremental/RowIngestionMetersTotals.java      |  35 +++
 .../incremental/RowIngestionMetersTotalsTest.java  |  32 +++
 .../client/indexing/HttpIndexingServiceClient.java |  26 ++
 .../client/indexing/IndexingServiceClient.java     |   3 +
 .../indexing/HttpIndexingServiceClientTest.java    |  67 +++++
 .../client/indexing/NoopIndexingServiceClient.java |   7 +
 15 files changed, 903 insertions(+), 72 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index a798522..f22c2a0 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -286,7 +286,12 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
   )
   {
     IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
-    Map<String, List<String>> events = new HashMap<>();
+    return Response.ok(doGetUnparseableEvents(full)).build();
+  }
+
+  public Map<String, Object> doGetUnparseableEvents(String full)
+  {
+    Map<String, Object> events = new HashMap<>();
 
     boolean needsDeterminePartitions = false;
     boolean needsBuildSegments = false;
@@ -325,11 +330,10 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
           )
       );
     }
-
-    return Response.ok(events).build();
+    return events;
   }
 
-  private Map<String, Object> doGetRowStats(String full)
+  public Map<String, Object> doGetRowStats(String full)
   {
     Map<String, Object> returnMap = new HashMap<>();
     Map<String, Object> totalsMap = new HashMap<>();
@@ -784,6 +788,11 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
     return hllCollectors;
   }
 
+  public IngestionState getIngestionState()
+  {
+    return ingestionState;
+  }
+
   /**
    * This method reads input data row by row and adds the read row to a proper 
segment using {@link BaseAppenderatorDriver}.
    * If there is no segment for the row, a new one is created.  Segments can 
be published in the middle of reading inputs
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index b66f49f..19b965c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
@@ -66,6 +67,8 @@ import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
@@ -92,6 +95,7 @@ import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -169,6 +173,8 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   @MonotonicNonNull
   private volatile TaskToolbox toolbox;
 
+  private IngestionState ingestionState;
+
   @JsonCreator
   public ParallelIndexSupervisorTask(
       @JsonProperty("id") String id,
@@ -218,6 +224,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
     }
 
     awaitSegmentAvailabilityTimeoutMillis = 
ingestionSchema.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis();
+    this.ingestionState = IngestionState.NOT_STARTED;
   }
 
   private static void 
checkPartitionsSpecForForceGuaranteedRollup(PartitionsSpec partitionsSpec)
@@ -484,6 +491,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       }
     }
     finally {
+      ingestionState = IngestionState.COMPLETED;
       toolbox.getChatHandlerProvider().unregister(getId());
     }
   }
@@ -553,18 +561,19 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
    */
   private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws 
Exception
   {
-    final ParallelIndexTaskRunner<SinglePhaseSubTask, PushedSegmentsReport> 
runner = createRunner(
+    ingestionState = IngestionState.BUILD_SEGMENTS;
+    ParallelIndexTaskRunner<SinglePhaseSubTask, PushedSegmentsReport> 
parallelSinglePhaseRunner = createRunner(
         toolbox,
         this::createSinglePhaseTaskRunner
     );
 
-    final TaskState state = runNextPhase(runner);
+    final TaskState state = runNextPhase(parallelSinglePhaseRunner);
     TaskStatus taskStatus;
     if (state.isSuccess()) {
       //noinspection ConstantConditions
-      publishSegments(toolbox, runner.getReports());
+      publishSegments(toolbox, parallelSinglePhaseRunner.getReports());
       if (awaitSegmentAvailabilityTimeoutMillis > 0) {
-        waitForSegmentAvailability(runner.getReports());
+        waitForSegmentAvailability(parallelSinglePhaseRunner.getReports());
       }
       taskStatus = TaskStatus.success(getId());
     } else {
@@ -572,7 +581,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       Preconditions.checkState(state.isFailure(), "Unrecognized state after 
task is complete[%s]", state);
       final String errorMessage = StringUtils.format(
           TASK_PHASE_FAILURE_MSG,
-          runner.getName()
+          parallelSinglePhaseRunner.getName()
       );
       taskStatus = TaskStatus.failure(getId(), errorMessage);
     }
@@ -1068,7 +1077,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
 
   private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
   {
-    final IndexTask indexTask = new IndexTask(
+    IndexTask sequentialIndexTask = new IndexTask(
         getId(),
         getGroupId(),
         getTaskResource(),
@@ -1082,8 +1091,9 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
         getContext()
     );
 
-    if (currentSubTaskHolder.setTask(indexTask) && 
indexTask.isReady(toolbox.getTaskActionClient())) {
-      return indexTask.run(toolbox);
+    if (currentSubTaskHolder.setTask(sequentialIndexTask)
+        && sequentialIndexTask.isReady(toolbox.getTaskActionClient())) {
+      return sequentialIndexTask.run(toolbox);
     } else {
       String msg = "Task was asked to stop. Finish as failed";
       LOG.info(msg);
@@ -1101,13 +1111,17 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
    */
   private Map<String, TaskReport> getTaskCompletionReports(TaskStatus 
taskStatus, boolean segmentAvailabilityConfirmed)
   {
+    Pair<Map<String, Object>, Map<String, Object>> 
rowStatsAndUnparseableEvents = doGetRowStatsAndUnparseableEvents(
+        "true",
+        true
+    );
     return TaskReport.buildTaskReports(
         new IngestionStatsAndErrorsTaskReport(
             getId(),
             new IngestionStatsAndErrorsTaskReportData(
                 IngestionState.COMPLETED,
-                new HashMap<>(),
-                new HashMap<>(),
+                rowStatsAndUnparseableEvents.rhs,
+                rowStatsAndUnparseableEvents.lhs,
                 taskStatus.getErrorMsg(),
                 segmentAvailabilityConfirmed
             )
@@ -1415,4 +1429,192 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       }
     }
   }
+
+  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object 
buildSegmentsRowStats)
+  {
+    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
+      // This case is for unit tests. Normally when deserialized the row stats 
will appear as a Map<String, Object>.
+      return (RowIngestionMetersTotals) buildSegmentsRowStats;
+    } else if (buildSegmentsRowStats instanceof Map) {
+      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) 
buildSegmentsRowStats;
+      return new RowIngestionMetersTotals(
+          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
+          ((Number) 
buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
+      );
+    } else {
+      // should never happen
+      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + 
buildSegmentsRowStats.getClass());
+    }
+  }
+
+  private Pair<Map<String, Object>, Map<String, Object>> 
doGetRowStatsAndUnparseableEventsParallelSinglePhase(
+      SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner,
+      boolean includeUnparseable
+  )
+  {
+    long processed = 0L;
+    long processedWithError = 0L;
+    long thrownAway = 0L;
+    long unparseable = 0L;
+
+    List<String> unparseableEvents = new ArrayList<>();
+
+    // Get stats from completed tasks
+    Map<String, PushedSegmentsReport> completedSubtaskReports = 
parallelSinglePhaseRunner.getReports();
+    for (PushedSegmentsReport pushedSegmentsReport : 
completedSubtaskReports.values()) {
+      Map<String, TaskReport> taskReport = 
pushedSegmentsReport.getTaskReport();
+      if (taskReport == null || taskReport.isEmpty()) {
+        LOG.warn("Got an empty task report from subtask: " + 
pushedSegmentsReport.getTaskId());
+        continue;
+      }
+      IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = 
(IngestionStatsAndErrorsTaskReport) taskReport.get(
+          IngestionStatsAndErrorsTaskReport.REPORT_KEY);
+      IngestionStatsAndErrorsTaskReportData reportData =
+          (IngestionStatsAndErrorsTaskReportData) 
ingestionStatsAndErrorsReport.getPayload();
+      RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
+          reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
+      );
+
+      if (includeUnparseable) {
+        List<String> taskUnparsebleEvents = (List<String>) 
reportData.getUnparseableEvents()
+                                                                     
.get(RowIngestionMeters.BUILD_SEGMENTS);
+        unparseableEvents.addAll(taskUnparsebleEvents);
+      }
+
+      processed += totals.getProcessed();
+      processedWithError += totals.getProcessedWithError();
+      thrownAway += totals.getThrownAway();
+      unparseable += totals.getUnparseable();
+    }
+
+    // Get stats from running tasks
+    Set<String> runningTaskIds = parallelSinglePhaseRunner.getRunningTaskIds();
+    for (String runningTaskId : runningTaskIds) {
+      try {
+        Map<String, Object> report = 
toolbox.getIndexingServiceClient().getTaskReport(runningTaskId);
+        if (report == null || report.isEmpty()) {
+          // task does not have a running report yet
+          continue;
+        }
+        Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) 
report.get("ingestionStatsAndErrors");
+        Map<String, Object> payload = (Map<String, Object>) 
ingestionStatsAndErrors.get("payload");
+        Map<String, Object> rowStats = (Map<String, Object>) 
payload.get("rowStats");
+        Map<String, Object> totals = (Map<String, Object>) 
rowStats.get("totals");
+        Map<String, Object> buildSegments = (Map<String, Object>) 
totals.get(RowIngestionMeters.BUILD_SEGMENTS);
+
+        if (includeUnparseable) {
+          Map<String, Object> taskUnparseableEvents = (Map<String, Object>) 
payload.get("unparseableEvents");
+          List<String> buildSegmentsUnparseableEvents = (List<String>) 
taskUnparseableEvents.get(
+              RowIngestionMeters.BUILD_SEGMENTS
+          );
+          unparseableEvents.addAll(buildSegmentsUnparseableEvents);
+        }
+
+        processed += ((Number) buildSegments.get("processed")).longValue();
+        processedWithError += ((Number) 
buildSegments.get("processedWithError")).longValue();
+        thrownAway += ((Number) buildSegments.get("thrownAway")).longValue();
+        unparseable += ((Number) buildSegments.get("unparseable")).longValue();
+      }
+      catch (Exception e) {
+        LOG.warn(e, "Encountered exception when getting live subtask report 
for task: " + runningTaskId);
+      }
+    }
+
+    Map<String, Object> rowStatsMap = new HashMap<>();
+    Map<String, Object> totalsMap = new HashMap<>();
+    totalsMap.put(
+        RowIngestionMeters.BUILD_SEGMENTS,
+        new RowIngestionMetersTotals(processed, processedWithError, 
thrownAway, unparseable)
+    );
+    rowStatsMap.put("totals", totalsMap);
+
+    return Pair.of(rowStatsMap, 
ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
+  }
+
+  private Pair<Map<String, Object>, Map<String, Object>> 
doGetRowStatsAndUnparseableEvents(String full, boolean includeUnparseable)
+  {
+    if (currentSubTaskHolder == null) {
+      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+    }
+
+    Object currentRunner = currentSubTaskHolder.getTask();
+    if (currentRunner == null) {
+      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+    }
+
+    if (isParallelMode()) {
+      if (isGuaranteedRollup(ingestionSchema.getIOConfig(), 
ingestionSchema.getTuningConfig())) {
+        // multiphase is not supported yet
+        return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      } else {
+        return doGetRowStatsAndUnparseableEventsParallelSinglePhase(
+            (SinglePhaseParallelIndexTaskRunner) currentRunner,
+            includeUnparseable
+        );
+      }
+    } else {
+      IndexTask currentSequentialTask = (IndexTask) currentRunner;
+      return Pair.of(currentSequentialTask.doGetRowStats(full), 
currentSequentialTask.doGetUnparseableEvents(full));
+    }
+  }
+
+  @GET
+  @Path("/rowStats")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getRowStats(
+      @Context final HttpServletRequest req,
+      @QueryParam("full") String full
+  )
+  {
+    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    return Response.ok(doGetRowStatsAndUnparseableEvents(full, 
false).lhs).build();
+  }
+
+  @VisibleForTesting
+  public Map<String, Object> doGetLiveReports(String full)
+  {
+    Map<String, Object> returnMap = new HashMap<>();
+    Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
+    Map<String, Object> payload = new HashMap<>();
+
+    Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparsebleEvents 
=
+        doGetRowStatsAndUnparseableEvents(full, true);
+
+    // use the sequential task's ingestion state if we were running that mode
+    IngestionState ingestionStateForReport;
+    if (isParallelMode()) {
+      ingestionStateForReport = ingestionState;
+    } else {
+      IndexTask currentSequentialTask = (IndexTask) 
currentSubTaskHolder.getTask();
+      ingestionStateForReport = currentSequentialTask == null
+                                ? ingestionState
+                                : currentSequentialTask.getIngestionState();
+    }
+
+    payload.put("ingestionState", ingestionStateForReport);
+    payload.put("unparseableEvents", rowStatsAndUnparsebleEvents.rhs);
+    payload.put("rowStats", rowStatsAndUnparsebleEvents.lhs);
+
+    ingestionStatsAndErrors.put("taskId", getId());
+    ingestionStatsAndErrors.put("payload", payload);
+    ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
+
+    returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
+    return returnMap;
+  }
+
+  @GET
+  @Path("/liveReports")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getLiveReports(
+      @Context final HttpServletRequest req,
+      @QueryParam("full") String full
+  )
+  {
+    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+
+    return Response.ok(doGetLiveReports(full)).build();
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
index 38a7261..60814fd 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@@ -197,7 +198,10 @@ abstract class PartialSegmentMergeTask<S extends 
ShardSpec> extends PerfectRollu
         intervalToUnzippedFiles
     );
 
-    taskClient.report(supervisorTaskId, new PushedSegmentsReport(getId(), 
Collections.emptySet(), pushedSegments));
+    taskClient.report(
+        supervisorTaskId,
+        new PushedSegmentsReport(getId(), Collections.emptySet(), 
pushedSegments, ImmutableMap.of())
+    );
 
     return TaskStatus.success(getId());
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
index 8654172..8ed373c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReport.java
@@ -22,8 +22,10 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.timeline.DataSegment;
 
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
@@ -39,17 +41,20 @@ public class PushedSegmentsReport implements SubTaskReport
   private final String taskId;
   private final Set<DataSegment> oldSegments;
   private final Set<DataSegment> newSegments;
+  private final Map<String, TaskReport> taskReport;
 
   @JsonCreator
   public PushedSegmentsReport(
       @JsonProperty("taskId") String taskId,
       @JsonProperty("oldSegments") Set<DataSegment> oldSegments,
-      @JsonProperty("segments") Set<DataSegment> newSegments
+      @JsonProperty("segments") Set<DataSegment> newSegments,
+      @JsonProperty("taskReport") Map<String, TaskReport> taskReport
   )
   {
     this.taskId = Preconditions.checkNotNull(taskId, "taskId");
     this.oldSegments = Preconditions.checkNotNull(oldSegments, "oldSegments");
     this.newSegments = Preconditions.checkNotNull(newSegments, "newSegments");
+    this.taskReport = taskReport;
   }
 
   @Override
@@ -71,6 +76,12 @@ public class PushedSegmentsReport implements SubTaskReport
     return newSegments;
   }
 
+  @JsonProperty("taskReport")
+  public Map<String, TaskReport> getTaskReport()
+  {
+    return taskReport;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -81,14 +92,15 @@ public class PushedSegmentsReport implements SubTaskReport
       return false;
     }
     PushedSegmentsReport that = (PushedSegmentsReport) o;
-    return Objects.equals(taskId, that.taskId) &&
-           Objects.equals(oldSegments, that.oldSegments) &&
-           Objects.equals(newSegments, that.newSegments);
+    return Objects.equals(taskId, that.taskId)
+           && Objects.equals(oldSegments, that.oldSegments)
+           && Objects.equals(newSegments, that.newSegments)
+           && Objects.equals(taskReport, that.taskReport);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(taskId, oldSegments, newSegments);
+    return Objects.hash(taskId, oldSegments, newSegments, taskReport);
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index af885e7..08cba47 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -21,13 +21,20 @@ package 
org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputSource;
+import org.apache.druid.indexer.IngestionState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -35,6 +42,7 @@ import 
org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.BatchAppenderators;
 import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
 import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
 import org.apache.druid.indexing.common.task.SegmentAllocators;
 import org.apache.druid.indexing.common.task.TaskResource;
@@ -58,16 +66,29 @@ import 
org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResul
 import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
 import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
 import 
org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
+import org.apache.druid.segment.realtime.firehose.ChatHandler;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -80,7 +101,7 @@ import java.util.concurrent.TimeoutException;
  * generates and pushes segments, and reports them to the {@link 
SinglePhaseParallelIndexTaskRunner} instead of
  * publishing on its own.
  */
-public class SinglePhaseSubTask extends AbstractBatchSubtask
+public class SinglePhaseSubTask extends AbstractBatchSubtask implements 
ChatHandler
 {
   public static final String TYPE = "single_phase_sub_task";
   public static final String OLD_TYPE_NAME = "index_sub";
@@ -106,6 +127,20 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask
    */
   private final boolean missingIntervalsInOverwriteMode;
 
+  @MonotonicNonNull
+  private AuthorizerMapper authorizerMapper;
+
+  @MonotonicNonNull
+  private RowIngestionMeters rowIngestionMeters;
+
+  @MonotonicNonNull
+  private ParseExceptionHandler parseExceptionHandler;
+
+  @Nullable
+  private String errorMsg;
+
+  private IngestionState ingestionState;
+
   @JsonCreator
   public SinglePhaseSubTask(
       // id shouldn't be null except when this task is created by 
ParallelIndexSupervisorTask
@@ -144,6 +179,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask
     if (missingIntervalsInOverwriteMode) {
       addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
     }
+    this.ingestionState = IngestionState.NOT_STARTED;
   }
 
   @Override
@@ -187,43 +223,74 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask
   }
 
   @Override
-  public TaskStatus runTask(final TaskToolbox toolbox) throws Exception
+  public TaskStatus runTask(final TaskToolbox toolbox)
   {
-    if (missingIntervalsInOverwriteMode) {
-      LOG.warn(
-          "Intervals are missing in granularitySpec while this task is 
potentially overwriting existing segments. "
-          + "Forced to use timeChunk lock."
+    try {
+      if (missingIntervalsInOverwriteMode) {
+        LOG.warn(
+            "Intervals are missing in granularitySpec while this task is 
potentially overwriting existing segments. "
+            + "Forced to use timeChunk lock."
+        );
+      }
+      this.authorizerMapper = toolbox.getAuthorizerMapper();
+
+      toolbox.getChatHandlerProvider().register(getId(), this, false);
+
+      rowIngestionMeters = 
toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+      parseExceptionHandler = new ParseExceptionHandler(
+          rowIngestionMeters,
+          ingestionSchema.getTuningConfig().isLogParseExceptions(),
+          ingestionSchema.getTuningConfig().getMaxParseExceptions(),
+          ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
       );
-    }
-    final InputSource inputSource = 
ingestionSchema.getIOConfig().getNonNullInputSource(
-        ingestionSchema.getDataSchema().getParser()
-    );
 
-    final ParallelIndexSupervisorTaskClient taskClient = 
toolbox.getSupervisorTaskClientFactory().build(
-        new ClientBasedTaskInfoProvider(toolbox.getIndexingServiceClient()),
-        getId(),
-        1, // always use a single http thread
-        ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
-        ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
-    );
-    final Set<DataSegment> pushedSegments = generateAndPushSegments(
-        toolbox,
-        taskClient,
-        inputSource,
-        toolbox.getIndexingTmpDir()
-    );
+      final InputSource inputSource = 
ingestionSchema.getIOConfig().getNonNullInputSource(
+          ingestionSchema.getDataSchema().getParser()
+      );
 
-    // Find inputSegments overshadowed by pushedSegments
-    final Set<DataSegment> allSegments = new 
HashSet<>(getTaskLockHelper().getLockedExistingSegments());
-    allSegments.addAll(pushedSegments);
-    final VersionedIntervalTimeline<String, DataSegment> timeline = 
VersionedIntervalTimeline.forSegments(allSegments);
-    final Set<DataSegment> oldSegments = 
FluentIterable.from(timeline.findFullyOvershadowed())
-                                                       
.transformAndConcat(TimelineObjectHolder::getObject)
-                                                       
.transform(PartitionChunk::getObject)
-                                                       .toSet();
-    taskClient.report(supervisorTaskId, new PushedSegmentsReport(getId(), 
oldSegments, pushedSegments));
-
-    return TaskStatus.success(getId());
+      final ParallelIndexSupervisorTaskClient taskClient = 
toolbox.getSupervisorTaskClientFactory().build(
+          new ClientBasedTaskInfoProvider(toolbox.getIndexingServiceClient()),
+          getId(),
+          1, // always use a single http thread
+          ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
+          ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
+      );
+      ingestionState = IngestionState.BUILD_SEGMENTS;
+      final Set<DataSegment> pushedSegments = generateAndPushSegments(
+          toolbox,
+          taskClient,
+          inputSource,
+          toolbox.getIndexingTmpDir()
+      );
+      
+      // Find inputSegments overshadowed by pushedSegments
+      final Set<DataSegment> allSegments = new 
HashSet<>(getTaskLockHelper().getLockedExistingSegments());
+      allSegments.addAll(pushedSegments);
+      final VersionedIntervalTimeline<String, DataSegment> timeline = 
VersionedIntervalTimeline.forSegments(allSegments);
+      final Set<DataSegment> oldSegments = 
FluentIterable.from(timeline.findFullyOvershadowed())
+                                                         
.transformAndConcat(TimelineObjectHolder::getObject)
+                                                         
.transform(PartitionChunk::getObject)
+                                                         .toSet();
+
+      Map<String, TaskReport> taskReport = getTaskCompletionReports();
+      taskClient.report(supervisorTaskId, new PushedSegmentsReport(getId(), 
oldSegments, pushedSegments, taskReport));
+
+      toolbox.getTaskReportFileWriter().write(getId(), taskReport);
+
+      return TaskStatus.success(getId());
+    }
+    catch (Exception e) {
+      LOG.error(e, "Encountered exception in parallel sub task.");
+      errorMsg = Throwables.getStackTraceAsString(e);
+      toolbox.getTaskReportFileWriter().write(getId(), 
getTaskCompletionReports());
+      return TaskStatus.failure(
+          getId(),
+          errorMsg
+      );
+    }
+    finally {
+      toolbox.getChatHandlerProvider().unregister(getId());
+    }
   }
 
   @Override
@@ -324,13 +391,6 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask
         useLineageBasedSegmentAllocation
     );
 
-    final RowIngestionMeters rowIngestionMeters = 
toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
-    final ParseExceptionHandler parseExceptionHandler = new 
ParseExceptionHandler(
-        rowIngestionMeters,
-        tuningConfig.isLogParseExceptions(),
-        tuningConfig.getMaxParseExceptions(),
-        tuningConfig.getMaxSavedParseExceptions()
-    );
     final Appenderator appenderator = BatchAppenderators.newAppenderator(
         getId(),
         toolbox.getAppenderatorsManager(),
@@ -339,12 +399,7 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask
         dataSchema,
         tuningConfig,
         rowIngestionMeters,
-        new ParseExceptionHandler(
-            rowIngestionMeters,
-            tuningConfig.isLogParseExceptions(),
-            tuningConfig.getMaxParseExceptions(),
-            tuningConfig.getMaxSavedParseExceptions()
-        )
+        parseExceptionHandler
     );
     boolean exceptionOccurred = false;
     try (
@@ -424,4 +479,170 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask
       }
     }
   }
+
+  @GET
+  @Path("/unparseableEvents")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getUnparseableEvents(
+      @Context final HttpServletRequest req,
+      @QueryParam("full") String full
+  )
+  {
+    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    Map<String, List<String>> events = new HashMap<>();
+
+    boolean needsBuildSegments = false;
+
+    if (full != null) {
+      needsBuildSegments = true;
+    } else {
+      switch (ingestionState) {
+        case BUILD_SEGMENTS:
+        case COMPLETED:
+          needsBuildSegments = true;
+          break;
+        default:
+          break;
+      }
+    }
+
+    if (needsBuildSegments) {
+      events.put(
+          RowIngestionMeters.BUILD_SEGMENTS,
+          IndexTaskUtils.getMessagesFromSavedParseExceptions(
+              parseExceptionHandler.getSavedParseExceptions()
+          )
+      );
+    }
+
+    return Response.ok(events).build();
+  }
+
+  private Map<String, Object> doGetRowStats(String full)
+  {
+    Map<String, Object> returnMap = new HashMap<>();
+    Map<String, Object> totalsMap = new HashMap<>();
+    Map<String, Object> averagesMap = new HashMap<>();
+
+    boolean needsBuildSegments = false;
+
+    if (full != null) {
+      needsBuildSegments = true;
+    } else {
+      switch (ingestionState) {
+        case BUILD_SEGMENTS:
+        case COMPLETED:
+          needsBuildSegments = true;
+          break;
+        default:
+          break;
+      }
+    }
+
+    if (needsBuildSegments) {
+      totalsMap.put(
+          RowIngestionMeters.BUILD_SEGMENTS,
+          rowIngestionMeters.getTotals()
+      );
+      averagesMap.put(
+          RowIngestionMeters.BUILD_SEGMENTS,
+          rowIngestionMeters.getMovingAverages()
+      );
+    }
+
+    returnMap.put("totals", totalsMap);
+    returnMap.put("movingAverages", averagesMap);
+    return returnMap;
+  }
+
+  @GET
+  @Path("/rowStats")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getRowStats(
+      @Context final HttpServletRequest req,
+      @QueryParam("full") String full
+  )
+  {
+    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    return Response.ok(doGetRowStats(full)).build();
+  }
+
+  @VisibleForTesting
+  public Map<String, Object> doGetLiveReports(String full)
+  {
+    Map<String, Object> returnMap = new HashMap<>();
+    Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
+    Map<String, Object> payload = new HashMap<>();
+    Map<String, Object> events = getTaskCompletionUnparseableEvents();
+
+    payload.put("ingestionState", ingestionState);
+    payload.put("unparseableEvents", events);
+    payload.put("rowStats", doGetRowStats(full));
+
+    ingestionStatsAndErrors.put("taskId", getId());
+    ingestionStatsAndErrors.put("payload", payload);
+    ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
+
+    returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
+    return returnMap;
+  }
+
+  @GET
+  @Path("/liveReports")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getLiveReports(
+      @Context final HttpServletRequest req,
+      @QueryParam("full") String full
+  )
+  {
+    IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
+    return Response.ok(doGetLiveReports(full)).build();
+  }
+
+  private Map<String, Object> getTaskCompletionRowStats()
+  {
+    Map<String, Object> metrics = new HashMap<>();
+    metrics.put(
+        RowIngestionMeters.BUILD_SEGMENTS,
+        rowIngestionMeters.getTotals()
+    );
+    return metrics;
+  }
+
+  /**
+   * Generate an IngestionStatsAndErrorsTaskReport for the task.
+   *
+   * @return a map of report name to {@link TaskReport} describing this subtask's completion state
+   */
+  private Map<String, TaskReport> getTaskCompletionReports()
+  {
+    return TaskReport.buildTaskReports(
+        new IngestionStatsAndErrorsTaskReport(
+            getId(),
+            new IngestionStatsAndErrorsTaskReportData(
+                IngestionState.COMPLETED,
+                getTaskCompletionUnparseableEvents(),
+                getTaskCompletionRowStats(),
+                errorMsg,
+                false // not applicable for parallel subtask
+            )
+        )
+    );
+  }
+
+  private Map<String, Object> getTaskCompletionUnparseableEvents()
+  {
+    Map<String, Object> unparseableEventsMap = new HashMap<>();
+    List<String> parseExceptionMessages = 
IndexTaskUtils.getMessagesFromSavedParseExceptions(
+        parseExceptionHandler.getSavedParseExceptions()
+    );
+
+    if (parseExceptionMessages != null) {
+      unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, 
parseExceptionMessages);
+    } else {
+      unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, 
ImmutableList.of());
+    }
+
+    return unparseableEventsMap;
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index fc1328b..9f5e4d6 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -177,7 +177,7 @@ public class AbstractParallelIndexSupervisorTaskTest 
extends IngestionTestBase
           null,
           null,
           null,
-          null,
+          5,
           null,
           null
       );
@@ -312,7 +312,7 @@ public class AbstractParallelIndexSupervisorTaskTest 
extends IngestionTestBase
     return coordinatorClient;
   }
 
-  private static class TaskContainer
+  protected static class TaskContainer
   {
     private final Task task;
     @MonotonicNonNull
@@ -325,6 +325,11 @@ public class AbstractParallelIndexSupervisorTaskTest 
extends IngestionTestBase
       this.task = task;
     }
 
+    public Task getTask()
+    {
+      return task;
+    }
+
     private void setStatusFuture(Future<TaskStatus> statusFuture)
     {
       this.statusFuture = statusFuture;
@@ -421,6 +426,11 @@ public class AbstractParallelIndexSupervisorTaskTest 
extends IngestionTestBase
       }
     }
 
+    private TaskContainer getTaskContainer(String taskId)
+    {
+      return tasks.get(taskId);
+    }
+
     private Future<TaskStatus> runTask(Task task)
     {
       final TaskContainer taskContainer = new TaskContainer(task);
@@ -531,6 +541,11 @@ public class AbstractParallelIndexSupervisorTaskTest 
extends IngestionTestBase
       return taskRunner.run(injectIfNeeded(task));
     }
 
+    public TaskContainer getTaskContainer(String taskId)
+    {
+      return taskRunner.getTaskContainer(taskId);
+    }
+
     public TaskStatus runAndWait(Task task)
     {
       return taskRunner.runAndWait(injectIfNeeded(task));
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index ecb910b..184ff84 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.AbstractInputSource;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSplit;
@@ -708,7 +709,7 @@ public class ParallelIndexSupervisorTaskResourceTest 
extends AbstractParallelInd
 
       taskClient.report(
           getSupervisorTaskId(),
-          new PushedSegmentsReport(getId(), Collections.emptySet(), 
Collections.singleton(segment))
+          new PushedSegmentsReport(getId(), Collections.emptySet(), 
Collections.singleton(segment), ImmutableMap.of())
       );
       return TaskStatus.fromCode(getId(), state);
     }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
new file mode 100644
index 0000000..05d0a1f
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
+
+public class PushedSegmentsReportTest
+{
+  @Test
+  public void testEquals()
+  {
+    
EqualsVerifier.forClass(PushedSegmentsReport.class).usingGetClass().verify();
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index f04559f..fdcab96 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -22,8 +22,10 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.impl.LocalInputSource;
 import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.indexer.IngestionState;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -37,6 +39,7 @@ import 
org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
@@ -61,6 +64,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -105,6 +109,14 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
                Files.newBufferedWriter(new File(inputDir, "test_" + 
i).toPath(), StandardCharsets.UTF_8)) {
         writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 24 + 
i, i));
         writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + 
i, i));
+        if (i == 0) {
+          // thrown away due to timestamp outside interval
+          writer.write(StringUtils.format("2012-12-%d,%d th test file\n", 25 + 
i, i));
+          // unparseable metric value
+          writer.write(StringUtils.format("2017-12-%d,%d th test 
file,badval\n", 25 + i, i));
+          // unparseable row
+          writer.write(StringUtils.format("2017unparseable\n"));
+        }
       }
     }
 
@@ -114,6 +126,7 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
         writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + 
i, i));
       }
     }
+
     
getObjectMapper().registerSubtypes(SettableSplittableLocalInputSource.class);
   }
 
@@ -153,7 +166,7 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
     }
   }
 
-  private void runTestTask(
+  private ParallelIndexSupervisorTask runTestTask(
       @Nullable Interval interval,
       Granularity segmentGranularity,
       boolean appendToExisting,
@@ -170,9 +183,11 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
         appendToExisting,
         originalSegmentsIfAppend
     );
+    TaskContainer taskContainer = 
getIndexingServiceClient().getTaskContainer(task.getId());
+    return (ParallelIndexSupervisorTask) taskContainer.getTask();
   }
 
-  private void runOverwriteTask(
+  private ParallelIndexSupervisorTask runOverwriteTask(
       @Nullable Interval interval,
       Granularity segmentGranularity,
       LockGranularity actualLockGranularity
@@ -182,6 +197,8 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
     task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == 
LockGranularity.TIME_CHUNK);
     Assert.assertEquals(TaskState.SUCCESS, 
getIndexingServiceClient().runAndWait(task).getStatusCode());
     assertShardSpecAfterOverwrite(task, actualLockGranularity);
+    TaskContainer taskContainer = 
getIndexingServiceClient().getTaskContainer(task.getId());
+    return (ParallelIndexSupervisorTask) taskContainer.getTask();
   }
 
   private void testRunAndOverwrite(@Nullable Interval inputInterval, 
Granularity secondSegmentGranularity)
@@ -296,6 +313,58 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
     testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.DAY);
   }
 
+  @Test()
+  public void testRunInParallelTaskReports()
+  {
+    ParallelIndexSupervisorTask task = runTestTask(
+        Intervals.of("2017-12/P1M"),
+        Granularities.DAY,
+        false,
+        Collections.emptyList()
+    );
+    Map<String, Object> actualReports = task.doGetLiveReports("full");
+    Map<String, Object> expectedReports = getExpectedTaskReportParallel(
+        task.getId(),
+        ImmutableList.of(
+            "Timestamp[2017unparseable] is unparseable! Event: 
{ts=2017unparseable}",
+            "Found unparseable columns in row: 
[MapBasedInputRow{timestamp=2017-12-25T00:00:00.000Z,"
+            + " event={ts=2017-12-25, dim=0 th test file, val=badval}, 
dimensions=[ts, dim]}], "
+            + "exceptions: [Unable to parse value[badval] for field[val]]"
+        ),
+        new RowIngestionMetersTotals(
+            10,
+            1,
+            1,
+            1)
+    );
+    Assert.assertEquals(expectedReports, actualReports);
+  }
+
+  private Map<String, Object> getExpectedTaskReportParallel(
+      String taskId,
+      List<String> expectedUnparseableEvents,
+      RowIngestionMetersTotals expectedTotals
+  )
+  {
+    Map<String, Object> returnMap = new HashMap<>();
+    Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
+    Map<String, Object> payload = new HashMap<>();
+
+    payload.put("ingestionState", IngestionState.COMPLETED);
+    payload.put("unparseableEvents", ImmutableMap.of("buildSegments", 
expectedUnparseableEvents));
+    payload.put("rowStats", ImmutableMap.of("totals", 
ImmutableMap.of("buildSegments", expectedTotals)));
+
+    ingestionStatsAndErrors.put("taskId", taskId);
+    ingestionStatsAndErrors.put("payload", payload);
+    ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
+
+    returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
+    return returnMap;
+  }
+
+  // The tests below ingest all of the input data.
+
   @Test
   public void testWithoutIntervalWithDifferentSegmentGranularity()
   {
@@ -318,6 +387,102 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
     task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == 
LockGranularity.TIME_CHUNK);
     Assert.assertEquals(TaskState.SUCCESS, 
getIndexingServiceClient().runAndWait(task).getStatusCode());
     assertShardSpec(task, lockGranularity, appendToExisting, 
Collections.emptyList());
+
+    TaskContainer taskContainer = 
getIndexingServiceClient().getTaskContainer(task.getId());
+    final ParallelIndexSupervisorTask executedTask = 
(ParallelIndexSupervisorTask) taskContainer.getTask();
+    Map<String, Object> actualReports = executedTask.doGetLiveReports("full");
+
+    RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(
+        10,
+        1,
+        1,
+        1
+    );
+    List<String> expectedUnparseableEvents = ImmutableList.of(
+        "Timestamp[2017unparseable] is unparseable! Event: 
{ts=2017unparseable}",
+        "Found unparseable columns in row: 
[MapBasedInputRow{timestamp=2017-12-25T00:00:00.000Z,"
+        + " event={ts=2017-12-25, dim=0 th test file, val=badval}, 
dimensions=[ts, dim]}], "
+        + "exceptions: [Unable to parse value[badval] for field[val]]"
+    );
+
+    Map<String, Object> expectedReports;
+    if (useInputFormatApi) {
+      expectedReports = getExpectedTaskReportSequential(
+          task.getId(),
+          expectedUnparseableEvents,
+          expectedTotals
+      );
+    } else {
+      // when useInputFormatApi is false, maxConcurrentSubTasks=2 and it uses 
the single phase runner
+      // instead of sequential runner
+      expectedReports = getExpectedTaskReportParallel(
+          task.getId(),
+          expectedUnparseableEvents,
+          expectedTotals
+      );
+    }
+
+    Assert.assertEquals(expectedReports, actualReports);
+    System.out.println(actualReports);
+  }
+
+  private Map<String, Object> getExpectedTaskReportSequential(
+      String taskId,
+      List<String> expectedUnparseableEvents,
+      RowIngestionMetersTotals expectedTotals
+  )
+  {
+    Map<String, Object> returnMap = new HashMap<>();
+    Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
+    Map<String, Object> payload = new HashMap<>();
+
+    payload.put("ingestionState", IngestionState.COMPLETED);
+    payload.put(
+        "unparseableEvents",
+        ImmutableMap.of(
+            "determinePartitions", ImmutableList.of(),
+            "buildSegments", expectedUnparseableEvents
+        )
+    );
+    Map<String, Object> emptyAverageMinuteMap = ImmutableMap.of(
+        "processed", 0.0,
+        "unparseable", 0.0,
+        "thrownAway", 0.0,
+        "processedWithError", 0.0
+    );
+
+    Map<String, Object> emptyAverages = ImmutableMap.of(
+        "1m", emptyAverageMinuteMap,
+        "5m", emptyAverageMinuteMap,
+        "15m", emptyAverageMinuteMap
+    );
+
+    payload.put(
+        "rowStats",
+        ImmutableMap.of(
+            "movingAverages",
+            ImmutableMap.of(
+                "determinePartitions",
+                emptyAverages,
+                "buildSegments",
+                emptyAverages
+            ),
+            "totals",
+            ImmutableMap.of(
+                "determinePartitions",
+                new RowIngestionMetersTotals(0, 0, 0, 0),
+                "buildSegments",
+                expectedTotals
+            )
+        )
+    );
+
+    ingestionStatsAndErrors.put("taskId", taskId);
+    ingestionStatsAndErrors.put("payload", payload);
+    ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
+
+    returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
+    return returnMap;
   }
 
   @Test
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
index d9c6ce5..89bac75 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java
@@ -22,6 +22,8 @@ package org.apache.druid.segment.incremental;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.Objects;
+
 public class RowIngestionMetersTotals
 {
   private final long processed;
@@ -66,4 +68,37 @@ public class RowIngestionMetersTotals
   {
     return unparseable;
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    RowIngestionMetersTotals that = (RowIngestionMetersTotals) o;
+    return processed == that.processed
+           && processedWithError == that.processedWithError
+           && thrownAway == that.thrownAway
+           && unparseable == that.unparseable;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(processed, processedWithError, thrownAway, 
unparseable);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "RowIngestionMetersTotals{" +
+           "processed=" + processed +
+           ", processedWithError=" + processedWithError +
+           ", thrownAway=" + thrownAway +
+           ", unparseable=" + unparseable +
+           '}';
+  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/incremental/RowIngestionMetersTotalsTest.java
 
b/processing/src/test/java/org/apache/druid/segment/incremental/RowIngestionMetersTotalsTest.java
new file mode 100644
index 0000000..197b984
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/RowIngestionMetersTotalsTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
+
+public class RowIngestionMetersTotalsTest
+{
+  @Test
+  public void testEquals()
+  {
+    
EqualsVerifier.forClass(RowIngestionMetersTotals.class).usingGetClass().verify();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
 
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 60bbae6..afab3b5 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -336,6 +336,32 @@ public class HttpIndexingServiceClient implements 
IndexingServiceClient
     }
   }
 
+  @Nullable
+  @Override
+  public Map<String, Object> getTaskReport(String taskId)
+  {
+    try {
+      final StringFullResponseHolder responseHolder = druidLeaderClient.go(
+          druidLeaderClient.makeRequest(
+              HttpMethod.GET,
+              StringUtils.format("/druid/indexer/v1/task/%s/reports", 
StringUtils.urlEncode(taskId))
+          )
+      );
+
+      if (responseHolder.getContent().length() == 0) {
+        return null;
+      }
+
+      return jsonMapper.readValue(
+          responseHolder.getContent(),
+          JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+      );
+    }
+    catch (IOException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override
   public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> 
minTaskPriority)
   {
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
 
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index c379077..84f9f55 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -66,6 +66,9 @@ public interface IndexingServiceClient
   @Nullable
   TaskPayloadResponse getTaskPayload(String taskId);
 
+  @Nullable
+  Map<String, Object> getTaskReport(String taskId);
+
   /**
    * Gets a List of Intervals locked by higher priority tasks for each 
datasource.
    *
diff --git 
a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
 
b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
index ff860ee..67b99b9 100644
--- 
a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
@@ -39,6 +39,7 @@ import org.junit.rules.ExpectedException;
 
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.util.Map;
 
 public class HttpIndexingServiceClientTest
 {
@@ -161,4 +162,70 @@ public class HttpIndexingServiceClientTest
     httpIndexingServiceClient.sample(samplerSpec);
     EasyMock.verify(druidLeaderClient, response);
   }
+
+  @Test
+  public void testGetTaskReport() throws Exception
+  {
+    String taskId = "testTaskId";
+    HttpResponse response = EasyMock.createMock(HttpResponse.class);
+    EasyMock.expect(response.getContent()).andReturn(new 
BigEndianHeapChannelBuffer(0));
+    EasyMock.replay(response);
+
+    Map<String, Object> dummyResponse = ImmutableMap.of("test", "value");
+
+    StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+        HttpResponseStatus.OK,
+        response,
+        StandardCharsets.UTF_8
+    ).addChunk(jsonMapper.writeValueAsString(dummyResponse));
+
+    EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class)))
+            .andReturn(responseHolder)
+            .anyTimes();
+
+    EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/indexer/v1/task/testTaskId/reports"))
+            .andReturn(new Request(
+                HttpMethod.GET,
+                new 
URL("http://localhost:8090/druid/indexer/v1/task/testTaskId/reports";)
+            ))
+            .anyTimes();
+    EasyMock.replay(druidLeaderClient);
+
+    final Map<String, Object> actualResponse = 
httpIndexingServiceClient.getTaskReport(taskId);
+    Assert.assertEquals(dummyResponse, actualResponse);
+
+    EasyMock.verify(druidLeaderClient, response);
+  }
+
+  @Test
+  public void testGetTaskReportEmpty() throws Exception
+  {
+    String taskId = "testTaskId";
+    HttpResponse response = EasyMock.createMock(HttpResponse.class);
+    EasyMock.expect(response.getContent()).andReturn(new 
BigEndianHeapChannelBuffer(0));
+    EasyMock.replay(response);
+
+    StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+        HttpResponseStatus.OK,
+        response,
+        StandardCharsets.UTF_8
+    ).addChunk("");
+
+    EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class)))
+            .andReturn(responseHolder)
+            .anyTimes();
+
+    EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/indexer/v1/task/testTaskId/reports"))
+            .andReturn(new Request(
+                HttpMethod.GET,
+                new 
URL("http://localhost:8090/druid/indexer/v1/task/testTaskId/reports";)
+            ))
+            .anyTimes();
+    EasyMock.replay(druidLeaderClient);
+
+    final Map<String, Object> actualResponse = 
httpIndexingServiceClient.getTaskReport(taskId);
+    Assert.assertNull(actualResponse);
+
+    EasyMock.verify(druidLeaderClient, response);
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
 
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index af307a4..2035dba 100644
--- 
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ 
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -107,6 +107,13 @@ public class NoopIndexingServiceClient implements 
IndexingServiceClient
     return null;
   }
 
+  @Nullable
+  @Override
+  public Map<String, Object> getTaskReport(String taskId)
+  {
+    return null;
+  }
+
   @Override
   public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> 
minTaskPriority)
   {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to