This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4df4896674f Refactor: Add common method in AbstractBatchIndexTask to 
create ingestion stats report (#16202)
4df4896674f is described below

commit 4df4896674f57ef49e2736aa613f271112a2a111
Author: Kashif Faraz <kashif.fa...@gmail.com>
AuthorDate: Thu Mar 28 23:07:00 2024 +0530

    Refactor: Add common method in AbstractBatchIndexTask to create ingestion 
stats report (#16202)
    
    Changes
    -  No functional changes
    - Add method `AbstractBatchIndexTask.buildIngestionStatsReport()` used in 
several batch tasks
    - Add utility method `AbstractBatchIndexTask.addBuildSegmentStatsToReport()`
    - Use boolean argument to represent a full report instead of the String 
`full`
    in internal methods. (REST API remains unchanged.)
    - Rename `IngestionStatsAndErrorsTaskReportData` to 
`IngestionStatsAndErrors`
    - Clean up some of the methods
---
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  12 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     |   6 +-
 ...eportData.java => IngestionStatsAndErrors.java} |  49 +-
 .../common/IngestionStatsAndErrorsTaskReport.java  |   6 +-
 .../common/task/AbstractBatchIndexTask.java        | 128 ++--
 .../task/AppenderatorDriverRealtimeIndexTask.java  |   4 +-
 .../indexing/common/task/HadoopIndexTask.java      |  22 +-
 .../druid/indexing/common/task/IndexTask.java      |  97 +--
 .../parallel/ParallelIndexSupervisorTask.java      |  96 +--
 .../batch/parallel/PartialSegmentGenerateTask.java |  32 +-
 .../task/batch/parallel/SinglePhaseSubTask.java    |  77 +--
 .../SeekableStreamIndexTaskRunner.java             |   4 +-
 .../AppenderatorDriverRealtimeIndexTaskTest.java   |  16 +-
 .../common/task/CompactionTaskParallelRunTest.java |   8 +-
 .../common/task/CompactionTaskRunTest.java         |   6 +-
 .../druid/indexing/common/task/IndexTaskTest.java  | 735 ++++++---------------
 .../indexing/common/task/ParseExceptionReport.java |   4 +-
 .../indexing/common/task/TaskReportSerdeTest.java  |   6 +-
 .../AbstractParallelIndexSupervisorTaskTest.java   |   8 +-
 .../parallel/SinglePhaseParallelIndexingTest.java  |   4 +-
 .../SeekableStreamIndexTaskTestBase.java           |   6 +-
 .../testsEx/indexer/AbstractITBatchIndexTest.java  |   4 +-
 .../clients/OverlordResourceTestClient.java        |   7 +-
 .../tests/indexer/AbstractITBatchIndexTest.java    |   4 +-
 .../druid/tests/indexer/ITCompactionTaskTest.java  |   6 +-
 25 files changed, 470 insertions(+), 877 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index c3d1d213383..85a9fdb6254 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -54,7 +54,7 @@ import 
org.apache.druid.data.input.kafkainput.KafkaInputFormat;
 import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.IndexTaskTest;
@@ -1615,7 +1615,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         newDataSchemaMetadata()
     );
 
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
 
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
@@ -1695,7 +1695,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     Assert.assertEquals(ImmutableList.of(), publishedDescriptors());
     Assert.assertNull(newDataSchemaMetadata());
 
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
 
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
@@ -3061,7 +3061,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     );
 
     // Verify unparseable data
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
 
     ParseExceptionReport parseExceptionReport =
         ParseExceptionReport.forPhase(reportData, 
RowIngestionMeters.BUILD_SEGMENTS);
@@ -3233,7 +3233,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     TaskStatus status = future.get();
 
     Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
     Assert.assertEquals(reportData.getRecordsProcessed().size(), 1);
     
Assert.assertEquals(reportData.getRecordsProcessed().values().iterator().next(),
 (Long) 6L);
   }
@@ -3281,7 +3281,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     TaskStatus status = future.get();
 
     Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
     Assert.assertEquals(reportData.getRecordsProcessed().size(), 2);
     
Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L,
 2L)));
   }
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index e14b6679b09..ad8964d712c 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -43,7 +43,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
@@ -1182,7 +1182,7 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         newDataSchemaMetadata()
     );
 
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
 
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
@@ -1268,7 +1268,7 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     Assert.assertEquals(ImmutableList.of(), publishedDescriptors());
     Assert.assertNull(newDataSchemaMetadata());
 
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
 
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java
similarity index 83%
rename from 
indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
rename to 
indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java
index 97ea58e1c5c..d55f5ed0d19 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java
@@ -27,36 +27,19 @@ import javax.annotation.Nullable;
 import java.util.Map;
 import java.util.Objects;
 
-public class IngestionStatsAndErrorsTaskReportData
+public class IngestionStatsAndErrors
 {
-  @JsonProperty
-  private IngestionState ingestionState;
-
-  @JsonProperty
-  private Map<String, Object> unparseableEvents;
-
-  @JsonProperty
-  private Map<String, Object> rowStats;
-
-  @JsonProperty
-  @Nullable
-  private String errorMsg;
-
-  @JsonProperty
-  private boolean segmentAvailabilityConfirmed;
-
-  @JsonProperty
-  private long segmentAvailabilityWaitTimeMs;
-
-  @JsonProperty
-  private Map<String, Long> recordsProcessed;
-
-  @JsonProperty
-  private Long segmentsRead;
-  @JsonProperty
-  private Long segmentsPublished;
-
-  public IngestionStatsAndErrorsTaskReportData(
+  private final IngestionState ingestionState;
+  private final Map<String, Object> unparseableEvents;
+  private final Map<String, Object> rowStats;
+  private final String errorMsg;
+  private final boolean segmentAvailabilityConfirmed;
+  private final long segmentAvailabilityWaitTimeMs;
+  private final Map<String, Long> recordsProcessed;
+  private final Long segmentsRead;
+  private final Long segmentsPublished;
+
+  public IngestionStatsAndErrors(
       @JsonProperty("ingestionState") IngestionState ingestionState,
       @JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
       @JsonProperty("rowStats") Map<String, Object> rowStats,
@@ -139,12 +122,12 @@ public class IngestionStatsAndErrorsTaskReportData
     return segmentsPublished;
   }
 
-  public static IngestionStatsAndErrorsTaskReportData 
getPayloadFromTaskReports(
+  public static IngestionStatsAndErrors getPayloadFromTaskReports(
       Map<String, TaskReport> taskReports
   )
   {
-    return (IngestionStatsAndErrorsTaskReportData) 
taskReports.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
-                                                              .getPayload();
+    return (IngestionStatsAndErrors) 
taskReports.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
+                                                .getPayload();
   }
 
   @Override
@@ -156,7 +139,7 @@ public class IngestionStatsAndErrorsTaskReportData
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    IngestionStatsAndErrorsTaskReportData that = 
(IngestionStatsAndErrorsTaskReportData) o;
+    IngestionStatsAndErrors that = (IngestionStatsAndErrors) o;
     return getIngestionState() == that.getIngestionState() &&
            Objects.equals(getUnparseableEvents(), that.getUnparseableEvents()) 
&&
            Objects.equals(getRowStats(), that.getRowStats()) &&
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
index 7518523b1de..0122a8d3ffb 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
@@ -32,12 +32,12 @@ public class IngestionStatsAndErrorsTaskReport implements 
TaskReport
   private final String taskId;
 
   @JsonProperty
-  private final IngestionStatsAndErrorsTaskReportData payload;
+  private final IngestionStatsAndErrors payload;
 
   @JsonCreator
   public IngestionStatsAndErrorsTaskReport(
       @JsonProperty("taskId") String taskId,
-      @JsonProperty("payload") IngestionStatsAndErrorsTaskReportData payload
+      @JsonProperty("payload") IngestionStatsAndErrors payload
   )
   {
     this.taskId = taskId;
@@ -57,7 +57,7 @@ public class IngestionStatsAndErrorsTaskReport implements 
TaskReport
   }
 
   @Override
-  public IngestionStatsAndErrorsTaskReportData getPayload()
+  public IngestionStatsAndErrors getPayload()
   {
     return payload;
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index b1d3e992696..a5acadf704e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -30,9 +30,13 @@ import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.IngestionState;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
@@ -53,6 +57,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.NonnullPair;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularity;
@@ -110,8 +115,8 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
 {
   private static final Logger log = new Logger(AbstractBatchIndexTask.class);
 
-  protected boolean segmentAvailabilityConfirmationCompleted = false;
-  protected long segmentAvailabilityWaitTimeMs = 0L;
+  private boolean segmentAvailabilityConfirmationCompleted = false;
+  private long segmentAvailabilityWaitTimeMs = 0L;
 
   @GuardedBy("this")
   private final TaskResourceCleaner resourceCloserOnAbnormalExit = new 
TaskResourceCleaner();
@@ -123,8 +128,7 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
 
   private final int maxAllowedLockCount;
 
-  // Store lock versions
-  Map<Interval, String> intervalToVersion = new HashMap<>();
+  private final Map<Interval, String> intervalToLockVersion = new HashMap<>();
 
   protected AbstractBatchIndexTask(String id, String dataSource, Map<String, 
Object> context, IngestionMode ingestionMode)
   {
@@ -481,7 +485,7 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
         throw new ISE(StringUtils.format("Lock for interval [%s] was 
revoked.", cur));
       }
       locksAcquired++;
-      intervalToVersion.put(cur, lock.getVersion());
+      intervalToLockVersion.put(cur, lock.getVersion());
     }
     return true;
   }
@@ -685,10 +689,8 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
    * the cluster. Doing so gives an end user assurance that a Successful task 
status means their data is available
    * for querying.
    *
-   * @param toolbox {@link TaskToolbox} object with for assisting with task 
work.
-   * @param segmentsToWaitFor {@link List} of segments to wait for 
availability.
-   * @param waitTimeout Millis to wait before giving up
-   * @return True if all segments became available, otherwise False.
+   * @return True if all segments became available before the {@code 
waitTimeoutMillis}
+   * elapsed, otherwise false.
    */
   protected boolean waitForSegmentAvailability(
       TaskToolbox toolbox,
@@ -697,22 +699,22 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
   )
   {
     if (segmentsToWaitFor.isEmpty()) {
-      log.info("Asked to wait for segments to be available, but I wasn't 
provided with any segments.");
+      log.info("No segments to wait for availability.");
       return true;
     } else if (waitTimeout < 0) {
-      log.warn("Asked to wait for availability for < 0 seconds?! Requested 
waitTimeout: [%s]", waitTimeout);
+      log.warn("Not waiting for segment availability as waitTimeout[%s] is 
less than zero.", waitTimeout);
       return false;
     }
     log.info("Waiting for [%d] segments to be loaded by the cluster...", 
segmentsToWaitFor.size());
-    final long start = System.nanoTime();
+    final Stopwatch stopwatch = Stopwatch.createStarted();
 
     try (
         SegmentHandoffNotifier notifier = 
toolbox.getSegmentHandoffNotifierFactory()
                                                  
.createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource())
     ) {
 
-      ExecutorService exec = Execs.directExecutor();
-      CountDownLatch doneSignal = new CountDownLatch(segmentsToWaitFor.size());
+      final ExecutorService exec = Execs.directExecutor();
+      final CountDownLatch doneSignal = new 
CountDownLatch(segmentsToWaitFor.size());
 
       notifier.start();
       for (DataSegment s : segmentsToWaitFor) {
@@ -720,11 +722,11 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
             new SegmentDescriptor(s.getInterval(), s.getVersion(), 
s.getShardSpec().getPartitionNum()),
             exec,
             () -> {
+              doneSignal.countDown();
               log.debug(
-                  "Confirmed availability for [%s]. Removing from list of 
segments to wait for",
-                  s.getId()
+                  "Segment[%s] is now available, [%d] segments remaining.",
+                  s.getId(), doneSignal.getCount()
               );
-              doneSignal.countDown();
             }
         );
       }
@@ -737,7 +739,7 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
       return false;
     }
     finally {
-      segmentAvailabilityWaitTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+      segmentAvailabilityWaitTimeMs = stopwatch.millisElapsed();
       toolbox.getEmitter().emit(
           new ServiceMetricEvent.Builder()
               .setDimension("dataSource", getDataSource())
@@ -782,12 +784,12 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
     if (revokedLock != null) {
       throw new ISE("Lock revoked: [%s]", revokedLock);
     }
-    final Map<Interval, String> versions = locks
-        .stream()
-        .collect(Collectors.toMap(TaskLock::getInterval, 
TaskLock::getVersion));
+    final Map<Interval, String> versions = locks.stream().collect(
+        Collectors.toMap(TaskLock::getInterval, TaskLock::getVersion)
+    );
 
-    Interval interval;
-    String version;
+    final Interval interval;
+    final String version;
     if (!materializedBucketIntervals.isEmpty()) {
       // If granularity spec has explicit intervals, we just need to find the 
version associated to the interval.
       // This is because we should have gotten all required locks up front 
when the task starts up.
@@ -809,8 +811,8 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
       // We don't have explicit intervals. We can use the segment granularity 
to figure out what
       // interval we need, but we might not have already locked it.
       interval = granularitySpec.getSegmentGranularity().bucket(timestamp);
-      version = AbstractBatchIndexTask.findVersion(versions, interval);
-      if (version == null) {
+      final String existingLockVersion = 
AbstractBatchIndexTask.findVersion(versions, interval);
+      if (existingLockVersion == null) {
         if (ingestionSpec.getTuningConfig() instanceof 
ParallelIndexTuningConfig) {
           final int maxAllowedLockCount = ((ParallelIndexTuningConfig) 
ingestionSpec.getTuningConfig())
               .getMaxAllowedLockCount();
@@ -830,6 +832,8 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
           throw new ISE(StringUtils.format("Lock for interval [%s] was 
revoked.", interval));
         }
         version = lock.getVersion();
+      } else {
+        version = existingLockVersion;
       }
     }
     return new NonnullPair<>(interval, version);
@@ -856,23 +860,19 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
   }
 
   /**
-   * Get the version from the locks for a given timestamp. This will work if 
the locks were acquired upfront
-   * @param timestamp
-   * @return The interval andversion if n interval that contains an interval 
was found or null otherwise
+   * @return The interval and version containing the given timestamp if one 
exists, otherwise null.
    */
   @Nullable
-  Pair<Interval, String> lookupVersion(DateTime timestamp)
-  {
-    java.util.Optional<Map.Entry<Interval, String>> intervalAndVersion = 
intervalToVersion.entrySet()
-                                                                               
           .stream()
-                                                                               
           .filter(e -> e.getKey()
-                                                                               
                         .contains(
-                                                                               
                             timestamp))
-                                                                               
           .findFirst();
-    if (!intervalAndVersion.isPresent()) {
-      return null;
-    }
-    return new Pair(intervalAndVersion.get().getKey(), 
intervalAndVersion.get().getValue());
+  private Pair<Interval, String> lookupVersion(DateTime timestamp)
+  {
+    java.util.Optional<Map.Entry<Interval, String>> intervalAndVersion
+        = intervalToLockVersion.entrySet()
+                               .stream()
+                               .filter(e -> e.getKey().contains(timestamp))
+                               .findFirst();
+    return intervalAndVersion.map(
+        entry -> new Pair<>(entry.getKey(), entry.getValue())
+    ).orElse(null);
   }
 
   protected SegmentIdWithShardSpec allocateNewSegmentForTombstone(
@@ -891,4 +891,52 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
     );
   }
 
+  @Nullable
+  protected Map<String, Object> getTaskCompletionRowStats()
+  {
+    return null;
+  }
+
+  @Nullable
+  protected Map<String, Object> getTaskCompletionUnparseableEvents()
+  {
+    return null;
+  }
+
+  /**
+   * Builds a singleton map with {@link 
IngestionStatsAndErrorsTaskReport#REPORT_KEY}
+   * as key and an {@link IngestionStatsAndErrorsTaskReport} for this task as 
value.
+   */
+  protected Map<String, TaskReport> buildIngestionStatsReport(
+      IngestionState ingestionState,
+      String errorMessage,
+      Long segmentsRead,
+      Long segmentsPublished
+  )
+  {
+    return TaskReport.buildTaskReports(
+        new IngestionStatsAndErrorsTaskReport(
+            getId(),
+            new IngestionStatsAndErrors(
+                ingestionState,
+                getTaskCompletionUnparseableEvents(),
+                getTaskCompletionRowStats(),
+                errorMessage,
+                segmentAvailabilityConfirmationCompleted,
+                segmentAvailabilityWaitTimeMs,
+                Collections.emptyMap(),
+                segmentsRead,
+                segmentsPublished
+            )
+        )
+    );
+  }
+
+  protected static boolean addBuildSegmentStatsToReport(boolean isFullReport, 
IngestionState ingestionState)
+  {
+    return isFullReport
+           || ingestionState == IngestionState.BUILD_SEGMENTS
+           || ingestionState == IngestionState.COMPLETED;
+  }
+
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index e0c7f9bc934..c00f219c777 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -42,8 +42,8 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
 import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
@@ -614,7 +614,7 @@ public class AppenderatorDriverRealtimeIndexTask extends 
AbstractTask implements
     return TaskReport.buildTaskReports(
         new IngestionStatsAndErrorsTaskReport(
             getId(),
-            new IngestionStatsAndErrorsTaskReportData(
+            new IngestionStatsAndErrors(
                 ingestionState,
                 getTaskCompletionUnparseableEvents(),
                 getTaskCompletionRowStats(),
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 019b63520e7..5fd3572f9e2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -43,8 +43,6 @@ import 
org.apache.druid.indexer.MetadataStorageUpdaterJobHandler;
 import org.apache.druid.indexer.TaskMetricsGetter;
 import org.apache.druid.indexer.TaskMetricsUtils;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskReport;
@@ -684,25 +682,11 @@ public class HadoopIndexTask extends HadoopTask 
implements ChatHandler
 
   private Map<String, TaskReport> getTaskCompletionReports()
   {
-    return TaskReport.buildTaskReports(
-        new IngestionStatsAndErrorsTaskReport(
-            getId(),
-            new IngestionStatsAndErrorsTaskReportData(
-                ingestionState,
-                null,
-                getTaskCompletionRowStats(),
-                errorMsg,
-                segmentAvailabilityConfirmationCompleted,
-                segmentAvailabilityWaitTimeMs,
-                Collections.emptyMap(),
-                null,
-                null
-            )
-        )
-    );
+    return buildIngestionStatsReport(ingestionState, errorMsg, null, null);
   }
 
-  private Map<String, Object> getTaskCompletionRowStats()
+  @Override
+  protected Map<String, Object> getTaskCompletionRowStats()
   {
     Map<String, Object> metrics = new HashMap<>();
     if (determineConfigStatus != null) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 950515d7348..49041ce4d70 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -48,8 +48,6 @@ import 
org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SecondaryPartitionType;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
 import org.apache.druid.indexing.common.TaskReport;
@@ -335,34 +333,14 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
   )
   {
     IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
-    return Response.ok(doGetUnparseableEvents(full)).build();
+    return Response.ok(doGetUnparseableEvents(full != null)).build();
   }
 
-  public Map<String, Object> doGetUnparseableEvents(String full)
+  public Map<String, Object> doGetUnparseableEvents(boolean isFullReport)
   {
-    Map<String, Object> events = new HashMap<>();
+    final Map<String, Object> events = new HashMap<>();
 
-    boolean needsDeterminePartitions = false;
-    boolean needsBuildSegments = false;
-
-    if (full != null) {
-      needsDeterminePartitions = true;
-      needsBuildSegments = true;
-    } else {
-      switch (ingestionState) {
-        case DETERMINE_PARTITIONS:
-          needsDeterminePartitions = true;
-          break;
-        case BUILD_SEGMENTS:
-        case COMPLETED:
-          needsBuildSegments = true;
-          break;
-        default:
-          break;
-      }
-    }
-
-    if (needsDeterminePartitions) {
+    if (addDeterminePartitionStatsToReport(isFullReport, ingestionState)) {
       events.put(
           RowIngestionMeters.DETERMINE_PARTITIONS,
           IndexTaskUtils.getReportListFromSavedParseExceptions(
@@ -371,7 +349,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
       );
     }
 
-    if (needsBuildSegments) {
+    if (addBuildSegmentStatsToReport(isFullReport, ingestionState)) {
       events.put(
           RowIngestionMeters.BUILD_SEGMENTS,
           IndexTaskUtils.getReportListFromSavedParseExceptions(
@@ -382,33 +360,13 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
     return events;
   }
 
-  public Map<String, Object> doGetRowStats(String full)
+  public Map<String, Object> doGetRowStats(boolean isFullReport)
   {
     Map<String, Object> returnMap = new HashMap<>();
     Map<String, Object> totalsMap = new HashMap<>();
     Map<String, Object> averagesMap = new HashMap<>();
 
-    boolean needsDeterminePartitions = false;
-    boolean needsBuildSegments = false;
-
-    if (full != null) {
-      needsDeterminePartitions = true;
-      needsBuildSegments = true;
-    } else {
-      switch (ingestionState) {
-        case DETERMINE_PARTITIONS:
-          needsDeterminePartitions = true;
-          break;
-        case BUILD_SEGMENTS:
-        case COMPLETED:
-          needsBuildSegments = true;
-          break;
-        default:
-          break;
-      }
-    }
-
-    if (needsDeterminePartitions) {
+    if (addDeterminePartitionStatsToReport(isFullReport, ingestionState)) {
       totalsMap.put(
           RowIngestionMeters.DETERMINE_PARTITIONS,
           determinePartitionsMeters.getTotals()
@@ -419,7 +377,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
       );
     }
 
-    if (needsBuildSegments) {
+    if (addBuildSegmentStatsToReport(isFullReport, ingestionState)) {
       totalsMap.put(
           RowIngestionMeters.BUILD_SEGMENTS,
           buildSegmentsMeters.getTotals()
@@ -444,7 +402,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
   )
   {
     IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
-    return Response.ok(doGetRowStats(full)).build();
+    return Response.ok(doGetRowStats(full != null)).build();
   }
 
   @GET
@@ -463,7 +421,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
 
     payload.put("ingestionState", ingestionState);
     payload.put("unparseableEvents", events);
-    payload.put("rowStats", doGetRowStats(full));
+    payload.put("rowStats", doGetRowStats(full != null));
 
     ingestionStatsAndErrors.put("taskId", getId());
     ingestionStatsAndErrors.put("payload", payload);
@@ -580,36 +538,16 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
     }
   }
 
-
   private void updateAndWriteCompletionReports(TaskToolbox toolbox)
   {
-    completionReports = getTaskCompletionReports();
+    completionReports = buildIngestionStatsReport(ingestionState, errorMsg, 
null, null);
     if (isStandAloneTask) {
       toolbox.getTaskReportFileWriter().write(getId(), completionReports);
     }
   }
 
-  private Map<String, TaskReport> getTaskCompletionReports()
-  {
-    return TaskReport.buildTaskReports(
-        new IngestionStatsAndErrorsTaskReport(
-            getId(),
-            new IngestionStatsAndErrorsTaskReportData(
-                ingestionState,
-                getTaskCompletionUnparseableEvents(),
-                getTaskCompletionRowStats(),
-                errorMsg,
-                segmentAvailabilityConfirmationCompleted,
-                segmentAvailabilityWaitTimeMs,
-                Collections.emptyMap(),
-                null,
-                null
-            )
-        )
-    );
-  }
-
-  private Map<String, Object> getTaskCompletionUnparseableEvents()
+  @Override
+  protected Map<String, Object> getTaskCompletionUnparseableEvents()
   {
     Map<String, Object> unparseableEventsMap = new HashMap<>();
     CircularBuffer<ParseExceptionReport> 
determinePartitionsParseExceptionReports =
@@ -631,7 +569,8 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
     return unparseableEventsMap;
   }
 
-  private Map<String, Object> getTaskCompletionRowStats()
+  @Override
+  protected Map<String, Object> getTaskCompletionRowStats()
   {
     Map<String, Object> metrics = new HashMap<>();
     metrics.put(
@@ -709,6 +648,12 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
     }
   }
 
+  private static boolean addDeterminePartitionStatsToReport(boolean 
isFullReport, IngestionState ingestionState)
+  {
+    return isFullReport
+           || ingestionState == IngestionState.DETERMINE_PARTITIONS;
+  }
+
   private static LinearPartitionAnalysis createLinearPartitionAnalysis(
       GranularitySpec granularitySpec,
       @Nonnull DynamicPartitionsSpec partitionsSpec
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index db497dff5ec..4d5b9717ca9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -40,8 +40,8 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -1237,37 +1237,32 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
 
   /**
    * Generate an IngestionStatsAndErrorsTaskReport for the task.
-   *
-   * @param taskStatus {@link TaskStatus}
-   * @param segmentAvailabilityConfirmed Whether or not the segments were 
confirmed to be available for query when
-   *                                     when the task completed.
-   * @return
    */
-  private Map<String, TaskReport> getTaskCompletionReports(TaskStatus 
taskStatus, boolean segmentAvailabilityConfirmed)
-  {
-    Pair<Map<String, Object>, Map<String, Object>> 
rowStatsAndUnparseableEvents =
-        doGetRowStatsAndUnparseableEvents("true", true);
-    return TaskReport.buildTaskReports(
-        new IngestionStatsAndErrorsTaskReport(
-            getId(),
-            new IngestionStatsAndErrorsTaskReportData(
-                IngestionState.COMPLETED,
-                rowStatsAndUnparseableEvents.rhs,
-                rowStatsAndUnparseableEvents.lhs,
-                taskStatus.getErrorMsg(),
-                segmentAvailabilityConfirmed,
-                segmentAvailabilityWaitTimeMs,
-                Collections.emptyMap(),
-                segmentsRead,
-                segmentsPublished
-            )
-        )
+  private Map<String, TaskReport> getTaskCompletionReports(TaskStatus 
taskStatus)
+  {
+    return buildIngestionStatsReport(
+        IngestionState.COMPLETED,
+        taskStatus.getErrorMsg(),
+        segmentsRead,
+        segmentsPublished
     );
   }
 
+  @Override
+  protected Map<String, Object> getTaskCompletionRowStats()
+  {
+    return doGetRowStatsAndUnparseableEvents(true, false).lhs;
+  }
+
+  @Override
+  protected Map<String, Object> getTaskCompletionUnparseableEvents()
+  {
+    return doGetRowStatsAndUnparseableEvents(true, true).rhs;
+  }
+
   private void updateAndWriteCompletionReports(TaskStatus status)
   {
-    completionReports = getTaskCompletionReports(status, 
segmentAvailabilityConfirmationCompleted);
+    completionReports = getTaskCompletionReports(status);
     writeCompletionReports();
   }
 
@@ -1609,11 +1604,13 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
     for (PushedSegmentsReport pushedSegmentsReport : 
completedSubtaskReports.values()) {
       Map<String, TaskReport> taskReport = 
pushedSegmentsReport.getTaskReport();
       if (taskReport == null || taskReport.isEmpty()) {
-        LOG.warn("Got an empty task report from subtask: " + 
pushedSegmentsReport.getTaskId());
+        LOG.warn("Received an empty report from subtask[%s]" + 
pushedSegmentsReport.getTaskId());
         continue;
       }
-      RowIngestionMetersTotals rowIngestionMetersTotals =
-          getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, 
unparseableEvents);
+      RowIngestionMetersTotals rowIngestionMetersTotals = 
getBuildSegmentsStatsFromTaskReport(
+          taskReport,
+          includeUnparseable ? unparseableEvents : null
+      );
 
       
buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
     }
@@ -1647,11 +1644,11 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       for (GeneratedPartitionsReport generatedPartitionsReport : 
completedSubtaskReports.values()) {
         Map<String, TaskReport> taskReport = 
generatedPartitionsReport.getTaskReport();
         if (taskReport == null || taskReport.isEmpty()) {
-          LOG.warn("Got an empty task report from subtask: " + 
generatedPartitionsReport.getTaskId());
+          LOG.warn("Received an empty report from subtask[%s]", 
generatedPartitionsReport.getTaskId());
           continue;
         }
         RowIngestionMetersTotals rowStatsForCompletedTask =
-            getBuildSegmentsStatsFromTaskReport(taskReport, true, 
unparseableEvents);
+            getBuildSegmentsStatsFromTaskReport(taskReport, unparseableEvents);
 
         
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
 
@@ -1730,26 +1727,26 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
 
   private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
       Map<String, TaskReport> taskReport,
-      boolean includeUnparseable,
-      List<ParseExceptionReport> unparseableEvents)
+      List<ParseExceptionReport> unparseableEvents
+  )
   {
-    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = 
(IngestionStatsAndErrorsTaskReport) taskReport.get(
-        IngestionStatsAndErrorsTaskReport.REPORT_KEY);
-    IngestionStatsAndErrorsTaskReportData reportData =
-        (IngestionStatsAndErrorsTaskReportData) 
ingestionStatsAndErrorsReport.getPayload();
+    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = 
(IngestionStatsAndErrorsTaskReport)
+        taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY);
+    IngestionStatsAndErrors reportData = 
ingestionStatsAndErrorsReport.getPayload();
     RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
         reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
     );
-    if (includeUnparseable) {
-      List<ParseExceptionReport> taskUnparsebleEvents = 
(List<ParseExceptionReport>) reportData.getUnparseableEvents()
-                                                                    
.get(RowIngestionMeters.BUILD_SEGMENTS);
-      unparseableEvents.addAll(taskUnparsebleEvents);
+    if (unparseableEvents != null) {
+      unparseableEvents.addAll(
+          (List<ParseExceptionReport>)
+              
reportData.getUnparseableEvents().get(RowIngestionMeters.BUILD_SEGMENTS)
+      );
     }
     return totals;
   }
 
   private Pair<Map<String, Object>, Map<String, Object>> 
doGetRowStatsAndUnparseableEvents(
-      String full,
+      boolean isFullReport,
       boolean includeUnparseable
   )
   {
@@ -1779,7 +1776,10 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       }
     } else {
       IndexTask currentSequentialTask = (IndexTask) currentRunner;
-      return Pair.of(currentSequentialTask.doGetRowStats(full), 
currentSequentialTask.doGetUnparseableEvents(full));
+      return Pair.of(
+          currentSequentialTask.doGetRowStats(isFullReport),
+          currentSequentialTask.doGetUnparseableEvents(isFullReport)
+      );
     }
   }
 
@@ -1800,25 +1800,25 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   )
   {
     IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
-    return Response.ok(doGetRowStatsAndUnparseableEvents(full, 
false).lhs).build();
+    return Response.ok(doGetRowStatsAndUnparseableEvents(full != null, 
false).lhs).build();
   }
 
   @VisibleForTesting
-  public Map<String, Object> doGetLiveReports(String full)
+  public Map<String, Object> doGetLiveReports(boolean isFullReport)
   {
     Map<String, Object> returnMap = new HashMap<>();
     Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
     Map<String, Object> payload = new HashMap<>();
 
     Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparsebleEvents 
=
-        doGetRowStatsAndUnparseableEvents(full, true);
+        doGetRowStatsAndUnparseableEvents(isFullReport, true);
 
     // use the sequential task's ingestion state if we were running that mode
     IngestionState ingestionStateForReport;
     if (isParallelMode()) {
       ingestionStateForReport = ingestionState;
     } else {
-      IndexTask currentSequentialTask = (IndexTask) 
currentSubTaskHolder.getTask();
+      IndexTask currentSequentialTask = currentSubTaskHolder.getTask();
       ingestionStateForReport = currentSequentialTask == null
                                 ? ingestionState
                                 : currentSequentialTask.getIngestionState();
@@ -1846,7 +1846,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   {
     IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
 
-    return Response.ok(doGetLiveReports(full)).build();
+    return Response.ok(doGetLiveReports(full != null)).build();
   }
 
   /**
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index 46be219d878..bbeb00aa845 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -25,8 +25,6 @@ import org.apache.druid.data.input.InputSource;
 import org.apache.druid.indexer.IngestionState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -253,25 +251,16 @@ abstract class PartialSegmentGenerateTask<T extends 
GeneratedPartitionsReport> e
    */
   private Map<String, TaskReport> getTaskCompletionReports(Long segmentsRead)
   {
-    return TaskReport.buildTaskReports(
-        new IngestionStatsAndErrorsTaskReport(
-            getId(),
-            new IngestionStatsAndErrorsTaskReportData(
-                IngestionState.COMPLETED,
-                getTaskCompletionUnparseableEvents(),
-                getTaskCompletionRowStats(),
-                "",
-                false, // not applicable for parallel subtask
-                segmentAvailabilityWaitTimeMs,
-                Collections.emptyMap(),
-                segmentsRead,
-                null
-            )
-        )
+    return buildIngestionStatsReport(
+        IngestionState.COMPLETED,
+        "",
+        segmentsRead,
+        null
     );
   }
 
-  private Map<String, Object> getTaskCompletionUnparseableEvents()
+  @Override
+  protected Map<String, Object> getTaskCompletionUnparseableEvents()
   {
     Map<String, Object> unparseableEventsMap = new HashMap<>();
     List<ParseExceptionReport> parseExceptionMessages = 
IndexTaskUtils.getReportListFromSavedParseExceptions(
@@ -287,13 +276,12 @@ abstract class PartialSegmentGenerateTask<T extends 
GeneratedPartitionsReport> e
     return unparseableEventsMap;
   }
 
-  private Map<String, Object> getTaskCompletionRowStats()
+  @Override
+  protected Map<String, Object> getTaskCompletionRowStats()
   {
-    Map<String, Object> metrics = new HashMap<>();
-    metrics.put(
+    return Collections.singletonMap(
         RowIngestionMeters.BUILD_SEGMENTS,
         buildSegmentsMeters.getTotals()
     );
-    return metrics;
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 2ce427f7c84..465750bad03 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.FluentIterable;
@@ -33,8 +32,6 @@ import org.apache.druid.data.input.InputSource;
 import org.apache.druid.indexer.IngestionState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -499,22 +496,7 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
     IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
     Map<String, List<ParseExceptionReport>> events = new HashMap<>();
 
-    boolean needsBuildSegments = false;
-
-    if (full != null) {
-      needsBuildSegments = true;
-    } else {
-      switch (ingestionState) {
-        case BUILD_SEGMENTS:
-        case COMPLETED:
-          needsBuildSegments = true;
-          break;
-        default:
-          break;
-      }
-    }
-
-    if (needsBuildSegments) {
+    if (addBuildSegmentStatsToReport(full != null, ingestionState)) {
       events.put(
           RowIngestionMeters.BUILD_SEGMENTS,
           IndexTaskUtils.getReportListFromSavedParseExceptions(
@@ -526,28 +508,13 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
     return Response.ok(events).build();
   }
 
-  private Map<String, Object> doGetRowStats(String full)
+  private Map<String, Object> doGetRowStats(boolean isFullReport)
   {
     Map<String, Object> returnMap = new HashMap<>();
     Map<String, Object> totalsMap = new HashMap<>();
     Map<String, Object> averagesMap = new HashMap<>();
 
-    boolean needsBuildSegments = false;
-
-    if (full != null) {
-      needsBuildSegments = true;
-    } else {
-      switch (ingestionState) {
-        case BUILD_SEGMENTS:
-        case COMPLETED:
-          needsBuildSegments = true;
-          break;
-        default:
-          break;
-      }
-    }
-
-    if (needsBuildSegments) {
+    if (addBuildSegmentStatsToReport(isFullReport, ingestionState)) {
       totalsMap.put(
           RowIngestionMeters.BUILD_SEGMENTS,
           rowIngestionMeters.getTotals()
@@ -572,11 +539,10 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
   )
   {
     IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
-    return Response.ok(doGetRowStats(full)).build();
+    return Response.ok(doGetRowStats(full != null)).build();
   }
 
-  @VisibleForTesting
-  public Map<String, Object> doGetLiveReports(String full)
+  private Map<String, Object> doGetLiveReports(boolean isFullReport)
   {
     Map<String, Object> returnMap = new HashMap<>();
     Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
@@ -585,7 +551,7 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
 
     payload.put("ingestionState", ingestionState);
     payload.put("unparseableEvents", events);
-    payload.put("rowStats", doGetRowStats(full));
+    payload.put("rowStats", doGetRowStats(isFullReport));
 
     ingestionStatsAndErrors.put("taskId", getId());
     ingestionStatsAndErrors.put("payload", payload);
@@ -604,45 +570,28 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
   )
   {
     IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, 
getDataSource(), authorizerMapper);
-    return Response.ok(doGetLiveReports(full)).build();
+    return Response.ok(doGetLiveReports(full != null)).build();
   }
 
-  private Map<String, Object> getTaskCompletionRowStats()
+  @Override
+  protected Map<String, Object> getTaskCompletionRowStats()
   {
-    Map<String, Object> metrics = new HashMap<>();
-    metrics.put(
+    return Collections.singletonMap(
         RowIngestionMeters.BUILD_SEGMENTS,
         rowIngestionMeters.getTotals()
     );
-    return metrics;
   }
 
   /**
    * Generate an IngestionStatsAndErrorsTaskReport for the task.
-   **
-   * @return
    */
   private Map<String, TaskReport> getTaskCompletionReports()
   {
-    return TaskReport.buildTaskReports(
-        new IngestionStatsAndErrorsTaskReport(
-            getId(),
-            new IngestionStatsAndErrorsTaskReportData(
-                IngestionState.COMPLETED,
-                getTaskCompletionUnparseableEvents(),
-                getTaskCompletionRowStats(),
-                errorMsg,
-                false, // not applicable for parallel subtask
-                segmentAvailabilityWaitTimeMs,
-                Collections.emptyMap(),
-                null,
-                null
-            )
-        )
-    );
+    return buildIngestionStatsReport(IngestionState.COMPLETED, errorMsg, null, 
null);
   }
 
-  private Map<String, Object> getTaskCompletionUnparseableEvents()
+  @Override
+  protected Map<String, Object> getTaskCompletionUnparseableEvents()
   {
     Map<String, Object> unparseableEventsMap = new HashMap<>();
     List<ParseExceptionReport> parseExceptionMessages = 
IndexTaskUtils.getReportListFromSavedParseExceptions(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 09e89b1d601..f3e1e4a06d2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -51,8 +51,8 @@ import org.apache.druid.error.ErrorResponse;
 import org.apache.druid.indexer.IngestionState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
@@ -1123,7 +1123,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     return TaskReport.buildTaskReports(
         new IngestionStatsAndErrorsTaskReport(
             task.getId(),
-            new IngestionStatsAndErrorsTaskReportData(
+            new IngestionStatsAndErrors(
                 ingestionState,
                 getTaskCompletionUnparseableEvents(),
                 getTaskCompletionRowStats(),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index ef769d73006..e91d25c0b03 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -52,7 +52,7 @@ import org.apache.druid.error.DruidException;
 import org.apache.druid.indexer.IngestionState;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskReport;
@@ -690,7 +690,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest 
extends InitializedNullHand
     TaskStatus status = statusFuture.get();
     
Assert.assertTrue(status.getErrorMsg().contains("org.apache.druid.java.util.common.RE:
 Max parse exceptions[0] exceeded"));
 
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
 
     ParseExceptionReport parseExceptionReport =
         ParseExceptionReport.forPhase(reportData, 
RowIngestionMeters.BUILD_SEGMENTS);
@@ -798,7 +798,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest 
extends InitializedNullHand
     final TaskStatus taskStatus = statusFuture.get();
     Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
 
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
 
     Assert.assertEquals(expectedMetrics, reportData.getRowStats());
   }
@@ -901,7 +901,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest 
extends InitializedNullHand
     final TaskStatus taskStatus = statusFuture.get();
     Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
 
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
     Assert.assertEquals(expectedMetrics, reportData.getRowStats());
 
     ParseExceptionReport parseExceptionReport =
@@ -981,7 +981,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest 
extends InitializedNullHand
     Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
     Assert.assertTrue(taskStatus.getErrorMsg().contains("Max parse 
exceptions[3] exceeded"));
 
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
 
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.BUILD_SEGMENTS,
@@ -1257,7 +1257,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest 
extends InitializedNullHand
           )
       );
 
-      IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+      IngestionStatsAndErrors reportData = getTaskReportData();
       Assert.assertEquals(expectedMetrics, reportData.getRowStats());
 
       Pattern errorPattern = Pattern.compile(
@@ -1676,7 +1676,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest 
extends InitializedNullHand
     }
   }
 
-  private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws 
IOException
+  private IngestionStatsAndErrors getTaskReportData() throws IOException
   {
     Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
         reportsFile,
@@ -1684,7 +1684,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest 
extends InitializedNullHand
         {
         }
     );
-    return IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports(
+    return IngestionStatsAndErrors.getPayloadFromTaskReports(
         taskReports
     );
   }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 9b68cebe9ee..5c42ea4da4d 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -39,7 +39,7 @@ import 
org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
@@ -248,18 +248,18 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
       Assert.assertEquals("Compaction state for " + segment.getId(), 
expectedState, segment.getLastCompactionState());
     }
 
-    List<IngestionStatsAndErrorsTaskReportData> reports = 
getIngestionReports();
+    List<IngestionStatsAndErrors> reports = getIngestionReports();
     Assert.assertEquals(reports.size(), 3); // since three index tasks are run 
by single compaction task
 
     // this test reads 3 segments and publishes 6 segments
     Assert.assertEquals(
         3,
-        
reports.stream().mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsRead).sum()
+        
reports.stream().mapToLong(IngestionStatsAndErrors::getSegmentsRead).sum()
     );
     Assert.assertEquals(
         6,
         reports.stream()
-               
.mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsPublished)
+               .mapToLong(IngestionStatsAndErrors::getSegmentsPublished)
                .sum()
     );
   }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 03994e35e51..6f6b3e8ef48 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -557,7 +557,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
                 Granularities.MINUTE,
                 null
             ),
-            IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, 
true),
+            IndexTaskTest.createTuningConfig(2, 2, 2L, null, false, true),
             false,
             false
         ),
@@ -1900,7 +1900,7 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
                 Granularities.MINUTE,
                 null
             ),
-            IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, 
true),
+            IndexTaskTest.createTuningConfig(2, 2, 2L, null, false, true),
             appendToExisting,
             false
         ),
@@ -1940,7 +1940,7 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
                 Granularities.MINUTE,
                 null
             ),
-            IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, 
true),
+            IndexTaskTest.createTuningConfig(2, 2, 2L, null, false, true),
             appendToExisting,
             false
         ),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index b049b472d35..2619a23ba33 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -45,7 +45,7 @@ import 
org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -77,7 +77,6 @@ import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
 import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
-import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
@@ -86,7 +85,6 @@ import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
 import 
org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory;
 import org.apache.druid.segment.transform.ExpressionTransform;
@@ -106,17 +104,19 @@ import org.apache.druid.timeline.partition.PartitionIds;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.easymock.EasyMock;
 import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.Response;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.IOException;
@@ -129,6 +129,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 @RunWith(Parameterized.class)
 public class IndexTaskTest extends IngestionTestBase
@@ -136,9 +137,6 @@ public class IndexTaskTest extends IngestionTestBase
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
   private static final String DATASOURCE = "test";
   private static final TimestampSpec DEFAULT_TIMESTAMP_SPEC = new 
TimestampSpec("ts", "auto", null);
   private static final DimensionsSpec DEFAULT_DIMENSIONS_SPEC = new 
DimensionsSpec(
@@ -173,19 +171,17 @@ public class IndexTaskTest extends IngestionTestBase
   private static final IndexSpec INDEX_SPEC = IndexSpec.DEFAULT;
   private final ObjectMapper jsonMapper;
   private final IndexIO indexIO;
-  private final RowIngestionMetersFactory rowIngestionMetersFactory;
   private final LockGranularity lockGranularity;
   private final boolean useInputFormatApi;
 
-  private AppenderatorsManager appenderatorsManager;
   private SegmentCacheManager segmentCacheManager;
   private TestTaskRunner taskRunner;
+  private File tmpDir;
 
   public IndexTaskTest(LockGranularity lockGranularity, boolean 
useInputFormatApi)
   {
     this.jsonMapper = getObjectMapper();
     this.indexIO = getIndexIO();
-    this.rowIngestionMetersFactory = getRowIngestionMetersFactory();
     this.lockGranularity = lockGranularity;
     this.useInputFormatApi = useInputFormatApi;
   }
@@ -193,9 +189,8 @@ public class IndexTaskTest extends IngestionTestBase
   @Before
   public void setup() throws IOException
   {
-    appenderatorsManager = new TestAppenderatorsManager();
-
     final File cacheDir = temporaryFolder.newFolder();
+    tmpDir = temporaryFolder.newFolder();
     segmentCacheManager = new SegmentLocalCacheManager(
         new SegmentLoaderConfig()
         {
@@ -213,12 +208,9 @@ public class IndexTaskTest extends IngestionTestBase
   }
 
   @Test
-  public void testCorrectInputSourceResources() throws IOException
+  public void testCorrectInputSourceResources()
   {
-    File tmpDir = temporaryFolder.newFolder();
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         new IndexIngestionSpec(
             new DataSchema(
                 "test-json",
@@ -263,19 +255,13 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testIngestNullOnlyColumns() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T00:00:10Z,,\n");
       writer.write("2014-01-01T01:00:20Z,,\n");
       writer.write("2014-01-01T02:00:30Z,,\n");
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         new IndexIngestionSpec(
             new DataSchema(
                 "test-json",
@@ -318,19 +304,13 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void 
testIngestNullOnlyColumns_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() 
throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T00:00:10Z,,\n");
       writer.write("2014-01-01T01:00:20Z,,\n");
       writer.write("2014-01-01T02:00:30Z,,\n");
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         new IndexIngestionSpec(
             new DataSchema(
                 "test-json",
@@ -374,23 +354,14 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testDeterminePartitions() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T00:00:10Z,a,1\n");
       writer.write("2014-01-01T01:00:20Z,b,1\n");
       writer.write("2014-01-01T02:00:30Z,c,1\n");
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
-            null,
             null,
             createTuningConfigWithMaxRowsPerSegment(2, true),
             false,
@@ -429,11 +400,7 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testTransformSpec() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T00:00:10Z,a,an|array,1|2|3,1\n");
       writer.write("2014-01-01T01:00:20Z,b,another|array,3|4,1\n");
       writer.write("2014-01-01T02:00:30Z,c,and|another,0|1,1\n");
@@ -472,8 +439,6 @@ public class IndexTaskTest extends IngestionTestBase
     final IndexIngestionSpec indexIngestionSpec;
     if (useInputFormatApi) {
       indexIngestionSpec = createIngestionSpec(
-          jsonMapper,
-          tmpDir,
           DEFAULT_TIMESTAMP_SPEC,
           dimensionsSpec,
           new CsvInputFormat(columns, listDelimiter, null, false, 0),
@@ -496,12 +461,7 @@ public class IndexTaskTest extends IngestionTestBase
       );
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
-        indexIngestionSpec,
-        null
-    );
+    IndexTask indexTask = createIndexTask(indexIngestionSpec, null);
 
     Assert.assertEquals(indexTask.getId(), indexTask.getGroupId());
 
@@ -568,27 +528,18 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testWithArbitraryGranularity() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T00:00:10Z,a,1\n");
       writer.write("2014-01-01T01:00:20Z,b,1\n");
       writer.write("2014-01-01T02:00:30Z,c,1\n");
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new ArbitraryGranularitySpec(
                 Granularities.MINUTE,
                 
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(10, true),
             false,
             false
@@ -597,34 +548,29 @@ public class IndexTaskTest extends IngestionTestBase
     );
 
     final List<DataSegment> segments = runSuccessfulTask(indexTask);
-
     Assert.assertEquals(1, segments.size());
+
+    invokeApi(req -> indexTask.getLiveReports(req, null));
+    invokeApi(req -> indexTask.getLiveReports(req, "full"));
+    invokeApi(req -> indexTask.getRowStats(req, null));
+    invokeApi(req -> indexTask.getRowStats(req, "full"));
   }
 
   @Test
   public void testIntervalBucketing() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T07:59:59.977Z,a,1\n");
       writer.write("2014-01-01T08:00:00.000Z,b,1\n");
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.HOUR,
                 
Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z"))
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(50, true),
             false,
             false
@@ -640,24 +586,16 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testNumShardsProvided() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T00:00:10Z,a,1\n");
       writer.write("2014-01-01T01:00:20Z,b,1\n");
       writer.write("2014-01-01T02:00:30Z,c,1\n");
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             null,
-            null,
-            createTuningConfigWithPartitionsSpec(new 
HashedPartitionsSpec(null, 1, null), true),
+            createTuningConfigWithPartitionsSpec(new 
HashedPartitionsSpec(null, 1, null)),
             false,
             false
         ),
@@ -681,25 +619,17 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testNumShardsAndHashPartitionFunctionProvided() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T00:00:10Z,a,1\n");
       writer.write("2014-01-01T01:00:20Z,b,1\n");
       writer.write("2014-01-01T02:00:30Z,c,1\n");
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
-            null,
             null,
             createTuningConfigWithPartitionsSpec(
-                new HashedPartitionsSpec(null, 1, null, 
HashPartitionFunction.MURMUR3_32_ABS), true
+                new HashedPartitionsSpec(null, 1, null, 
HashPartitionFunction.MURMUR3_32_ABS)
             ),
             false,
             false
@@ -724,24 +654,16 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testNumShardsAndPartitionDimensionsProvided() throws Exception
   {
-    final File tmpDir = temporaryFolder.newFolder();
-    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T00:00:10Z,a,1\n");
       writer.write("2014-01-01T01:00:20Z,b,1\n");
       writer.write("2014-01-01T02:00:30Z,c,1\n");
     }
 
-    final IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    final IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
-            null,
             null,
-            createTuningConfigWithPartitionsSpec(new 
HashedPartitionsSpec(null, 2, ImmutableList.of("dim")), true),
+            createTuningConfigWithPartitionsSpec(new 
HashedPartitionsSpec(null, 2, ImmutableList.of("dim"))),
             false,
             false
         ),
@@ -797,22 +719,14 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void 
testWriteNewSegmentsWithAppendToExistingWithLinearPartitioningSuccessfullyAppend()
 throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T00:00:10Z,a,1\n");
       writer.write("2014-01-01T01:00:20Z,b,1\n");
       writer.write("2014-01-01T02:00:30Z,c,1\n");
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
-            null,
             null,
             createTuningConfigWithMaxRowsPerSegment(2, false),
             true,
@@ -842,27 +756,19 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testIntervalNotSpecified() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T00:00:10Z,a,1\n");
       writer.write("2014-01-01T01:00:20Z,b,1\n");
       writer.write("2014-01-01T02:00:30Z,c,1\n");
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.MINUTE,
                 null
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(2, true),
             false,
             false
@@ -893,49 +799,39 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testIntervalNotSpecifiedWithReplace() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T00:00:10Z,a,1\n");
       writer.write("2014-01-01T01:00:20Z,b,1\n");
       writer.write("2014-01-01T02:00:30Z,c,1\n");
     }
 
     // Expect exception if reingest with dropExisting and null intervals is 
attempted
-    expectedException.expect(IAE.class);
-    expectedException.expectMessage(
-        "GranularitySpec's intervals cannot be empty for replace."
-    );
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
-        createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
-            new UniformGranularitySpec(
-                Granularities.HOUR,
-                Granularities.MINUTE,
-                null
+    Exception exception = Assert.assertThrows(
+        IAE.class,
+        () -> createIndexTask(
+            createDefaultIngestionSpec(
+                new UniformGranularitySpec(
+                    Granularities.HOUR,
+                    Granularities.MINUTE,
+                    null
+                ),
+                createTuningConfigWithMaxRowsPerSegment(2, true),
+                false,
+                true
             ),
-            null,
-            createTuningConfigWithMaxRowsPerSegment(2, true),
-            false,
-            true
-        ),
-        null
+            null
+        )
+    );
+    Assert.assertEquals(
+        "GranularitySpec's intervals cannot be empty for replace.",
+        exception.getMessage()
     );
-
   }
 
   @Test
   public void testCSVFileWithHeader() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("time,d,val\n");
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }
@@ -956,8 +852,6 @@ public class IndexTaskTest extends IngestionTestBase
       );
     } else {
       ingestionSpec = createIngestionSpec(
-          jsonMapper,
-          tmpDir,
           timestampSpec,
           DimensionsSpec.EMPTY,
           new CsvInputFormat(null, null, null, true, 0),
@@ -969,9 +863,7 @@ public class IndexTaskTest extends IngestionTestBase
       );
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         ingestionSpec,
         null
     );
@@ -988,11 +880,7 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testCSVFileWithHeaderColumnOverride() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("time,d,val\n");
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }
@@ -1003,8 +891,6 @@ public class IndexTaskTest extends IngestionTestBase
     final IndexIngestionSpec ingestionSpec;
     if (useInputFormatApi) {
       ingestionSpec = createIngestionSpec(
-          jsonMapper,
-          tmpDir,
           timestampSpec,
           DimensionsSpec.EMPTY,
           new CsvInputFormat(columns, null, null, true, 0),
@@ -1027,9 +913,7 @@ public class IndexTaskTest extends IngestionTestBase
       );
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         ingestionSpec,
         null
     );
@@ -1046,10 +930,7 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testWithSmallMaxTotalRows() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T00:00:10Z,a,1\n");
       writer.write("2014-01-01T00:00:10Z,b,2\n");
       writer.write("2014-01-01T00:00:10Z,c,3\n");
@@ -1061,19 +942,14 @@ public class IndexTaskTest extends IngestionTestBase
       writer.write("2014-01-01T02:00:30Z,c,3\n");
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.MINUTE,
                 null
             ),
-            null,
-            createTuningConfig(2, 2, null, 2L, null, false, true),
+            createTuningConfig(2, 2, 2L, null, false, true),
             false,
             false
         ),
@@ -1099,25 +975,17 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testPerfectRollup() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    populateRollupTestData(tmpFile);
+    populateRollupTestData(createTempFile());
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.DAY,
                 Granularities.DAY,
                 true,
                 null
             ),
-            null,
-            createTuningConfig(3, 2, null, 2L, null, true, true),
+            createTuningConfig(3, 2, 2L, null, true, true),
             false,
             false
         ),
@@ -1134,7 +1002,7 @@ public class IndexTaskTest extends IngestionTestBase
 
       Assert.assertEquals(DATASOURCE, segment.getDataSource());
       Assert.assertEquals(expectedInterval, segment.getInterval());
-      
Assert.assertTrue(segment.getShardSpec().getClass().equals(HashBasedNumberedShardSpec.class));
+      Assert.assertEquals(segment.getShardSpec().getClass(), 
HashBasedNumberedShardSpec.class);
       Assert.assertEquals(i, segment.getShardSpec().getPartitionNum());
     }
   }
@@ -1142,25 +1010,17 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testBestEffortRollup() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    populateRollupTestData(tmpFile);
+    populateRollupTestData(createTempFile());
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.DAY,
                 Granularities.DAY,
                 true,
                 null
             ),
-            null,
-            createTuningConfig(3, 2, null, 2L, null, false, true),
+            createTuningConfig(3, 2, 2L, null, false, true),
             false,
             false
         ),
@@ -1183,24 +1043,17 @@ public class IndexTaskTest extends IngestionTestBase
   }
 
   @Test
-  public void testWaitForSegmentAvailabilityNoSegments() throws IOException
+  public void testWaitForSegmentAvailabilityNoSegments()
   {
-    final File tmpDir = temporaryFolder.newFolder();
-
     TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);
     List<DataSegment> segmentsToWaitFor = new ArrayList<>();
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.MINUTE,
                 null
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(2, true),
             false,
             false
@@ -1214,25 +1067,18 @@ public class IndexTaskTest extends IngestionTestBase
   }
 
   @Test
-  public void testWaitForSegmentAvailabilityInvalidWaitTimeout() throws 
IOException
+  public void testWaitForSegmentAvailabilityInvalidWaitTimeout()
   {
-    final File tmpDir = temporaryFolder.newFolder();
-
     TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);
     List<DataSegment> segmentsToWaitFor = new ArrayList<>();
     segmentsToWaitFor.add(EasyMock.createMock(DataSegment.class));
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.MINUTE,
                 null
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(2, true),
             false,
             false
@@ -1246,10 +1092,8 @@ public class IndexTaskTest extends IngestionTestBase
   }
 
   @Test
-  public void testWaitForSegmentAvailabilityMultipleSegmentsTimeout() throws 
IOException
+  public void testWaitForSegmentAvailabilityMultipleSegmentsTimeout()
   {
-    final File tmpDir = temporaryFolder.newFolder();
-
     TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);
     SegmentHandoffNotifierFactory mockFactory = 
EasyMock.createMock(SegmentHandoffNotifierFactory.class);
     SegmentHandoffNotifier mockNotifier = 
EasyMock.createMock(SegmentHandoffNotifier.class);
@@ -1260,18 +1104,13 @@ public class IndexTaskTest extends IngestionTestBase
     segmentsToWaitFor.add(mockDataSegment1);
     segmentsToWaitFor.add(mockDataSegment2);
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.MINUTE,
                 null
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(2, true),
             false,
             false
@@ -1309,10 +1148,8 @@ public class IndexTaskTest extends IngestionTestBase
   }
 
   @Test
-  public void testWaitForSegmentAvailabilityMultipleSegmentsSuccess() throws 
IOException
+  public void testWaitForSegmentAvailabilityMultipleSegmentsSuccess()
   {
-    final File tmpDir = temporaryFolder.newFolder();
-
     TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);
 
     DataSegment mockDataSegment1 = EasyMock.createMock(DataSegment.class);
@@ -1321,18 +1158,13 @@ public class IndexTaskTest extends IngestionTestBase
     segmentsToWaitFor.add(mockDataSegment1);
     segmentsToWaitFor.add(mockDataSegment2);
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.MINUTE,
                 null
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(2, true),
             false,
             false
@@ -1365,10 +1197,8 @@ public class IndexTaskTest extends IngestionTestBase
   }
 
   @Test
-  public void testWaitForSegmentAvailabilityEmitsExpectedMetric() throws 
IOException
+  public void testWaitForSegmentAvailabilityEmitsExpectedMetric()
   {
-    final File tmpDir = temporaryFolder.newFolder();
-
     TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);
 
     DataSegment mockDataSegment1 = EasyMock.createMock(DataSegment.class);
@@ -1377,18 +1207,13 @@ public class IndexTaskTest extends IngestionTestBase
     segmentsToWaitFor.add(mockDataSegment1);
     segmentsToWaitFor.add(mockDataSegment2);
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.MINUTE,
                 null
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(2, true),
             false,
             false
@@ -1438,14 +1263,15 @@ public class IndexTaskTest extends IngestionTestBase
     }
   }
 
+  private File createTempFile() throws IOException
+  {
+    return File.createTempFile("druid", "index", tmpDir);
+  }
+
   @Test
   public void testIgnoreParseException() throws Exception
   {
-    final File tmpDir = temporaryFolder.newFolder();
-
-    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("time,d,val\n");
       writer.write("unparseable,a,1\n");
       writer.write("2014-01-01T00:00:10Z,a,1\n");
@@ -1454,15 +1280,13 @@ public class IndexTaskTest extends IngestionTestBase
     final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", 
null);
     final List<String> columns = Arrays.asList("time", "dim", "val");
     // ignore parse exception
-    final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, 
null, null, false, false);
+    final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, 
null, false, false);
 
     // GranularitySpec.intervals and numShards must be null to verify 
reportParseException=false is respected both in
     // IndexTask.determineShardSpecs() and 
IndexTask.generateAndPublishSegments()
     final IndexIngestionSpec parseExceptionIgnoreSpec;
     if (useInputFormatApi) {
       parseExceptionIgnoreSpec = createIngestionSpec(
-          jsonMapper,
-          tmpDir,
           timestampSpec,
           DimensionsSpec.EMPTY,
           new CsvInputFormat(columns, null, null, true, 0),
@@ -1485,12 +1309,7 @@ public class IndexTaskTest extends IngestionTestBase
       );
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
-        parseExceptionIgnoreSpec,
-        null
-    );
+    IndexTask indexTask = createIndexTask(parseExceptionIgnoreSpec, null);
 
     final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
@@ -1502,10 +1321,7 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testReportParseException() throws Exception
   {
-    final File tmpDir = temporaryFolder.newFolder();
-
-    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
+    final File tmpFile = createTempFile();
     try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
       writer.write("time,d,val\n");
       writer.write("unparseable,a,1\n");
@@ -1515,13 +1331,11 @@ public class IndexTaskTest extends IngestionTestBase
     final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", 
null);
     final List<String> columns = Arrays.asList("time", "dim", "val");
     // report parse exception
-    final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, 
null, null, false, true);
+    final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, 
null, false, true);
     final IndexIngestionSpec indexIngestionSpec;
     List<String> expectedMessages;
     if (useInputFormatApi) {
       indexIngestionSpec = createIngestionSpec(
-          jsonMapper,
-          tmpDir,
           timestampSpec,
           DimensionsSpec.EMPTY,
           new CsvInputFormat(columns, null, null, true, 0),
@@ -1550,18 +1364,13 @@ public class IndexTaskTest extends IngestionTestBase
             tmpFile.toURI()
         )
     );
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
-        indexIngestionSpec,
-        null
-    );
+    IndexTask indexTask = createIndexTask(indexIngestionSpec, null);
 
     TaskStatus status = runTask(indexTask).lhs;
     Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
     checkTaskStatusErrorMsgForParseExceptionsExceeded(status);
 
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
 
     ParseExceptionReport parseExceptionReport =
         ParseExceptionReport.forPhase(reportData, 
RowIngestionMeters.BUILD_SEGMENTS);
@@ -1574,10 +1383,7 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testMultipleParseExceptionsSuccess() throws Exception
   {
-    final File tmpDir = temporaryFolder.newFolder();
-
-    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
+    final File tmpFile = createTempFile();
     try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
       
writer.write("{\"time\":\"unparseable\",\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}\n");
 // unparseable time
       
writer.write("{\"time\":\"2014-01-01T00:00:10Z\",\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}\n");
 // valid row
@@ -1633,8 +1439,6 @@ public class IndexTaskTest extends IngestionTestBase
     final IndexIngestionSpec ingestionSpec;
     if (useInputFormatApi) {
       ingestionSpec = createIngestionSpec(
-          jsonMapper,
-          tmpDir,
           timestampSpec,
           dimensionsSpec,
           new JsonInputFormat(null, null, null, null, null),
@@ -1657,18 +1461,13 @@ public class IndexTaskTest extends IngestionTestBase
       );
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
-        ingestionSpec,
-        null
-    );
+    IndexTask indexTask = createIndexTask(ingestionSpec, null);
 
     TaskStatus status = runTask(indexTask).lhs;
     Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
     Assert.assertNull(status.getErrorMsg());
 
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
 
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.DETERMINE_PARTITIONS,
@@ -1759,10 +1558,7 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testMultipleParseExceptionsFailure() throws Exception
   {
-    final File tmpDir = temporaryFolder.newFolder();
-
-    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
+    final File tmpFile = createTempFile();
     try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
       writer.write("time,dim,dimLong,dimFloat,val\n");
       writer.write("unparseable,a,2,3.0,1\n"); // unparseable
@@ -1815,8 +1611,6 @@ public class IndexTaskTest extends IngestionTestBase
     List<String> expectedMessages;
     if (useInputFormatApi) {
       ingestionSpec = createIngestionSpec(
-          jsonMapper,
-          tmpDir,
           timestampSpec,
           dimensionsSpec,
           new CsvInputFormat(columns, null, null, true, 0),
@@ -1853,9 +1647,7 @@ public class IndexTaskTest extends IngestionTestBase
             tmpFile.toURI()
         )
     );
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         ingestionSpec,
         null
     );
@@ -1864,7 +1656,7 @@ public class IndexTaskTest extends IngestionTestBase
     Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
     checkTaskStatusErrorMsgForParseExceptionsExceeded(status);
 
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
 
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.DETERMINE_PARTITIONS,
@@ -1902,10 +1694,7 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws 
Exception
   {
-    final File tmpDir = temporaryFolder.newFolder();
-
-    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
+    final File tmpFile = createTempFile();
     try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
       writer.write("time,dim,dimLong,dimFloat,val\n");
       writer.write("unparseable,a,2,3.0,1\n"); // unparseable
@@ -1958,8 +1747,6 @@ public class IndexTaskTest extends IngestionTestBase
     List<String> expectedMessages;
     if (useInputFormatApi) {
       ingestionSpec = createIngestionSpec(
-          jsonMapper,
-          tmpDir,
           timestampSpec,
           dimensionsSpec,
           new CsvInputFormat(columns, null, null, true, 0),
@@ -1987,9 +1774,7 @@ public class IndexTaskTest extends IngestionTestBase
         StringUtils.format("Timestamp[9.0] is unparseable! Event: {time=9.0, 
dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 2, Line: 4)", 
tmpFile.toURI()),
         StringUtils.format("Timestamp[unparseable] is unparseable! Event: 
{time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 1, 
Line: 2)", tmpFile.toURI())
     );
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         ingestionSpec,
         null
     );
@@ -1998,7 +1783,7 @@ public class IndexTaskTest extends IngestionTestBase
     Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
     checkTaskStatusErrorMsgForParseExceptionsExceeded(status);
 
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
 
     Map<String, Object> expectedMetrics = ImmutableMap.of(
         RowIngestionMeters.DETERMINE_PARTITIONS,
@@ -2036,36 +1821,26 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testCsvWithHeaderOfEmptyColumns() throws Exception
   {
-    final File tmpDir = temporaryFolder.newFolder();
-
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("ts,,\n");
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }
 
-    tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("ts,dim,\n");
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }
 
-    tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("ts,,val\n");
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }
 
     // report parse exception
-    final IndexTuningConfig tuningConfig = createTuningConfig(2, 1, null, 
null, null, true, true);
+    final IndexTuningConfig tuningConfig = createTuningConfig(2, 1, null, 
null, true, true);
     final IndexIngestionSpec ingestionSpec;
     if (useInputFormatApi) {
       ingestionSpec = createIngestionSpec(
-          jsonMapper,
-          tmpDir,
           DEFAULT_TIMESTAMP_SPEC,
           DimensionsSpec.EMPTY,
           new CsvInputFormat(null, null, null, true, 0),
@@ -2088,9 +1863,7 @@ public class IndexTaskTest extends IngestionTestBase
       );
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         ingestionSpec,
         null
     );
@@ -2123,10 +1896,7 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testCsvWithHeaderOfEmptyTimestamp() throws Exception
   {
-    final File tmpDir = temporaryFolder.newFolder();
-
-    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
+    final File tmpFile = createTempFile();
     try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
       writer.write(",,\n");
       writer.write("2014-01-01T00:00:10Z,a,1\n");
@@ -2134,13 +1904,11 @@ public class IndexTaskTest extends IngestionTestBase
 
     final List<String> columns = Arrays.asList("ts", "", "");
     // report parse exception
-    final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, 
null, null, false, true);
+    final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, 
null, false, true);
     final IndexIngestionSpec ingestionSpec;
     List<String> expectedMessages;
     if (useInputFormatApi) {
       ingestionSpec = createIngestionSpec(
-          jsonMapper,
-          tmpDir,
           DEFAULT_TIMESTAMP_SPEC,
           DimensionsSpec.EMPTY,
           new CsvInputFormat(columns, null, null, true, 0),
@@ -2169,9 +1937,7 @@ public class IndexTaskTest extends IngestionTestBase
             tmpFile.toURI()
         )
     );
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         ingestionSpec,
         null
     );
@@ -2181,7 +1947,7 @@ public class IndexTaskTest extends IngestionTestBase
 
     checkTaskStatusErrorMsgForParseExceptionsExceeded(status);
 
-    IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
+    IngestionStatsAndErrors reportData = getTaskReportData();
 
     ParseExceptionReport parseExceptionReport =
         ParseExceptionReport.forPhase(reportData, 
RowIngestionMeters.BUILD_SEGMENTS);
@@ -2196,26 +1962,18 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testOverwriteWithSameSegmentGranularity() throws Exception
   {
-    final File tmpDir = temporaryFolder.newFolder();
-    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    populateRollupTestData(tmpFile);
+    populateRollupTestData(createTempFile());
 
     for (int i = 0; i < 2; i++) {
-      final IndexTask indexTask = new IndexTask(
-          null,
-          null,
+      final IndexTask indexTask = createIndexTask(
           createDefaultIngestionSpec(
-              jsonMapper,
-              tmpDir,
               new UniformGranularitySpec(
                   Granularities.DAY,
                   Granularities.DAY,
                   true,
                   null
               ),
-              null,
-              createTuningConfig(3, 2, null, 2L, null, false, true),
+              createTuningConfig(3, 2, 2L, null, false, true),
               false,
               false
           ),
@@ -2260,27 +2018,19 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void testOverwriteWithDifferentSegmentGranularity() throws Exception
   {
-    final File tmpDir = temporaryFolder.newFolder();
-    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    populateRollupTestData(tmpFile);
+    populateRollupTestData(createTempFile());
 
     for (int i = 0; i < 2; i++) {
       final Granularity segmentGranularity = i == 0 ? Granularities.DAY : 
Granularities.MONTH;
-      final IndexTask indexTask = new IndexTask(
-          null,
-          null,
+      final IndexTask indexTask = createIndexTask(
           createDefaultIngestionSpec(
-              jsonMapper,
-              tmpDir,
               new UniformGranularitySpec(
                   segmentGranularity,
                   Granularities.DAY,
                   true,
                   null
               ),
-              null,
-              createTuningConfig(3, 2, null, 2L, null, false, true),
+              createTuningConfig(3, 2, 2L, null, false, true),
               false,
               false
           ),
@@ -2305,55 +2055,43 @@ public class IndexTaskTest extends IngestionTestBase
   }
 
   @Test
-  public void testIndexTaskWithSingleDimPartitionsSpecThrowingException() 
throws Exception
+  public void testIndexTaskWithSingleDimPartitionsSpecThrowingException()
   {
-    final File tmpDir = temporaryFolder.newFolder();
-    final IndexTask task = new IndexTask(
-        null,
-        null,
+    final IndexTask task = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
-            null,
             null,
-            createTuningConfigWithPartitionsSpec(new 
SingleDimensionPartitionsSpec(null, 1, null, false), true),
+            createTuningConfigWithPartitionsSpec(new 
SingleDimensionPartitionsSpec(null, 1, null, false)),
             false,
             false
         ),
         null
     );
-    expectedException.expect(UnsupportedOperationException.class);
-    expectedException.expectMessage(
-        
"partitionsSpec[org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec]
 is not supported"
+    Exception exception = Assert.assertThrows(
+        UnsupportedOperationException.class,
+        () -> task.isReady(createActionClient(task))
+    );
+    Assert.assertEquals(
+        
"partitionsSpec[org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec]
 is not supported",
+        exception.getMessage()
     );
-    task.isReady(createActionClient(task));
   }
 
   @Test
   public void testOldSegmentNotReplacedWhenDropFlagFalse() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T00:00:10Z,a,1\n");
       writer.write("2014-01-01T01:00:20Z,b,1\n");
       writer.write("2014-01-01T02:00:30Z,c,1\n");
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.YEAR,
                 Granularities.MINUTE,
                 
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(10, true),
             false,
             false
@@ -2365,24 +2103,19 @@ public class IndexTaskTest extends IngestionTestBase
     List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
-    Set<DataSegment> usedSegmentsBeforeOverwrite = 
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
 Intervals.ETERNITY, true).get());
+    Set<DataSegment> usedSegmentsBeforeOverwrite = getAllUsedSegments();
     Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size());
     for (DataSegment segment : usedSegmentsBeforeOverwrite) {
       Assert.assertTrue(Granularities.YEAR.isAligned(segment.getInterval()));
     }
 
-    indexTask = new IndexTask(
-        null,
-        null,
+    indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.MINUTE,
                 Granularities.MINUTE,
                 
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(10, true),
             false,
             false
@@ -2394,7 +2127,7 @@ public class IndexTaskTest extends IngestionTestBase
     segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(3, segments.size());
-    Set<DataSegment> usedSegmentsBeforeAfterOverwrite = 
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
 Intervals.ETERNITY, true).get());
+    Set<DataSegment> usedSegmentsBeforeAfterOverwrite = getAllUsedSegments();
     Assert.assertEquals(4, usedSegmentsBeforeAfterOverwrite.size());
     int yearSegmentFound = 0;
     int minuteSegmentFound = 0;
@@ -2414,30 +2147,22 @@ public class IndexTaskTest extends IngestionTestBase
   }
 
   @Test
-  public void 
testOldSegmentNotCoveredByTombstonesWhenDropFlagTrueSinceIngestionIntervalDoesNotContainsOldSegment()
 throws Exception
+  public void 
testOldSegmentNotCoveredByTombstonesWhenDropFlagTrueSinceIngestionIntervalDoesNotContainsOldSegment()
+      throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T01:00:10Z,a,1\n");
       writer.write("2014-01-01T01:10:20Z,b,1\n");
       writer.write("2014-01-01T01:20:30Z,c,1\n");
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.DAY,
                 Granularities.MINUTE,
                 
Collections.singletonList(Intervals.of("2014-01-01T01:00:00Z/2014-01-01T02:00:00Z"))
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(10, true),
             false,
             false
@@ -2449,24 +2174,19 @@ public class IndexTaskTest extends IngestionTestBase
     List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
-    Set<DataSegment> usedSegmentsBeforeOverwrite = 
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
 Intervals.ETERNITY, true).get());
+    Set<DataSegment> usedSegmentsBeforeOverwrite = getAllUsedSegments();
     Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size());
     for (DataSegment segment : usedSegmentsBeforeOverwrite) {
       Assert.assertTrue(Granularities.DAY.isAligned(segment.getInterval()));
     }
 
-    indexTask = new IndexTask(
-        null,
-        null,
+    indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.MINUTE,
                 
Collections.singletonList(Intervals.of("2014-01-01T01:10:00Z/2014-01-01T02:00:00Z"))
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(10, true),
             false,
             true
@@ -2478,7 +2198,7 @@ public class IndexTaskTest extends IngestionTestBase
     segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
-    Set<DataSegment> usedSegmentsBeforeAfterOverwrite = 
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
 Intervals.ETERNITY, true).get());
+    Set<DataSegment> usedSegmentsBeforeAfterOverwrite = getAllUsedSegments();
     Assert.assertEquals(2, usedSegmentsBeforeAfterOverwrite.size());
     int segmentFound = 0;
     int tombstonesFound = 0;
@@ -2506,30 +2226,22 @@ public class IndexTaskTest extends IngestionTestBase
   }
 
   @Test
-  public void 
testOldSegmentCoveredByTombstonesWhenDropFlagTrueSinceIngestionIntervalContainsOldSegment()
 throws Exception
+  public void 
testOldSegmentCoveredByTombstonesWhenDropFlagTrueSinceIngestionIntervalContainsOldSegment()
+      throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T00:00:10Z,a,1\n");
       writer.write("2014-01-01T01:00:20Z,b,1\n");
       writer.write("2014-01-01T02:00:30Z,c,1\n");
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.DAY,
                 Granularities.MINUTE,
                 
Collections.singletonList(Intervals.of("2014-01-01T01:00:00Z/2014-01-01T02:00:00Z"))
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(10, true),
             false,
             false
@@ -2541,24 +2253,19 @@ public class IndexTaskTest extends IngestionTestBase
     List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
-    Set<DataSegment> usedSegmentsBeforeOverwrite = 
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
 Intervals.ETERNITY, true).get());
+    Set<DataSegment> usedSegmentsBeforeOverwrite = getAllUsedSegments();
     Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size());
     for (DataSegment segment : usedSegmentsBeforeOverwrite) {
       Assert.assertTrue(Granularities.DAY.isAligned(segment.getInterval()));
     }
 
-    indexTask = new IndexTask(
-        null,
-        null,
+    indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.HOUR,
                 Granularities.MINUTE,
                 
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(10, true),
             false,
             true
@@ -2570,7 +2277,7 @@ public class IndexTaskTest extends IngestionTestBase
     segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(24, segments.size());
-    Set<DataSegment> usedSegmentsBeforeAfterOverwrite = 
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
 Intervals.ETERNITY, true).get());
+    Set<DataSegment> usedSegmentsBeforeAfterOverwrite = getAllUsedSegments();
     Assert.assertEquals(24, usedSegmentsBeforeAfterOverwrite.size());
     for (DataSegment segment : usedSegmentsBeforeAfterOverwrite) {
       // Used segments after overwrite and drop will contain only the
@@ -2586,28 +2293,19 @@ public class IndexTaskTest extends IngestionTestBase
   @Test
   public void verifyPublishingOnlyTombstones() throws Exception
   {
-    File tmpDir = temporaryFolder.newFolder();
-
-    File tmpFile = File.createTempFile("druid", "index", tmpDir);
-
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-03-01T00:00:10Z,a,1\n");
       writer.write("2014-03-01T01:00:20Z,b,1\n");
       writer.write("2014-03-01T02:00:30Z,c,1\n");
     }
 
-    IndexTask indexTask = new IndexTask(
-        null,
-        null,
+    IndexTask indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.DAY,
                 Granularities.MINUTE,
                 
Collections.singletonList(Intervals.of("2014-01-03/2014-04-01"))
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(10, true),
             false,
             false
@@ -2619,7 +2317,7 @@ public class IndexTaskTest extends IngestionTestBase
     List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
-    Set<DataSegment> usedSegmentsBeforeOverwrite = 
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
 Intervals.ETERNITY, true).get());
+    Set<DataSegment> usedSegmentsBeforeOverwrite = getAllUsedSegments();
     Assert.assertEquals(1, usedSegmentsBeforeOverwrite.size());
     for (DataSegment segment : usedSegmentsBeforeOverwrite) {
       Assert.assertTrue(Granularities.DAY.isAligned(segment.getInterval()));
@@ -2628,25 +2326,19 @@ public class IndexTaskTest extends IngestionTestBase
     // create new data but with an ingestion interval appropriate to filter it 
all out so that only tombstones
     // are created:
     tmpDir = temporaryFolder.newFolder();
-    tmpFile = File.createTempFile("druid", "index", tmpDir);
-    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+    try (BufferedWriter writer = Files.newWriter(createTempFile(), 
StandardCharsets.UTF_8)) {
       writer.write("2014-01-01T00:00:10Z,a,1\n");
       writer.write("2014-01-01T01:00:20Z,b,1\n");
       writer.write("2014-12-01T02:00:30Z,c,1\n");
     }
 
-    indexTask = new IndexTask(
-        null,
-        null,
+    indexTask = createIndexTask(
         createDefaultIngestionSpec(
-            jsonMapper,
-            tmpDir,
             new UniformGranularitySpec(
                 Granularities.DAY,
                 Granularities.MINUTE,
                 
Collections.singletonList(Intervals.of("2014-03-01/2014-04-01")) // filter out 
all data
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(10, true),
             false,
             true
@@ -2661,37 +2353,35 @@ public class IndexTaskTest extends IngestionTestBase
     Assert.assertTrue(segments.get(0).isTombstone());
   }
 
- 
+
   @Test
-  public void testErrorWhenDropFlagTrueAndOverwriteFalse() throws Exception
+  public void testErrorWhenDropFlagTrueAndOverwriteFalse()
   {
-    expectedException.expect(IAE.class);
-    expectedException.expectMessage(
-        "Cannot simultaneously replace and append to existing segments. Either 
dropExisting or appendToExisting should be set to false"
-    );
-    new IndexTask(
-        null,
-        null,
-        createDefaultIngestionSpec(
-            jsonMapper,
-            temporaryFolder.newFolder(),
-            new UniformGranularitySpec(
-                Granularities.MINUTE,
-                Granularities.MINUTE,
-                
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
+    Exception exception = Assert.assertThrows(
+        IAE.class,
+        () -> createIndexTask(
+            createDefaultIngestionSpec(
+                new UniformGranularitySpec(
+                    Granularities.MINUTE,
+                    Granularities.MINUTE,
+                    
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
+                ),
+                createTuningConfigWithMaxRowsPerSegment(10, true),
+                true,
+                true
             ),
-            null,
-            createTuningConfigWithMaxRowsPerSegment(10, true),
-            true,
-            true
-        ),
-        null
+            null
+        )
+    );
+    Assert.assertEquals(
+        "Cannot simultaneously replace and append to existing segments."
+        + " Either dropExisting or appendToExisting should be set to false",
+        exception.getMessage()
     );
   }
 
-  // If isStandaloneTask is false, cleanup should be a no-op
   @Test
-  public void testCleanupIndexTask() throws Exception
+  public void testCleanupIsNoopIfNotStandaloneTask() throws Exception
   {
     new IndexTask(
         null,
@@ -2700,14 +2390,11 @@ public class IndexTaskTest extends IngestionTestBase
         "dataSource",
         null,
         createDefaultIngestionSpec(
-            jsonMapper,
-            temporaryFolder.newFolder(),
             new UniformGranularitySpec(
                 Granularities.MINUTE,
                 Granularities.MINUTE,
                 
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(10, true),
             false,
             false
@@ -2718,11 +2405,8 @@ public class IndexTaskTest extends IngestionTestBase
     ).cleanUp(null, null);
   }
 
-  /* if shouldCleanup is true, we should fall back to AbstractTask.cleanup,
-   * check isEncapsulatedTask=false, and then exit.
-   */
   @Test
-  public void testCleanup() throws Exception
+  public void testCleanupIsDoneIfStandaloneTask() throws Exception
   {
     TaskToolbox toolbox = EasyMock.createMock(TaskToolbox.class);
     TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class);
@@ -2736,14 +2420,11 @@ public class IndexTaskTest extends IngestionTestBase
         "dataSource",
         null,
         createDefaultIngestionSpec(
-            jsonMapper,
-            temporaryFolder.newFolder(),
             new UniformGranularitySpec(
                 Granularities.MINUTE,
                 Granularities.MINUTE,
                 
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
             ),
-            null,
             createTuningConfigWithMaxRowsPerSegment(10, true),
             false,
             false
@@ -2758,7 +2439,7 @@ public class IndexTaskTest extends IngestionTestBase
   public static void 
checkTaskStatusErrorMsgForParseExceptionsExceeded(TaskStatus status)
   {
     // full stacktrace will be too long and make tests brittle (e.g. if line # 
changes), just match the main message
-    Assert.assertThat(
+    MatcherAssert.assertThat(
         status.getErrorMsg(),
         CoreMatchers.containsString("Max parse exceptions")
     );
@@ -2789,24 +2470,21 @@ public class IndexTaskTest extends IngestionTestBase
         1,
         null,
         null,
-        null,
         forceGuaranteedRollup,
         true
     );
   }
 
   private static IndexTuningConfig createTuningConfigWithPartitionsSpec(
-      PartitionsSpec partitionsSpec,
-      boolean forceGuaranteedRollup
+      PartitionsSpec partitionsSpec
   )
   {
     return createTuningConfig(
         null,
         1,
         null,
-        null,
         partitionsSpec,
-        forceGuaranteedRollup,
+        true,
         true
     );
   }
@@ -2814,7 +2492,6 @@ public class IndexTaskTest extends IngestionTestBase
   static IndexTuningConfig createTuningConfig(
       @Nullable Integer maxRowsPerSegment,
       @Nullable Integer maxRowsInMemory,
-      @Nullable Long maxBytesInMemory,
       @Nullable Long maxTotalRows,
       @Nullable PartitionsSpec partitionsSpec,
       boolean forceGuaranteedRollup,
@@ -2826,7 +2503,7 @@ public class IndexTaskTest extends IngestionTestBase
         maxRowsPerSegment,
         null,
         maxRowsInMemory,
-        maxBytesInMemory,
+        null,
         null,
         maxTotalRows,
         null,
@@ -2850,7 +2527,26 @@ public class IndexTaskTest extends IngestionTestBase
     );
   }
 
-  private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws 
IOException
+  @SuppressWarnings("unchecked")
+  private <T> T invokeApi(Function<HttpServletRequest, Response> api)
+  {
+    final HttpServletRequest request = EasyMock.mock(HttpServletRequest.class);
+    EasyMock.expect(request.getAttribute(EasyMock.anyString()))
+            .andReturn("allow-all");
+    EasyMock.replay(request);
+    return (T) api.apply(request).getEntity();
+  }
+
+  private Set<DataSegment> getAllUsedSegments()
+  {
+    return Sets.newHashSet(
+        getSegmentsMetadataManager()
+            
.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE, 
Intervals.ETERNITY, true)
+            .get()
+    );
+  }
+
+  private IngestionStatsAndErrors getTaskReportData() throws IOException
   {
     Map<String, TaskReport> taskReports = jsonMapper.readValue(
         taskRunner.getTaskReportsFile(),
@@ -2858,16 +2554,19 @@ public class IndexTaskTest extends IngestionTestBase
         {
         }
     );
-    return IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports(
-        taskReports
-    );
+    return IngestionStatsAndErrors.getPayloadFromTaskReports(taskReports);
+  }
+
+  private IndexTask createIndexTask(
+      IndexIngestionSpec ingestionSchema,
+      Map<String, Object> context
+  )
+  {
+    return new IndexTask(null, null, ingestionSchema, context);
   }
 
   private IndexIngestionSpec createDefaultIngestionSpec(
-      ObjectMapper objectMapper,
-      File baseDir,
       @Nullable GranularitySpec granularitySpec,
-      @Nullable TransformSpec transformSpec,
       IndexTuningConfig tuningConfig,
       boolean appendToExisting,
       Boolean dropExisting
@@ -2875,12 +2574,10 @@ public class IndexTaskTest extends IngestionTestBase
   {
     if (useInputFormatApi) {
       return createIngestionSpec(
-          objectMapper,
-          baseDir,
           DEFAULT_TIMESTAMP_SPEC,
           DEFAULT_DIMENSIONS_SPEC,
           DEFAULT_INPUT_FORMAT,
-          transformSpec,
+          null,
           granularitySpec,
           tuningConfig,
           appendToExisting,
@@ -2888,10 +2585,10 @@ public class IndexTaskTest extends IngestionTestBase
       );
     } else {
       return createIngestionSpec(
-          objectMapper,
-          baseDir,
+          jsonMapper,
+          tmpDir,
           DEFAULT_PARSE_SPEC,
-          transformSpec,
+          null,
           granularitySpec,
           tuningConfig,
           appendToExisting,
@@ -2926,9 +2623,7 @@ public class IndexTaskTest extends IngestionTestBase
     );
   }
 
-  static IndexIngestionSpec createIngestionSpec(
-      ObjectMapper objectMapper,
-      File baseDir,
+  private IndexIngestionSpec createIngestionSpec(
       TimestampSpec timestampSpec,
       DimensionsSpec dimensionsSpec,
       InputFormat inputFormat,
@@ -2940,8 +2635,8 @@ public class IndexTaskTest extends IngestionTestBase
   )
   {
     return createIngestionSpec(
-        objectMapper,
-        baseDir,
+        jsonMapper,
+        tmpDir,
         null,
         timestampSpec,
         dimensionsSpec,
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java
index bc7148c5401..0c95773da92 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ParseExceptionReport.java
@@ -19,7 +19,7 @@
 
 package org.apache.druid.indexing.common.task;
 
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -41,7 +41,7 @@ public class ParseExceptionReport
 
   @SuppressWarnings("unchecked")
   public static ParseExceptionReport forPhase(
-      IngestionStatsAndErrorsTaskReportData reportData,
+      IngestionStatsAndErrors reportData,
       String phase
   )
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
index 53b8ace317a..aece6edb3f4 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
@@ -26,8 +26,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Files;
 import org.apache.druid.indexer.IngestionState;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TestUtils;
@@ -59,7 +59,7 @@ public class TaskReportSerdeTest
   {
     IngestionStatsAndErrorsTaskReport report1 = new 
IngestionStatsAndErrorsTaskReport(
         "testID",
-        new IngestionStatsAndErrorsTaskReportData(
+        new IngestionStatsAndErrors(
             IngestionState.BUILD_SEGMENTS,
             ImmutableMap.of(
                 "hello", "world"
@@ -118,7 +118,7 @@ public class TaskReportSerdeTest
 
     IngestionStatsAndErrorsTaskReport expected = new 
IngestionStatsAndErrorsTaskReport(
         IngestionStatsAndErrorsTaskReport.REPORT_KEY,
-        new IngestionStatsAndErrorsTaskReportData(
+        new IngestionStatsAndErrors(
             IngestionState.COMPLETED,
             ImmutableMap.of(
                 "hello", "world"
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 6b662e473b0..08b0c584f76 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -50,8 +50,8 @@ import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -558,7 +558,7 @@ public class AbstractParallelIndexSupervisorTaskTest 
extends IngestionTestBase
       if (!task.isPresent()) {
         return null;
       }
-      return Futures.immediateFuture(((ParallelIndexSupervisorTask) 
task.get()).doGetLiveReports("full"));
+      return Futures.immediateFuture(((ParallelIndexSupervisorTask) 
task.get()).doGetLiveReports(true));
     }
 
     public TaskContainer getTaskContainer(String taskId)
@@ -1074,12 +1074,12 @@ public class AbstractParallelIndexSupervisorTaskTest 
extends IngestionTestBase
     });
   }
 
-  public List<IngestionStatsAndErrorsTaskReportData> getIngestionReports() 
throws IOException
+  public List<IngestionStatsAndErrors> getIngestionReports() throws IOException
   {
     return getReports().entrySet()
                        .stream()
                        .filter(entry -> 
entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY))
-                       .map(entry -> (IngestionStatsAndErrorsTaskReportData) 
entry.getValue().getPayload())
+                       .map(entry -> (IngestionStatsAndErrors) 
entry.getValue().getPayload())
                        .collect(Collectors.toList());
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 0da7ed0934e..7ad9f3c9464 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -446,7 +446,7 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
         false,
         Collections.emptyList()
     );
-    Map<String, Object> actualReports = task.doGetLiveReports("full");
+    Map<String, Object> actualReports = task.doGetLiveReports(true);
     final long processedBytes = useInputFormatApi ? 335 : 0;
     Map<String, Object> expectedReports = buildExpectedTaskReportParallel(
         task.getId(),
@@ -497,7 +497,7 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
 
     TaskContainer taskContainer = 
getIndexingServiceClient().getTaskContainer(task.getId());
     final ParallelIndexSupervisorTask executedTask = 
(ParallelIndexSupervisorTask) taskContainer.getTask();
-    Map<String, Object> actualReports = executedTask.doGetLiveReports("full");
+    Map<String, Object> actualReports = executedTask.doGetLiveReports(true);
 
     final long processedBytes = useInputFormatApi ? 335 : 0;
     RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10, 
processedBytes, 1, 1, 1);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 2db5da143ed..fb454acafec 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -52,7 +52,7 @@ import org.apache.druid.discovery.DruidNodeAnnouncer;
 import org.apache.druid.discovery.LookupNodeService;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
@@ -459,7 +459,7 @@ public abstract class SeekableStreamIndexTaskTestBase 
extends EasyMockSupport
     return new SegmentDescriptorAndExpectedDim1Values(interval, partitionNum, 
expectedDim1Values);
   }
 
-  protected IngestionStatsAndErrorsTaskReportData getTaskReportData() throws 
IOException
+  protected IngestionStatsAndErrors getTaskReportData() throws IOException
   {
     Map<String, TaskReport> taskReports = OBJECT_MAPPER.readValue(
         reportsFile,
@@ -467,7 +467,7 @@ public abstract class SeekableStreamIndexTaskTestBase 
extends EasyMockSupport
         {
         }
     );
-    return IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports(
+    return IngestionStatsAndErrors.getPayloadFromTaskReports(
         taskReports
     );
   }
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
index fb02d8f8dad..8eadbdf8f11 100644
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
@@ -23,8 +23,8 @@ import com.google.common.collect.FluentIterable;
 import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.indexer.partitions.SecondaryPartitionType;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.TaskReport;
 import 
org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
 import 
org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
@@ -487,7 +487,7 @@ public abstract class AbstractITBatchIndexTest extends 
AbstractIndexerTest
     if (segmentAvailabilityConfirmationPair.lhs != null && 
segmentAvailabilityConfirmationPair.lhs) {
       TaskReport reportRaw = 
indexer.getTaskReport(taskID).get("ingestionStatsAndErrors");
       IngestionStatsAndErrorsTaskReport report = 
(IngestionStatsAndErrorsTaskReport) reportRaw;
-      IngestionStatsAndErrorsTaskReportData reportData = 
(IngestionStatsAndErrorsTaskReportData) report.getPayload();
+      IngestionStatsAndErrors reportData = (IngestionStatsAndErrors) 
report.getPayload();
 
       // Confirm that the task waited longer than 0ms for the task to complete.
       Assert.assertTrue(reportData.getSegmentAvailabilityWaitTimeMs() > 0);
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index ac4ef536b02..ceaeddea0dc 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -27,7 +27,6 @@ import org.apache.druid.client.indexing.TaskStatusResponse;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.overlord.http.TaskPayloadResponse;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.java.util.common.ISE;
@@ -263,13 +262,15 @@ public class OverlordResourceTestClient
 
   public String getTaskErrorMessage(String taskId)
   {
-    return ((IngestionStatsAndErrorsTaskReportData) 
getTaskReport(taskId).get("ingestionStatsAndErrors").getPayload()).getErrorMsg();
+    return 
getTaskReport(taskId).get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
+                                .getPayload().getErrorMsg();
   }
 
   public RowIngestionMetersTotals getTaskStats(String taskId)
   {
     try {
-      Object buildSegment = ((IngestionStatsAndErrorsTaskReportData) 
getTaskReport(taskId).get("ingestionStatsAndErrors").getPayload()).getRowStats().get("buildSegments");
+      Object buildSegment = 
getTaskReport(taskId).get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
+                                                 
.getPayload().getRowStats().get("buildSegments");
       return jsonMapper.convertValue(buildSegment, 
RowIngestionMetersTotals.class);
     }
     catch (Exception e) {
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index 0f160580db5..d419bbb48d3 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -23,8 +23,8 @@ import com.google.common.collect.FluentIterable;
 import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.indexer.partitions.SecondaryPartitionType;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.TaskReport;
 import 
org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
 import 
org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
@@ -370,7 +370,7 @@ public abstract class AbstractITBatchIndexTest extends 
AbstractIndexerTest
     if (segmentAvailabilityConfirmationPair.lhs != null && 
segmentAvailabilityConfirmationPair.lhs) {
       TaskReport reportRaw = 
indexer.getTaskReport(taskID).get("ingestionStatsAndErrors");
       IngestionStatsAndErrorsTaskReport report = 
(IngestionStatsAndErrorsTaskReport) reportRaw;
-      IngestionStatsAndErrorsTaskReportData reportData = 
(IngestionStatsAndErrorsTaskReportData) report.getPayload();
+      IngestionStatsAndErrors reportData = (IngestionStatsAndErrors) 
report.getPayload();
 
       // Confirm that the task waited longer than 0ms for the task to complete.
       Assert.assertTrue(reportData.getSegmentAvailabilityWaitTimeMs() > 0);
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index 296f7fe747a..2b37c7a27d2 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -21,8 +21,8 @@ package org.apache.druid.tests.indexer;
 
 import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.GranularityType;
@@ -176,13 +176,13 @@ public class ITCompactionTaskTest extends 
AbstractIndexerTest
       Assert.assertEquals(2,
                           reports.values()
                                  .stream()
-                                 .mapToLong(r -> 
((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsPublished())
+                                 .mapToLong(r -> ((IngestionStatsAndErrors) 
r.getPayload()).getSegmentsPublished())
                                  .sum()
       );
       Assert.assertEquals(4,
                           reports.values()
                                  .stream()
-                                 .mapToLong(r -> 
((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsRead())
+                                 .mapToLong(r -> ((IngestionStatsAndErrors) 
r.getPayload()).getSegmentsRead())
                                  .sum()
       );
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to