This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ec52f686c0c Fix compaction tasks reports getting overwritten (#15981)
ec52f686c0c is described below

commit ec52f686c0c36da4b70ebe9495522dac9ea47b8d
Author: Adithya Chakilam <[email protected]>
AuthorDate: Mon Mar 4 09:10:17 2024 -0600

    Fix compaction tasks reports getting overwritten (#15981)
    
    * Fix compaction task reports getting overwritten
    
    * only skip for compaction task
    
    * address comments
    
    * fix boolean
    
    * move boolean flag to task rather than spec
    
    * rename variable
    
    * add docs, fix missing case
    
    * Update docs/ingestion/tasks.md
    
    * rename var
    
    * add task report decode check in IT
    
    * change assert
---
 docs/ingestion/tasks.md                            | 52 ++++++++++++++++++++++
 .../druid/indexing/common/task/CompactionTask.java | 17 ++++++-
 .../druid/indexing/common/task/IndexTask.java      | 35 ++++++++++++---
 .../parallel/ParallelIndexSupervisorTask.java      | 51 ++++++++++++++-------
 .../common/task/CompactionTaskParallelRunTest.java |  4 ++
 .../AbstractParallelIndexSupervisorTaskTest.java   | 24 ++++++++++
 .../batch/parallel/HashPartitionTaskKillTest.java  |  2 +-
 .../batch/parallel/RangePartitionTaskKillTest.java |  2 +-
 .../druid/tests/indexer/ITCompactionTaskTest.java  | 15 ++++++-
 9 files changed, 175 insertions(+), 27 deletions(-)

diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index af9d8b7f88b..fbf1f4a38e7 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -93,6 +93,58 @@ An example output is shown below:
 }
 ```
 
+Compaction tasks can generate multiple sets of segment output reports based on 
how the input interval is split. So the overall report contains a mapping from 
each split to its report.
+An example report could be:
+
+```json
+{
+  "ingestionStatsAndErrors_0": {
+    "taskId": "compact_twitter_2018-09-24T18:24:23.920Z",
+    "payload": {
+      "ingestionState": "COMPLETED",
+      "unparseableEvents": {},
+      "rowStats": {
+        "buildSegments": {
+          "processed": 5390324,
+          "processedBytes": 5109573212,
+          "processedWithError": 0,
+          "thrownAway": 0,
+          "unparseable": 0
+        }
+      },
+      "segmentAvailabilityConfirmed": false,
+      "segmentAvailabilityWaitTimeMs": 0,
+      "recordsProcessed": null,
+      "errorMsg": null
+    },
+    "type": "ingestionStatsAndErrors"
+  },
+  "ingestionStatsAndErrors_1": {
+   "taskId": "compact_twitter_2018-09-25T18:24:23.920Z",
+   "payload": {
+    "ingestionState": "COMPLETED",
+    "unparseableEvents": {},
+    "rowStats": {
+     "buildSegments": {
+      "processed": 12345,
+      "processedBytes": 132456789,
+      "processedWithError": 0,
+      "thrownAway": 0,
+      "unparseable": 0
+     }
+    },
+    "segmentAvailabilityConfirmed": false,
+    "segmentAvailabilityWaitTimeMs": 0,
+    "recordsProcessed": null,
+    "errorMsg": null
+   },
+   "type": "ingestionStatsAndErrors"
+  }
+}
+```
+
+
+
 #### Segment Availability Fields
 
 For some task types, the indexing task can wait for the newly ingested 
segments to become available for queries after ingestion completes. The below 
fields inform the end user regarding the duration and result of the 
availability wait. For batch ingestion task types, refer to `tuningConfig` docs 
to see if the task supports an availability waiting period.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index bb3bc6826f8..776038b50a1 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -51,6 +51,7 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -96,6 +97,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 
@@ -113,6 +115,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.function.Supplier;
@@ -499,6 +502,7 @@ public class CompactionTask extends AbstractBatchIndexTask
       log.info("Generated [%d] compaction task specs", totalNumSpecs);
 
       int failCnt = 0;
+      Map<String, TaskReport> completionReports = new HashMap<>();
       for (ParallelIndexSupervisorTask eachSpec : indexTaskSpecs) {
         final String json = 
toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
         if (!currentSubTaskHolder.setTask(eachSpec)) {
@@ -514,6 +518,9 @@ public class CompactionTask extends AbstractBatchIndexTask
               failCnt++;
               log.warn("Failed to run indexSpec: [%s].\nTrying the next 
indexSpec.", json);
             }
+            Optional.ofNullable(eachSpec.getCompletionReports())
+                    .ifPresent(reports -> completionReports.putAll(
+                        CollectionUtils.mapKeys(reports, key -> 
getReportkey(eachSpec.getBaseSubtaskSpecName(), key))));
           } else {
             failCnt++;
             log.warn("indexSpec is not ready: [%s].\nTrying the next 
indexSpec.", json);
@@ -528,6 +535,8 @@ public class CompactionTask extends AbstractBatchIndexTask
       String msg = StringUtils.format("Ran [%d] specs, [%d] succeeded, [%d] 
failed",
                                       totalNumSpecs, totalNumSpecs - failCnt, 
failCnt
       );
+
+      toolbox.getTaskReportFileWriter().write(getId(), completionReports);
       log.info(msg);
       return failCnt == 0 ? TaskStatus.success(getId()) : 
TaskStatus.failure(getId(), msg);
     }
@@ -542,7 +551,8 @@ public class CompactionTask extends AbstractBatchIndexTask
         getTaskResource(),
         ingestionSpec,
         baseSequenceName,
-        createContextForSubtask()
+        createContextForSubtask(),
+        true
     );
   }
 
@@ -562,6 +572,11 @@ public class CompactionTask extends AbstractBatchIndexTask
     return StringUtils.format("%s_%d", getId(), i);
   }
 
+  private String getReportkey(String baseSequenceName, String currentKey)
+  {
+    return StringUtils.format("%s_%s", currentKey, 
baseSequenceName.substring(baseSequenceName.lastIndexOf('_') + 1));
+  }
+
   /**
    * Generate {@link ParallelIndexIngestionSpec} from input segments.
    *
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index a0d7f4143f7..ce3245944e2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -169,7 +169,9 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
 
   private IngestionState ingestionState;
 
-  private boolean shouldCleanup;
+  // used to specify if IndexTask.run() is executed as part of another task
+  // skips writing reports and cleanup if not a standalone task
+  private boolean isStandAloneTask;
 
   @MonotonicNonNull
   private ParseExceptionHandler determinePartitionsParseExceptionHandler;
@@ -189,6 +191,8 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
   @Nullable
   private String errorMsg;
 
+  private Map<String, TaskReport> completionReports;
+
   @JsonCreator
   public IndexTask(
       @JsonProperty("id") final String id,
@@ -222,7 +226,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
       IndexIngestionSpec ingestionSchema,
       Map<String, Object> context,
       int maxAllowedLockCount,
-      boolean shouldCleanup
+      boolean isStandAloneTask
   )
   {
     super(
@@ -237,7 +241,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
     this.baseSequenceName = baseSequenceName == null ? getId() : 
baseSequenceName;
     this.ingestionSchema = ingestionSchema;
     this.ingestionState = IngestionState.NOT_STARTED;
-    this.shouldCleanup = shouldCleanup;
+    this.isStandAloneTask = isStandAloneTask;
   }
 
   @Override
@@ -314,6 +318,13 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
            ImmutableSet.of();
   }
 
+  @Nullable
+  @JsonIgnore
+  public Map<String, TaskReport> getCompletionReports()
+  {
+    return completionReports;
+  }
+
   @GET
   @Path("/unparseableEvents")
   @Produces(MediaType.APPLICATION_JSON)
@@ -556,7 +567,8 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
     catch (Exception e) {
       log.error(e, "Encountered exception in %s.", ingestionState);
       errorMsg = Throwables.getStackTraceAsString(e);
-      toolbox.getTaskReportFileWriter().write(getId(), 
getTaskCompletionReports());
+      completionReports = getTaskCompletionReports();
+      writeCompletionReports(toolbox);
       return TaskStatus.failure(
           getId(),
           errorMsg
@@ -568,6 +580,13 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
     }
   }
 
+  private void writeCompletionReports(TaskToolbox toolbox)
+  {
+    if (isStandAloneTask) {
+      toolbox.getTaskReportFileWriter().write(getId(), completionReports);
+    }
+  }
+
   private Map<String, TaskReport> getTaskCompletionReports()
   {
     return TaskReport.buildTaskReports(
@@ -1024,7 +1043,8 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
       if (published == null) {
         log.error("Failed to publish segments, aborting!");
         errorMsg = "Failed to publish segments.";
-        toolbox.getTaskReportFileWriter().write(getId(), 
getTaskCompletionReports());
+        completionReports = getTaskCompletionReports();
+        writeCompletionReports(toolbox);
         return TaskStatus.failure(
             getId(),
             errorMsg
@@ -1047,7 +1067,8 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
 
         log.debugSegments(published.getSegments(), "Published segments");
 
-        toolbox.getTaskReportFileWriter().write(getId(), 
getTaskCompletionReports());
+        completionReports = getTaskCompletionReports();
+        writeCompletionReports(toolbox);
         return TaskStatus.success(getId());
       }
     }
@@ -1089,7 +1110,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
   @Override
   public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) 
throws Exception
   {
-    if (shouldCleanup) {
+    if (isStandAloneTask) {
       super.cleanUp(toolbox, taskStatus);
     }
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 894fe1e5b06..d7980e9b371 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -202,6 +202,8 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   private volatile Pair<Map<String, Object>, Map<String, Object>> 
indexGenerateRowStats;
 
   private IngestionState ingestionState;
+  private Map<String, TaskReport> completionReports;
+  private final Boolean isCompactionTask;
 
 
   @JsonCreator
@@ -213,7 +215,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       @JsonProperty("context") Map<String, Object> context
   )
   {
-    this(id, groupId, taskResource, ingestionSchema, null, context);
+    this(id, groupId, taskResource, ingestionSchema, null, context, false);
   }
 
   public ParallelIndexSupervisorTask(
@@ -222,7 +224,8 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       TaskResource taskResource,
       ParallelIndexIngestionSpec ingestionSchema,
       @Nullable String baseSubtaskSpecName,
-      Map<String, Object> context
+      Map<String, Object> context,
+      Boolean isCompactionTask
   )
   {
     super(
@@ -259,6 +262,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
 
     awaitSegmentAvailabilityTimeoutMillis = 
ingestionSchema.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis();
     this.ingestionState = IngestionState.NOT_STARTED;
+    this.isCompactionTask = isCompactionTask;
   }
 
   private static void 
checkPartitionsSpecForForceGuaranteedRollup(PartitionsSpec partitionsSpec)
@@ -292,6 +296,20 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
            ImmutableSet.of();
   }
 
+  @Nullable
+  @JsonIgnore
+  public Map<String, TaskReport> getCompletionReports()
+  {
+    return completionReports;
+  }
+
+  @Nullable
+  @JsonIgnore
+  public String getBaseSubtaskSpecName()
+  {
+    return baseSubtaskSpecName;
+  }
+
   @JsonProperty("spec")
   public ParallelIndexIngestionSpec getIngestionSchema()
   {
@@ -651,10 +669,8 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       }
       taskStatus = TaskStatus.failure(getId(), errorMessage);
     }
-    toolbox.getTaskReportFileWriter().write(
-        getId(),
-        getTaskCompletionReports(taskStatus, 
segmentAvailabilityConfirmationCompleted)
-    );
+    completionReports = getTaskCompletionReports(taskStatus, 
segmentAvailabilityConfirmationCompleted);
+    writeCompletionReports(toolbox);
     return taskStatus;
   }
 
@@ -821,10 +837,8 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       taskStatus = TaskStatus.failure(getId(), errMsg);
     }
 
-    toolbox.getTaskReportFileWriter().write(
-        getId(),
-        getTaskCompletionReports(taskStatus, 
segmentAvailabilityConfirmationCompleted)
-    );
+    completionReports = getTaskCompletionReports(taskStatus, 
segmentAvailabilityConfirmationCompleted);
+    writeCompletionReports(toolbox);
     return taskStatus;
   }
 
@@ -921,10 +935,8 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       taskStatus = TaskStatus.failure(getId(), errMsg);
     }
 
-    toolbox.getTaskReportFileWriter().write(
-        getId(),
-        getTaskCompletionReports(taskStatus, 
segmentAvailabilityConfirmationCompleted)
-    );
+    completionReports = getTaskCompletionReports(taskStatus, 
segmentAvailabilityConfirmationCompleted);
+    writeCompletionReports(toolbox);
     return taskStatus;
   }
 
@@ -1211,7 +1223,9 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
 
     if (currentSubTaskHolder.setTask(sequentialIndexTask)
         && sequentialIndexTask.isReady(toolbox.getTaskActionClient())) {
-      return sequentialIndexTask.run(toolbox);
+      TaskStatus status = sequentialIndexTask.run(toolbox);
+      completionReports = sequentialIndexTask.getCompletionReports();
+      return status;
     } else {
       String msg = "Task was asked to stop. Finish as failed";
       LOG.info(msg);
@@ -1247,6 +1261,13 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
     );
   }
 
+  private void writeCompletionReports(TaskToolbox toolbox)
+  {
+    if (!isCompactionTask) {
+      toolbox.getTaskReportFileWriter().write(getId(), completionReports);
+    }
+  }
+
   private static IndexTuningConfig 
convertToIndexTuningConfig(ParallelIndexTuningConfig tuningConfig)
   {
     return new IndexTuningConfig(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index b49a223e718..12cf09d3317 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -39,6 +39,7 @@ import 
org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
@@ -246,6 +247,9 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
       );
       Assert.assertEquals("Compaction state for " + segment.getId(), 
expectedState, segment.getLastCompactionState());
     }
+
+    List<IngestionStatsAndErrorsTaskReportData> reports = 
getIngestionReports();
+    Assert.assertEquals(reports.size(), 3); // since three index tasks are run 
by a single compaction task
   }
 
   @Test
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 4a0707e3586..f438b7bba84 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
@@ -49,9 +50,13 @@ import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -232,6 +237,7 @@ public class AbstractParallelIndexSupervisorTaskTest 
extends IngestionTestBase
   private CoordinatorClient coordinatorClient;
   // An executor that executes API calls using a different thread from the 
caller thread as if they were remote calls.
   private ExecutorService remoteApiExecutor;
+  private File reportsFile;
 
   protected AbstractParallelIndexSupervisorTaskTest(
       double transientTaskFailureRate,
@@ -257,6 +263,7 @@ public class AbstractParallelIndexSupervisorTaskTest 
extends IngestionTestBase
     remoteApiExecutor = Execs.singleThreaded("coordinator-api-executor");
     coordinatorClient = new LocalCoordinatorClient(remoteApiExecutor);
     prepareObjectMapper(objectMapper, getIndexIO());
+    reportsFile = temporaryFolder.newFile();
   }
 
   @After
@@ -702,6 +709,7 @@ public class AbstractParallelIndexSupervisorTaskTest 
extends IngestionTestBase
         
.indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY,
 true)))
         .taskReportFileWriter(new NoopTestTaskReportFileWriter())
         .intermediaryDataManager(intermediaryDataManager)
+        .taskReportFileWriter(new SingleFileTaskReportFileWriter(reportsFile))
         .authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
         .chatHandlerProvider(new NoopChatHandlerProvider())
         .rowIngestionMetersFactory(new 
TestUtils().getRowIngestionMetersFactory())
@@ -1064,4 +1072,20 @@ public class AbstractParallelIndexSupervisorTaskTest 
extends IngestionTestBase
       throw new ISE("Can't find segment for id[%s]", segmentId);
     }
   }
+
+  public Map<String, TaskReport> getReports() throws IOException
+  {
+    return objectMapper.readValue(reportsFile, new TypeReference<Map<String, 
TaskReport>>()
+    {
+    });
+  }
+
+  public List<IngestionStatsAndErrorsTaskReportData> getIngestionReports() 
throws IOException
+  {
+    return getReports().entrySet()
+                       .stream()
+                       .filter(entry -> 
entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY))
+                       .map(entry -> (IngestionStatsAndErrorsTaskReportData) 
entry.getValue().getPayload())
+                       .collect(Collectors.toList());
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java
index 39eaa4af62a..b9d5114a1e6 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java
@@ -314,7 +314,7 @@ public class HashPartitionTaskKillTest extends 
AbstractMultiPhaseParallelIndexin
         int succedsBeforeFailing
     )
     {
-      super(id, groupId, taskResource, ingestionSchema, baseSubtaskSpecName, 
context);
+      super(id, groupId, taskResource, ingestionSchema, baseSubtaskSpecName, 
context, false);
       this.succeedsBeforeFailing = succedsBeforeFailing;
     }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java
index ce112aafce8..43b0ac902c1 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java
@@ -243,7 +243,7 @@ public class RangePartitionTaskKillTest extends 
AbstractMultiPhaseParallelIndexi
         int succedsBeforeFailing
     )
     {
-      super(id, groupId, taskResource, ingestionSchema, baseSubtaskSpecName, 
context);
+      super(id, groupId, taskResource, ingestionSchema, baseSubtaskSpecName, 
context, false);
       this.failInPhase = succedsBeforeFailing;
     }
 
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index 957c8a5522c..95b3e112970 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.tests.indexer;
 
 import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.GranularityType;
@@ -31,6 +32,7 @@ import org.apache.druid.testing.utils.ITRetryUtil;
 import org.apache.druid.tests.TestNGGroup;
 import org.joda.time.Interval;
 import org.joda.time.chrono.ISOChronology;
+import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
@@ -198,7 +200,7 @@ public class ITCompactionTaskTest extends 
AbstractIndexerTest
       );
 
       queryHelper.testQueriesFromString(queryResponseTemplate);
-      compactData(compactionResource, newSegmentGranularity, null);
+      String taskId = compactData(compactionResource, newSegmentGranularity, 
null);
 
       // The original 4 segments should be compacted into 2 new segments
       checkNumberOfSegments(2);
@@ -215,10 +217,17 @@ public class ITCompactionTaskTest extends 
AbstractIndexerTest
         expectedIntervalAfterCompaction = newIntervals;
       }
       checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+      Map<String, IngestionStatsAndErrorsTaskReport> reports = 
indexer.getTaskReport(taskId);
+      Assert.assertTrue(reports != null && reports.size() > 0);
     }
   }
 
-  private void compactData(String compactionResource, GranularityType 
newSegmentGranularity, GranularityType newQueryGranularity) throws Exception
+  private String compactData(
+      String compactionResource,
+      GranularityType newSegmentGranularity,
+      GranularityType newQueryGranularity
+  ) throws Exception
   {
     String template = getResourceAsString(compactionResource);
     template = StringUtils.replace(template, "%%DATASOURCE%%", 
fullDatasourceName);
@@ -251,6 +260,8 @@ public class ITCompactionTaskTest extends 
AbstractIndexerTest
         () -> coordinator.areSegmentsLoaded(fullDatasourceName),
         "Segment Compaction"
     );
+
+    return taskID;
   }
 
   private void checkQueryGranularity(String queryResource, String 
expectedQueryGranularity, int segmentCount) throws Exception


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to