This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 564c44ed85c Add stats segmentsRead and segmentsPublished to compaction
task reports (#15947)
564c44ed85c is described below
commit 564c44ed85cdd85bc3b68cd4442eaae746d52e4e
Author: Adithya Chakilam <[email protected]>
AuthorDate: Wed Mar 6 22:07:23 2024 -0600
Add stats segmentsRead and segmentsPublished to compaction task reports
(#15947)
Changes:
- Add visibility into the number of segments read/published by each parallel
compaction task
- Add new fields `segmentsRead`, `segmentsPublished` to
`IngestionStatsAndErrorsTaskReportData`
- Update `ParallelIndexSupervisorTask` to populate the new stats
---
docs/ingestion/tasks.md | 8 ++++
.../common/IngestionStatsAndErrorsTaskReport.java | 2 +-
.../IngestionStatsAndErrorsTaskReportData.java | 38 ++++++++++++++--
.../task/AppenderatorDriverRealtimeIndexTask.java | 4 +-
.../indexing/common/task/HadoopIndexTask.java | 4 +-
.../druid/indexing/common/task/IndexTask.java | 4 +-
.../parallel/ParallelIndexSupervisorTask.java | 27 +++++++++++-
.../batch/parallel/PartialSegmentGenerateTask.java | 22 ++++++++--
.../task/batch/parallel/SinglePhaseSubTask.java | 4 +-
.../SeekableStreamIndexTaskRunner.java | 4 +-
.../common/task/CompactionTaskParallelRunTest.java | 12 ++++++
.../indexing/common/task/TaskReportSerdeTest.java | 6 ++-
.../druid/tests/indexer/ITCompactionTaskTest.java | 50 ++++++++++++++++++++++
.../wikipedia_compaction_task_parallel.json | 22 ++++++++++
14 files changed, 193 insertions(+), 14 deletions(-)
diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index fbf1f4a38e7..4b6153fa26a 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -155,6 +155,14 @@ For some task types, the indexing task can wait for the
newly ingested segments
|`segmentAvailabilityWaitTimeMs`|Milliseconds waited by the ingestion task for
the newly ingested segments to be available for query after ingestion was
completed.|
|`recordsProcessed`|Partitions that were processed by an ingestion task,
including the count of records processed from each partition.|
+
+#### Compaction task segment info fields
+
+|Field|Description|
+|---|---|
+|`segmentsRead`|Number of segments read by a compaction task with more than
one subtask.|
+|`segmentsPublished`|Number of segments published by a compaction task with
more than one subtask.|
+
### Live report
When a task is running, a live report containing the ingestion state, unparseable
events, and moving averages of the number of events processed over 1-minute, 5-minute,
and 15-minute time windows can be retrieved at:
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
index 35ae2f66988..7518523b1de 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
@@ -57,7 +57,7 @@ public class IngestionStatsAndErrorsTaskReport implements
TaskReport
}
@Override
- public Object getPayload()
+ public IngestionStatsAndErrorsTaskReportData getPayload()
{
return payload;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
index ecdd9d3aeba..97ea58e1c5c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.common;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.IngestionState;
@@ -50,6 +51,11 @@ public class IngestionStatsAndErrorsTaskReportData
@JsonProperty
private Map<String, Long> recordsProcessed;
+ @JsonProperty
+ private Long segmentsRead;
+ @JsonProperty
+ private Long segmentsPublished;
+
public IngestionStatsAndErrorsTaskReportData(
@JsonProperty("ingestionState") IngestionState ingestionState,
@JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
@@ -57,7 +63,9 @@ public class IngestionStatsAndErrorsTaskReportData
@JsonProperty("errorMsg") @Nullable String errorMsg,
@JsonProperty("segmentAvailabilityConfirmed") boolean
segmentAvailabilityConfirmed,
@JsonProperty("segmentAvailabilityWaitTimeMs") long
segmentAvailabilityWaitTimeMs,
- @JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed
+ @JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed,
+ @Nullable @JsonProperty("segmentsRead") Long segmentsRead,
+ @Nullable @JsonProperty("segmentsPublished") Long segmentsPublished
)
{
this.ingestionState = ingestionState;
@@ -67,6 +75,8 @@ public class IngestionStatsAndErrorsTaskReportData
this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed;
this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs;
this.recordsProcessed = recordsProcessed;
+ this.segmentsRead = segmentsRead;
+ this.segmentsPublished = segmentsPublished;
}
@JsonProperty
@@ -113,6 +123,22 @@ public class IngestionStatsAndErrorsTaskReportData
return recordsProcessed;
}
+ @JsonProperty
+ @Nullable
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Long getSegmentsRead()
+ {
+ return segmentsRead;
+ }
+
+ @JsonProperty
+ @Nullable
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Long getSegmentsPublished()
+ {
+ return segmentsPublished;
+ }
+
public static IngestionStatsAndErrorsTaskReportData
getPayloadFromTaskReports(
Map<String, TaskReport> taskReports
)
@@ -137,7 +163,9 @@ public class IngestionStatsAndErrorsTaskReportData
Objects.equals(getErrorMsg(), that.getErrorMsg()) &&
Objects.equals(isSegmentAvailabilityConfirmed(),
that.isSegmentAvailabilityConfirmed()) &&
Objects.equals(getSegmentAvailabilityWaitTimeMs(),
that.getSegmentAvailabilityWaitTimeMs()) &&
- Objects.equals(getRecordsProcessed(), that.getRecordsProcessed());
+ Objects.equals(getRecordsProcessed(), that.getRecordsProcessed()) &&
+ Objects.equals(getSegmentsRead(), that.getSegmentsRead()) &&
+ Objects.equals(getSegmentsPublished(), that.getSegmentsPublished());
}
@Override
@@ -150,7 +178,9 @@ public class IngestionStatsAndErrorsTaskReportData
getErrorMsg(),
isSegmentAvailabilityConfirmed(),
getSegmentAvailabilityWaitTimeMs(),
- getRecordsProcessed()
+ getRecordsProcessed(),
+ getSegmentsRead(),
+ getSegmentsPublished()
);
}
@@ -165,6 +195,8 @@ public class IngestionStatsAndErrorsTaskReportData
", segmentAvailabilityConfoirmed=" + segmentAvailabilityConfirmed +
", segmentAvailabilityWaitTimeMs=" + segmentAvailabilityWaitTimeMs +
", recordsProcessed=" + recordsProcessed +
+ ", segmentsRead=" + segmentsRead +
+ ", segmentsPublished=" + segmentsPublished +
'}';
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 9ea94b844cc..e0c7f9bc934 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -621,7 +621,9 @@ public class AppenderatorDriverRealtimeIndexTask extends
AbstractTask implements
errorMsg,
errorMsg == null,
0L,
- Collections.emptyMap()
+ Collections.emptyMap(),
+ null,
+ null
)
)
);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 1422720bf94..019b63520e7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -694,7 +694,9 @@ public class HadoopIndexTask extends HadoopTask implements
ChatHandler
errorMsg,
segmentAvailabilityConfirmationCompleted,
segmentAvailabilityWaitTimeMs,
- Collections.emptyMap()
+ Collections.emptyMap(),
+ null,
+ null
)
)
);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 0c9bc57aa50..50e13a93c0b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -602,7 +602,9 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
errorMsg,
segmentAvailabilityConfirmationCompleted,
segmentAvailabilityWaitTimeMs,
- Collections.emptyMap()
+ Collections.emptyMap(),
+ null,
+ null
)
)
);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 162aa30c698..40366b7a7d1 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -203,6 +203,8 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
private IngestionState ingestionState;
private Map<String, TaskReport> completionReports;
+ private Long segmentsRead;
+ private Long segmentsPublished;
private final boolean isCompactionTask;
@@ -643,6 +645,14 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
if (state.isSuccess()) {
//noinspection ConstantConditions
publishSegments(toolbox, parallelSinglePhaseRunner.getReports());
+ if (isCompactionTask) {
+ // Populate segmentsRead only for compaction tasks
+ segmentsRead = parallelSinglePhaseRunner.getReports()
+ .values()
+ .stream()
+ .mapToLong(report ->
report.getOldSegments().size()).sum();
+ }
+
if (awaitSegmentAvailabilityTimeoutMillis > 0) {
waitForSegmentAvailability(parallelSinglePhaseRunner.getReports());
}
@@ -1189,6 +1199,8 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
} else {
throw new ISE("Failed to publish segments");
}
+
+ segmentsPublished = (long) newSegments.size();
}
private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
@@ -1245,7 +1257,9 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
taskStatus.getErrorMsg(),
segmentAvailabilityConfirmed,
segmentAvailabilityWaitTimeMs,
- Collections.emptyMap()
+ Collections.emptyMap(),
+ segmentsRead,
+ segmentsPublished
)
)
);
@@ -1629,6 +1643,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
final SimpleRowIngestionMeters buildSegmentsRowStats = new
SimpleRowIngestionMeters();
final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
+ long totalSegmentsRead = 0L;
for (GeneratedPartitionsReport generatedPartitionsReport :
completedSubtaskReports.values()) {
Map<String, TaskReport> taskReport =
generatedPartitionsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) {
@@ -1639,6 +1654,13 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
getBuildSegmentsStatsFromTaskReport(taskReport, true,
unparseableEvents);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
+
+ Long segmentsReadFromPartition = ((IngestionStatsAndErrorsTaskReport)
+ taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
+ ).getPayload().getSegmentsRead();
+ if (segmentsReadFromPartition != null) {
+ totalSegmentsRead += segmentsReadFromPartition;
+ }
}
RowIngestionMetersTotals rowStatsForRunningTasks =
getRowStatsAndUnparseableEventsForRunningTasks(
@@ -1647,6 +1669,9 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
includeUnparseable
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
+ if (totalSegmentsRead > 0) {
+ segmentsRead = totalSegmentsRead;
+ }
return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(),
unparseableEvents);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index 9b1a90b5791..1cb3d36ad75 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -37,6 +37,8 @@ import
org.apache.druid.indexing.common.task.SequenceNameFunction;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import
org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
+import org.apache.druid.indexing.firehose.WindowedSegmentId;
+import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -125,7 +127,7 @@ abstract class PartialSegmentGenerateTask<T extends
GeneratedPartitionsReport> e
toolbox.getIndexingTmpDir()
);
- Map<String, TaskReport> taskReport = getTaskCompletionReports();
+ Map<String, TaskReport> taskReport =
getTaskCompletionReports(getNumSegmentsRead(inputSource));
taskClient.report(createGeneratedPartitionsReport(toolbox, segments,
taskReport));
@@ -149,6 +151,18 @@ abstract class PartialSegmentGenerateTask<T extends
GeneratedPartitionsReport> e
Map<String, TaskReport> taskReport
);
+ private Long getNumSegmentsRead(InputSource inputSource)
+ {
+ if (inputSource instanceof DruidInputSource) {
+ List<WindowedSegmentId> segments = ((DruidInputSource)
inputSource).getSegmentIds();
+ if (segments != null) {
+ return (long) segments.size();
+ }
+ }
+
+ return null;
+ }
+
private List<DataSegment> generateSegments(
final TaskToolbox toolbox,
final ParallelIndexSupervisorTaskClient taskClient,
@@ -236,7 +250,7 @@ abstract class PartialSegmentGenerateTask<T extends
GeneratedPartitionsReport> e
/**
* Generate an IngestionStatsAndErrorsTaskReport for the task.
*/
- private Map<String, TaskReport> getTaskCompletionReports()
+ private Map<String, TaskReport> getTaskCompletionReports(Long segmentsRead)
{
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
@@ -248,7 +262,9 @@ abstract class PartialSegmentGenerateTask<T extends
GeneratedPartitionsReport> e
"",
false, // not applicable for parallel subtask
segmentAvailabilityWaitTimeMs,
- Collections.emptyMap()
+ Collections.emptyMap(),
+ segmentsRead,
+ null
)
)
);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 8d9bd8f7b6d..bbd3f2964b6 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -644,7 +644,9 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
errorMsg,
false, // not applicable for parallel subtask
segmentAvailabilityWaitTimeMs,
- Collections.emptyMap()
+ Collections.emptyMap(),
+ null,
+ null
)
)
);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index c7f77cea566..09e89b1d601 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1130,7 +1130,9 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
errorMsg,
errorMsg == null,
handoffWaitMs,
- getPartitionStats()
+ getPartitionStats(),
+ null,
+ null
)
)
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 12cf09d3317..9b68cebe9ee 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -250,6 +250,18 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
List<IngestionStatsAndErrorsTaskReportData> reports =
getIngestionReports();
Assert.assertEquals(reports.size(), 3); // since three index tasks are run
by single compaction task
+
+ // this test reads 3 segments and publishes 6 segments
+ Assert.assertEquals(
+ 3,
+
reports.stream().mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsRead).sum()
+ );
+ Assert.assertEquals(
+ 6,
+ reports.stream()
+
.mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsPublished)
+ .sum()
+ );
}
@Test
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
index c0166d141b0..53b8ace317a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
@@ -70,7 +70,9 @@ public class TaskReportSerdeTest
"an error message",
true,
1000L,
- ImmutableMap.of("PartitionA", 5000L)
+ ImmutableMap.of("PartitionA", 5000L),
+ 5L,
+ 10L
)
);
String report1serialized = jsonMapper.writeValueAsString(report1);
@@ -127,6 +129,8 @@ public class TaskReportSerdeTest
"an error message",
true,
1000L,
+ null,
+ null,
null
)
);
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index 95b3e112970..296f7fe747a 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.tests.indexer;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.GranularityType;
@@ -65,6 +66,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
private static final String SEGMENT_METADATA_QUERY_RESOURCE =
"/indexer/segment_metadata_query.json";
private static final String COMPACTION_TASK =
"/indexer/wikipedia_compaction_task.json";
+ private static final String PARALLEL_COMPACTION_TASK =
"/indexer/wikipedia_compaction_task_parallel.json";
private static final String COMPACTION_TASK_WITH_SEGMENT_GRANULARITY =
"/indexer/wikipedia_compaction_task_with_segment_granularity.json";
private static final String COMPACTION_TASK_WITH_GRANULARITY_SPEC =
"/indexer/wikipedia_compaction_task_with_granularity_spec.json";
@@ -138,6 +140,54 @@ public class ITCompactionTaskTest extends
AbstractIndexerTest
}
}
+ @Test
+ public void testParallelHashedCompaction() throws Exception
+ {
+ try (final Closeable ignored = unloader(fullDatasourceName)) {
+ loadData(INDEX_TASK, fullDatasourceName);
+ // 4 segments across 2 days
+ checkNumberOfSegments(4);
+ List<String> expectedIntervalAfterCompaction =
coordinator.getSegmentIntervals(fullDatasourceName);
+ expectedIntervalAfterCompaction.sort(null);
+
+ checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE,
GranularityType.SECOND.name(), 4);
+ String queryResponseTemplate =
getQueryResponseTemplate(INDEX_QUERIES_RESOURCE);
+
+ queryResponseTemplate = StringUtils.replace(
+ queryResponseTemplate,
+ "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
+ jsonMapper.writeValueAsString("0")
+ );
+
+ queryHelper.testQueriesFromString(queryResponseTemplate);
+ String taskId = compactData(PARALLEL_COMPACTION_TASK, null, null);
+
+ // The original 4 segments should be compacted into 2 new segments
+ checkNumberOfSegments(2);
+ queryHelper.testQueriesFromString(queryResponseTemplate);
+ checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE,
GranularityType.SECOND.name(), 2);
+
+
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+ Map<String, IngestionStatsAndErrorsTaskReport> reports =
indexer.getTaskReport(taskId);
+ Assert.assertTrue(reports != null && reports.size() > 0);
+
+ Assert.assertEquals(2,
+ reports.values()
+ .stream()
+ .mapToLong(r ->
((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsPublished())
+ .sum()
+ );
+ Assert.assertEquals(4,
+ reports.values()
+ .stream()
+ .mapToLong(r ->
((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsRead())
+ .sum()
+ );
+ }
+ }
+
@Test
public void
testCompactionWithSegmentGranularityAndQueryGranularityInGranularitySpec()
throws Exception
{
diff --git
a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_parallel.json
b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_parallel.json
new file mode 100644
index 00000000000..8616946ebba
--- /dev/null
+++
b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_parallel.json
@@ -0,0 +1,22 @@
+{
+ "type" : "compact",
+ "dataSource" : "%%DATASOURCE%%",
+ "ioConfig" : {
+ "type": "compact",
+ "inputSpec": {
+ "type": "interval",
+ "interval": "2013-08-31/2013-09-02"
+ }
+ },
+ "tuningConfig": {
+ "type": "index_parallel",
+ "partitionsSpec": {
+ "type": "hashed"
+ },
+ "forceGuaranteedRollup": true,
+ "maxNumConcurrentSubTasks": 3
+ },
+ "context" : {
+ "storeCompactionState" : true
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]