This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ec52f686c0c Fix compaction tasks reports getting overwritten (#15981)
ec52f686c0c is described below
commit ec52f686c0c36da4b70ebe9495522dac9ea47b8d
Author: Adithya Chakilam <[email protected]>
AuthorDate: Mon Mar 4 09:10:17 2024 -0600
Fix compaction tasks reports getting overwritten (#15981)
* Fix compaction task reports getting overwritten
* only skip for compaction tasks
* address comments
* fix boolean
* move boolean flag to task rather than spec
* rename variable
* add docs, fix missing case
* Update docs/ingestion/tasks.md
* rename var
* add task report decode check in IT
* change assert
---
docs/ingestion/tasks.md | 52 ++++++++++++++++++++++
.../druid/indexing/common/task/CompactionTask.java | 17 ++++++-
.../druid/indexing/common/task/IndexTask.java | 35 ++++++++++++---
.../parallel/ParallelIndexSupervisorTask.java | 51 ++++++++++++++-------
.../common/task/CompactionTaskParallelRunTest.java | 4 ++
.../AbstractParallelIndexSupervisorTaskTest.java | 24 ++++++++++
.../batch/parallel/HashPartitionTaskKillTest.java | 2 +-
.../batch/parallel/RangePartitionTaskKillTest.java | 2 +-
.../druid/tests/indexer/ITCompactionTaskTest.java | 15 ++++++-
9 files changed, 175 insertions(+), 27 deletions(-)
diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index af9d8b7f88b..fbf1f4a38e7 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -93,6 +93,58 @@ An example output is shown below:
}
```
+Compaction tasks can generate multiple sets of segment output reports, depending on
how the input interval is split. The overall report therefore contains a mapping from
each split to its own report.
+An example report is shown below:
+
+```json
+{
+ "ingestionStatsAndErrors_0": {
+ "taskId": "compact_twitter_2018-09-24T18:24:23.920Z",
+ "payload": {
+ "ingestionState": "COMPLETED",
+ "unparseableEvents": {},
+ "rowStats": {
+ "buildSegments": {
+ "processed": 5390324,
+ "processedBytes": 5109573212,
+ "processedWithError": 0,
+ "thrownAway": 0,
+ "unparseable": 0
+ }
+ },
+ "segmentAvailabilityConfirmed": false,
+ "segmentAvailabilityWaitTimeMs": 0,
+ "recordsProcessed": null,
+ "errorMsg": null
+ },
+ "type": "ingestionStatsAndErrors"
+ },
+ "ingestionStatsAndErrors_1": {
+ "taskId": "compact_twitter_2018-09-25T18:24:23.920Z",
+ "payload": {
+ "ingestionState": "COMPLETED",
+ "unparseableEvents": {},
+ "rowStats": {
+ "buildSegments": {
+ "processed": 12345,
+ "processedBytes": 132456789,
+ "processedWithError": 0,
+ "thrownAway": 0,
+ "unparseable": 0
+ }
+ },
+ "segmentAvailabilityConfirmed": false,
+ "segmentAvailabilityWaitTimeMs": 0,
+ "recordsProcessed": null,
+ "errorMsg": null
+ },
+ "type": "ingestionStatsAndErrors"
+ }
+}
+```
+
+
+
#### Segment Availability Fields
For some task types, the indexing task can wait for the newly ingested
segments to become available for queries after ingestion completes. The below
fields inform the end user regarding the duration and result of the
availability wait. For batch ingestion task types, refer to `tuningConfig` docs
to see if the task supports an availability waiting period.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index bb3bc6826f8..776038b50a1 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -51,6 +51,7 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -96,6 +97,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Duration;
import org.joda.time.Interval;
@@ -113,6 +115,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Supplier;
@@ -499,6 +502,7 @@ public class CompactionTask extends AbstractBatchIndexTask
log.info("Generated [%d] compaction task specs", totalNumSpecs);
int failCnt = 0;
+ Map<String, TaskReport> completionReports = new HashMap<>();
for (ParallelIndexSupervisorTask eachSpec : indexTaskSpecs) {
final String json =
toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
if (!currentSubTaskHolder.setTask(eachSpec)) {
@@ -514,6 +518,9 @@ public class CompactionTask extends AbstractBatchIndexTask
failCnt++;
log.warn("Failed to run indexSpec: [%s].\nTrying the next
indexSpec.", json);
}
+ Optional.ofNullable(eachSpec.getCompletionReports())
+ .ifPresent(reports -> completionReports.putAll(
+ CollectionUtils.mapKeys(reports, key ->
getReportkey(eachSpec.getBaseSubtaskSpecName(), key))));
} else {
failCnt++;
log.warn("indexSpec is not ready: [%s].\nTrying the next
indexSpec.", json);
@@ -528,6 +535,8 @@ public class CompactionTask extends AbstractBatchIndexTask
String msg = StringUtils.format("Ran [%d] specs, [%d] succeeded, [%d]
failed",
totalNumSpecs, totalNumSpecs - failCnt,
failCnt
);
+
+ toolbox.getTaskReportFileWriter().write(getId(), completionReports);
log.info(msg);
return failCnt == 0 ? TaskStatus.success(getId()) :
TaskStatus.failure(getId(), msg);
}
@@ -542,7 +551,8 @@ public class CompactionTask extends AbstractBatchIndexTask
getTaskResource(),
ingestionSpec,
baseSequenceName,
- createContextForSubtask()
+ createContextForSubtask(),
+ true
);
}
@@ -562,6 +572,11 @@ public class CompactionTask extends AbstractBatchIndexTask
return StringUtils.format("%s_%d", getId(), i);
}
+ private String getReportkey(String baseSequenceName, String currentKey)
+ {
+ return StringUtils.format("%s_%s", currentKey,
baseSequenceName.substring(baseSequenceName.lastIndexOf('_') + 1));
+ }
+
/**
* Generate {@link ParallelIndexIngestionSpec} from input segments.
*
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index a0d7f4143f7..ce3245944e2 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -169,7 +169,9 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
private IngestionState ingestionState;
- private boolean shouldCleanup;
+ // used to specify if indextask.run() is run as a part of another task
+ // skips writing reports and cleanup if not a standalone task
+ private boolean isStandAloneTask;
@MonotonicNonNull
private ParseExceptionHandler determinePartitionsParseExceptionHandler;
@@ -189,6 +191,8 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
@Nullable
private String errorMsg;
+ private Map<String, TaskReport> completionReports;
+
@JsonCreator
public IndexTask(
@JsonProperty("id") final String id,
@@ -222,7 +226,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
IndexIngestionSpec ingestionSchema,
Map<String, Object> context,
int maxAllowedLockCount,
- boolean shouldCleanup
+ boolean isStandAloneTask
)
{
super(
@@ -237,7 +241,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
this.baseSequenceName = baseSequenceName == null ? getId() :
baseSequenceName;
this.ingestionSchema = ingestionSchema;
this.ingestionState = IngestionState.NOT_STARTED;
- this.shouldCleanup = shouldCleanup;
+ this.isStandAloneTask = isStandAloneTask;
}
@Override
@@ -314,6 +318,13 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
ImmutableSet.of();
}
+ @Nullable
+ @JsonIgnore
+ public Map<String, TaskReport> getCompletionReports()
+ {
+ return completionReports;
+ }
+
@GET
@Path("/unparseableEvents")
@Produces(MediaType.APPLICATION_JSON)
@@ -556,7 +567,8 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
catch (Exception e) {
log.error(e, "Encountered exception in %s.", ingestionState);
errorMsg = Throwables.getStackTraceAsString(e);
- toolbox.getTaskReportFileWriter().write(getId(),
getTaskCompletionReports());
+ completionReports = getTaskCompletionReports();
+ writeCompletionReports(toolbox);
return TaskStatus.failure(
getId(),
errorMsg
@@ -568,6 +580,13 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
}
}
+ private void writeCompletionReports(TaskToolbox toolbox)
+ {
+ if (isStandAloneTask) {
+ toolbox.getTaskReportFileWriter().write(getId(), completionReports);
+ }
+ }
+
private Map<String, TaskReport> getTaskCompletionReports()
{
return TaskReport.buildTaskReports(
@@ -1024,7 +1043,8 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
if (published == null) {
log.error("Failed to publish segments, aborting!");
errorMsg = "Failed to publish segments.";
- toolbox.getTaskReportFileWriter().write(getId(),
getTaskCompletionReports());
+ completionReports = getTaskCompletionReports();
+ writeCompletionReports(toolbox);
return TaskStatus.failure(
getId(),
errorMsg
@@ -1047,7 +1067,8 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
log.debugSegments(published.getSegments(), "Published segments");
- toolbox.getTaskReportFileWriter().write(getId(),
getTaskCompletionReports());
+ completionReports = getTaskCompletionReports();
+ writeCompletionReports(toolbox);
return TaskStatus.success(getId());
}
}
@@ -1089,7 +1110,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
@Override
public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus)
throws Exception
{
- if (shouldCleanup) {
+ if (isStandAloneTask) {
super.cleanUp(toolbox, taskStatus);
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 894fe1e5b06..d7980e9b371 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -202,6 +202,8 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
private volatile Pair<Map<String, Object>, Map<String, Object>>
indexGenerateRowStats;
private IngestionState ingestionState;
+ private Map<String, TaskReport> completionReports;
+ private final Boolean isCompactionTask;
@JsonCreator
@@ -213,7 +215,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@JsonProperty("context") Map<String, Object> context
)
{
- this(id, groupId, taskResource, ingestionSchema, null, context);
+ this(id, groupId, taskResource, ingestionSchema, null, context, false);
}
public ParallelIndexSupervisorTask(
@@ -222,7 +224,8 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
TaskResource taskResource,
ParallelIndexIngestionSpec ingestionSchema,
@Nullable String baseSubtaskSpecName,
- Map<String, Object> context
+ Map<String, Object> context,
+ Boolean isCompactionTask
)
{
super(
@@ -259,6 +262,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
awaitSegmentAvailabilityTimeoutMillis =
ingestionSchema.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis();
this.ingestionState = IngestionState.NOT_STARTED;
+ this.isCompactionTask = isCompactionTask;
}
private static void
checkPartitionsSpecForForceGuaranteedRollup(PartitionsSpec partitionsSpec)
@@ -292,6 +296,20 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
ImmutableSet.of();
}
+ @Nullable
+ @JsonIgnore
+ public Map<String, TaskReport> getCompletionReports()
+ {
+ return completionReports;
+ }
+
+ @Nullable
+ @JsonIgnore
+ public String getBaseSubtaskSpecName()
+ {
+ return baseSubtaskSpecName;
+ }
+
@JsonProperty("spec")
public ParallelIndexIngestionSpec getIngestionSchema()
{
@@ -651,10 +669,8 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
}
taskStatus = TaskStatus.failure(getId(), errorMessage);
}
- toolbox.getTaskReportFileWriter().write(
- getId(),
- getTaskCompletionReports(taskStatus,
segmentAvailabilityConfirmationCompleted)
- );
+ completionReports = getTaskCompletionReports(taskStatus,
segmentAvailabilityConfirmationCompleted);
+ writeCompletionReports(toolbox);
return taskStatus;
}
@@ -821,10 +837,8 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
taskStatus = TaskStatus.failure(getId(), errMsg);
}
- toolbox.getTaskReportFileWriter().write(
- getId(),
- getTaskCompletionReports(taskStatus,
segmentAvailabilityConfirmationCompleted)
- );
+ completionReports = getTaskCompletionReports(taskStatus,
segmentAvailabilityConfirmationCompleted);
+ writeCompletionReports(toolbox);
return taskStatus;
}
@@ -921,10 +935,8 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
taskStatus = TaskStatus.failure(getId(), errMsg);
}
- toolbox.getTaskReportFileWriter().write(
- getId(),
- getTaskCompletionReports(taskStatus,
segmentAvailabilityConfirmationCompleted)
- );
+ completionReports = getTaskCompletionReports(taskStatus,
segmentAvailabilityConfirmationCompleted);
+ writeCompletionReports(toolbox);
return taskStatus;
}
@@ -1211,7 +1223,9 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
if (currentSubTaskHolder.setTask(sequentialIndexTask)
&& sequentialIndexTask.isReady(toolbox.getTaskActionClient())) {
- return sequentialIndexTask.run(toolbox);
+ TaskStatus status = sequentialIndexTask.run(toolbox);
+ completionReports = sequentialIndexTask.getCompletionReports();
+ return status;
} else {
String msg = "Task was asked to stop. Finish as failed";
LOG.info(msg);
@@ -1247,6 +1261,13 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
);
}
+ private void writeCompletionReports(TaskToolbox toolbox)
+ {
+ if (!isCompactionTask) {
+ toolbox.getTaskReportFileWriter().write(getId(), completionReports);
+ }
+ }
+
private static IndexTuningConfig
convertToIndexTuningConfig(ParallelIndexTuningConfig tuningConfig)
{
return new IndexTuningConfig(
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index b49a223e718..12cf09d3317 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -39,6 +39,7 @@ import
org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
@@ -246,6 +247,9 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
);
Assert.assertEquals("Compaction state for " + segment.getId(),
expectedState, segment.getLastCompactionState());
}
+
+ List<IngestionStatsAndErrorsTaskReportData> reports =
getIngestionReports();
+ Assert.assertEquals(reports.size(), 3); // since three index tasks are run
by single compaction task
}
@Test
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 4a0707e3586..f438b7bba84 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.common.task.batch.parallel;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
@@ -49,9 +50,13 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -232,6 +237,7 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
private CoordinatorClient coordinatorClient;
// An executor that executes API calls using a different thread from the
caller thread as if they were remote calls.
private ExecutorService remoteApiExecutor;
+ private File reportsFile;
protected AbstractParallelIndexSupervisorTaskTest(
double transientTaskFailureRate,
@@ -257,6 +263,7 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
remoteApiExecutor = Execs.singleThreaded("coordinator-api-executor");
coordinatorClient = new LocalCoordinatorClient(remoteApiExecutor);
prepareObjectMapper(objectMapper, getIndexIO());
+ reportsFile = temporaryFolder.newFile();
}
@After
@@ -702,6 +709,7 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
.indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY,
true)))
.taskReportFileWriter(new NoopTestTaskReportFileWriter())
.intermediaryDataManager(intermediaryDataManager)
+ .taskReportFileWriter(new SingleFileTaskReportFileWriter(reportsFile))
.authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
.chatHandlerProvider(new NoopChatHandlerProvider())
.rowIngestionMetersFactory(new
TestUtils().getRowIngestionMetersFactory())
@@ -1064,4 +1072,20 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
throw new ISE("Can't find segment for id[%s]", segmentId);
}
}
+
+ public Map<String, TaskReport> getReports() throws IOException
+ {
+ return objectMapper.readValue(reportsFile, new TypeReference<Map<String,
TaskReport>>()
+ {
+ });
+ }
+
+ public List<IngestionStatsAndErrorsTaskReportData> getIngestionReports()
throws IOException
+ {
+ return getReports().entrySet()
+ .stream()
+ .filter(entry ->
entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY))
+ .map(entry -> (IngestionStatsAndErrorsTaskReportData)
entry.getValue().getPayload())
+ .collect(Collectors.toList());
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java
index 39eaa4af62a..b9d5114a1e6 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionTaskKillTest.java
@@ -314,7 +314,7 @@ public class HashPartitionTaskKillTest extends
AbstractMultiPhaseParallelIndexin
int succedsBeforeFailing
)
{
- super(id, groupId, taskResource, ingestionSchema, baseSubtaskSpecName,
context);
+ super(id, groupId, taskResource, ingestionSchema, baseSubtaskSpecName,
context, false);
this.succeedsBeforeFailing = succedsBeforeFailing;
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java
index ce112aafce8..43b0ac902c1 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionTaskKillTest.java
@@ -243,7 +243,7 @@ public class RangePartitionTaskKillTest extends
AbstractMultiPhaseParallelIndexi
int succedsBeforeFailing
)
{
- super(id, groupId, taskResource, ingestionSchema, baseSubtaskSpecName,
context);
+ super(id, groupId, taskResource, ingestionSchema, baseSubtaskSpecName,
context, false);
this.failInPhase = succedsBeforeFailing;
}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index 957c8a5522c..95b3e112970 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.tests.indexer;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.GranularityType;
@@ -31,6 +32,7 @@ import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.tests.TestNGGroup;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
+import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@@ -198,7 +200,7 @@ public class ITCompactionTaskTest extends
AbstractIndexerTest
);
queryHelper.testQueriesFromString(queryResponseTemplate);
- compactData(compactionResource, newSegmentGranularity, null);
+ String taskId = compactData(compactionResource, newSegmentGranularity,
null);
// The original 4 segments should be compacted into 2 new segments
checkNumberOfSegments(2);
@@ -215,10 +217,17 @@ public class ITCompactionTaskTest extends
AbstractIndexerTest
expectedIntervalAfterCompaction = newIntervals;
}
checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+ Map<String, IngestionStatsAndErrorsTaskReport> reports =
indexer.getTaskReport(taskId);
+ Assert.assertTrue(reports != null && reports.size() > 0);
}
}
- private void compactData(String compactionResource, GranularityType
newSegmentGranularity, GranularityType newQueryGranularity) throws Exception
+ private String compactData(
+ String compactionResource,
+ GranularityType newSegmentGranularity,
+ GranularityType newQueryGranularity
+ ) throws Exception
{
String template = getResourceAsString(compactionResource);
template = StringUtils.replace(template, "%%DATASOURCE%%",
fullDatasourceName);
@@ -251,6 +260,8 @@ public class ITCompactionTaskTest extends
AbstractIndexerTest
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
"Segment Compaction"
);
+
+ return taskID;
}
private void checkQueryGranularity(String queryResource, String
expectedQueryGranularity, int segmentCount) throws Exception
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]