This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 564c44ed85c Add stats segmentsRead and segmentsPublished to compaction task reports (#15947)
564c44ed85c is described below

commit 564c44ed85cdd85bc3b68cd4442eaae746d52e4e
Author: Adithya Chakilam <[email protected]>
AuthorDate: Wed Mar 6 22:07:23 2024 -0600

    Add stats segmentsRead and segmentsPublished to compaction task reports (#15947)
    
    Changes:
    - Add visibility into the number of segments read/published by each parallel compaction task
    - Add new fields `segmentsRead`, `segmentsPublished` to `IngestionStatsAndErrorsTaskReportData`
    - Update `ParallelIndexSupervisorTask` to populate the new stats
---
 docs/ingestion/tasks.md                            |  8 ++++
 .../common/IngestionStatsAndErrorsTaskReport.java  |  2 +-
 .../IngestionStatsAndErrorsTaskReportData.java     | 38 ++++++++++++++--
 .../task/AppenderatorDriverRealtimeIndexTask.java  |  4 +-
 .../indexing/common/task/HadoopIndexTask.java      |  4 +-
 .../druid/indexing/common/task/IndexTask.java      |  4 +-
 .../parallel/ParallelIndexSupervisorTask.java      | 27 +++++++++++-
 .../batch/parallel/PartialSegmentGenerateTask.java | 22 ++++++++--
 .../task/batch/parallel/SinglePhaseSubTask.java    |  4 +-
 .../SeekableStreamIndexTaskRunner.java             |  4 +-
 .../common/task/CompactionTaskParallelRunTest.java | 12 ++++++
 .../indexing/common/task/TaskReportSerdeTest.java  |  6 ++-
 .../druid/tests/indexer/ITCompactionTaskTest.java  | 50 ++++++++++++++++++++++
 .../wikipedia_compaction_task_parallel.json        | 22 ++++++++++
 14 files changed, 193 insertions(+), 14 deletions(-)

diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index fbf1f4a38e7..4b6153fa26a 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -155,6 +155,14 @@ For some task types, the indexing task can wait for the 
newly ingested segments
 |`segmentAvailabilityWaitTimeMs`|Milliseconds waited by the ingestion task for the newly ingested segments to be available for query after completing ingestion was completed.|
 |`recordsProcessed`| Partitions that were processed by an ingestion task and includes count of records processed from each partition.|
 
+
+#### Compaction task segment info fields
+
+|Field|Description|
+|---|---|
+|`segmentsRead`|Number of segments read by a compaction task that runs with more than one subtask.|
+|`segmentsPublished`|Number of segments published by a compaction task that runs with more than one subtask.|
+
 ### Live report
 
 When a task is running, a live report containing ingestion state, unparseable events and moving average for number of events processed for 1 min, 5 min, 15 min time window can be retrieved at:
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
index 35ae2f66988..7518523b1de 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
@@ -57,7 +57,7 @@ public class IngestionStatsAndErrorsTaskReport implements 
TaskReport
   }
 
   @Override
-  public Object getPayload()
+  public IngestionStatsAndErrorsTaskReportData getPayload()
   {
     return payload;
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
index ecdd9d3aeba..97ea58e1c5c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.common;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexer.IngestionState;
 
@@ -50,6 +51,11 @@ public class IngestionStatsAndErrorsTaskReportData
   @JsonProperty
   private Map<String, Long> recordsProcessed;
 
+  @JsonProperty
+  private Long segmentsRead;
+  @JsonProperty
+  private Long segmentsPublished;
+
   public IngestionStatsAndErrorsTaskReportData(
       @JsonProperty("ingestionState") IngestionState ingestionState,
       @JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
@@ -57,7 +63,9 @@ public class IngestionStatsAndErrorsTaskReportData
       @JsonProperty("errorMsg") @Nullable String errorMsg,
       @JsonProperty("segmentAvailabilityConfirmed") boolean 
segmentAvailabilityConfirmed,
       @JsonProperty("segmentAvailabilityWaitTimeMs") long 
segmentAvailabilityWaitTimeMs,
-      @JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed
+      @JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed,
+      @Nullable @JsonProperty("segmentsRead") Long segmentsRead,
+      @Nullable @JsonProperty("segmentsPublished") Long segmentsPublished
   )
   {
     this.ingestionState = ingestionState;
@@ -67,6 +75,8 @@ public class IngestionStatsAndErrorsTaskReportData
     this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed;
     this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs;
     this.recordsProcessed = recordsProcessed;
+    this.segmentsRead = segmentsRead;
+    this.segmentsPublished = segmentsPublished;
   }
 
   @JsonProperty
@@ -113,6 +123,22 @@ public class IngestionStatsAndErrorsTaskReportData
     return recordsProcessed;
   }
 
+  @JsonProperty
+  @Nullable
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Long getSegmentsRead()
+  {
+    return segmentsRead;
+  }
+
+  @JsonProperty
+  @Nullable
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Long getSegmentsPublished()
+  {
+    return segmentsPublished;
+  }
+
   public static IngestionStatsAndErrorsTaskReportData 
getPayloadFromTaskReports(
       Map<String, TaskReport> taskReports
   )
@@ -137,7 +163,9 @@ public class IngestionStatsAndErrorsTaskReportData
            Objects.equals(getErrorMsg(), that.getErrorMsg()) &&
            Objects.equals(isSegmentAvailabilityConfirmed(), 
that.isSegmentAvailabilityConfirmed()) &&
            Objects.equals(getSegmentAvailabilityWaitTimeMs(), 
that.getSegmentAvailabilityWaitTimeMs()) &&
-           Objects.equals(getRecordsProcessed(), that.getRecordsProcessed());
+           Objects.equals(getRecordsProcessed(), that.getRecordsProcessed()) &&
+           Objects.equals(getSegmentsRead(), that.getSegmentsRead()) &&
+           Objects.equals(getSegmentsPublished(), that.getSegmentsPublished());
   }
 
   @Override
@@ -150,7 +178,9 @@ public class IngestionStatsAndErrorsTaskReportData
         getErrorMsg(),
         isSegmentAvailabilityConfirmed(),
         getSegmentAvailabilityWaitTimeMs(),
-        getRecordsProcessed()
+        getRecordsProcessed(),
+        getSegmentsRead(),
+        getSegmentsPublished()
     );
   }
 
@@ -165,6 +195,8 @@ public class IngestionStatsAndErrorsTaskReportData
            ", segmentAvailabilityConfoirmed=" + segmentAvailabilityConfirmed +
            ", segmentAvailabilityWaitTimeMs=" + segmentAvailabilityWaitTimeMs +
            ", recordsProcessed=" + recordsProcessed +
+           ", segmentsRead=" + segmentsRead +
+           ", segmentsPublished=" + segmentsPublished +
            '}';
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 9ea94b844cc..e0c7f9bc934 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -621,7 +621,9 @@ public class AppenderatorDriverRealtimeIndexTask extends 
AbstractTask implements
                 errorMsg,
                 errorMsg == null,
                 0L,
-                Collections.emptyMap()
+                Collections.emptyMap(),
+                null,
+                null
             )
         )
     );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 1422720bf94..019b63520e7 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -694,7 +694,9 @@ public class HadoopIndexTask extends HadoopTask implements 
ChatHandler
                 errorMsg,
                 segmentAvailabilityConfirmationCompleted,
                 segmentAvailabilityWaitTimeMs,
-                Collections.emptyMap()
+                Collections.emptyMap(),
+                null,
+                null
             )
         )
     );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 0c9bc57aa50..50e13a93c0b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -602,7 +602,9 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
                 errorMsg,
                 segmentAvailabilityConfirmationCompleted,
                 segmentAvailabilityWaitTimeMs,
-                Collections.emptyMap()
+                Collections.emptyMap(),
+                null,
+                null
             )
         )
     );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 162aa30c698..40366b7a7d1 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -203,6 +203,8 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
 
   private IngestionState ingestionState;
   private Map<String, TaskReport> completionReports;
+  private Long segmentsRead;
+  private Long segmentsPublished;
   private final boolean isCompactionTask;
 
 
@@ -643,6 +645,14 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
     if (state.isSuccess()) {
       //noinspection ConstantConditions
       publishSegments(toolbox, parallelSinglePhaseRunner.getReports());
+      if (isCompactionTask) {
+        // Populate segmentsRead only for compaction tasks
+        segmentsRead = parallelSinglePhaseRunner.getReports()
+                                                .values()
+                                                .stream()
+                                                .mapToLong(report -> 
report.getOldSegments().size()).sum();
+      }
+
       if (awaitSegmentAvailabilityTimeoutMillis > 0) {
         waitForSegmentAvailability(parallelSinglePhaseRunner.getReports());
       }
@@ -1189,6 +1199,8 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
     } else {
       throw new ISE("Failed to publish segments");
     }
+
+    segmentsPublished = (long) newSegments.size();
   }
 
   private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
@@ -1245,7 +1257,9 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
                 taskStatus.getErrorMsg(),
                 segmentAvailabilityConfirmed,
                 segmentAvailabilityWaitTimeMs,
-                Collections.emptyMap()
+                Collections.emptyMap(),
+                segmentsRead,
+                segmentsPublished
             )
         )
     );
@@ -1629,6 +1643,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
 
       final SimpleRowIngestionMeters buildSegmentsRowStats = new 
SimpleRowIngestionMeters();
       final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
+      long totalSegmentsRead = 0L;
       for (GeneratedPartitionsReport generatedPartitionsReport : 
completedSubtaskReports.values()) {
         Map<String, TaskReport> taskReport = 
generatedPartitionsReport.getTaskReport();
         if (taskReport == null || taskReport.isEmpty()) {
@@ -1639,6 +1654,13 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
             getBuildSegmentsStatsFromTaskReport(taskReport, true, 
unparseableEvents);
 
         
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
+
+        Long segmentsReadFromPartition = ((IngestionStatsAndErrorsTaskReport)
+            taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
+        ).getPayload().getSegmentsRead();
+        if (segmentsReadFromPartition != null) {
+          totalSegmentsRead += segmentsReadFromPartition;
+        }
       }
 
       RowIngestionMetersTotals rowStatsForRunningTasks = 
getRowStatsAndUnparseableEventsForRunningTasks(
@@ -1647,6 +1669,9 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
           includeUnparseable
       );
       
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
+      if (totalSegmentsRead > 0) {
+        segmentsRead = totalSegmentsRead;
+      }
 
       return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), 
unparseableEvents);
     }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index 9b1a90b5791..1cb3d36ad75 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -37,6 +37,8 @@ import 
org.apache.druid.indexing.common.task.SequenceNameFunction;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
 import 
org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
+import org.apache.druid.indexing.firehose.WindowedSegmentId;
+import org.apache.druid.indexing.input.DruidInputSource;
 import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -125,7 +127,7 @@ abstract class PartialSegmentGenerateTask<T extends 
GeneratedPartitionsReport> e
         toolbox.getIndexingTmpDir()
     );
 
-    Map<String, TaskReport> taskReport = getTaskCompletionReports();
+    Map<String, TaskReport> taskReport = 
getTaskCompletionReports(getNumSegmentsRead(inputSource));
 
     taskClient.report(createGeneratedPartitionsReport(toolbox, segments, 
taskReport));
 
@@ -149,6 +151,18 @@ abstract class PartialSegmentGenerateTask<T extends 
GeneratedPartitionsReport> e
       Map<String, TaskReport> taskReport
   );
 
+  private Long getNumSegmentsRead(InputSource inputSource)
+  {
+    if (inputSource instanceof DruidInputSource) {
+      List<WindowedSegmentId> segments = ((DruidInputSource) 
inputSource).getSegmentIds();
+      if (segments != null) {
+        return (long) segments.size();
+      }
+    }
+
+    return null;
+  }
+
   private List<DataSegment> generateSegments(
       final TaskToolbox toolbox,
       final ParallelIndexSupervisorTaskClient taskClient,
@@ -236,7 +250,7 @@ abstract class PartialSegmentGenerateTask<T extends 
GeneratedPartitionsReport> e
   /**
    * Generate an IngestionStatsAndErrorsTaskReport for the task.
    */
-  private Map<String, TaskReport> getTaskCompletionReports()
+  private Map<String, TaskReport> getTaskCompletionReports(Long segmentsRead)
   {
     return TaskReport.buildTaskReports(
         new IngestionStatsAndErrorsTaskReport(
@@ -248,7 +262,9 @@ abstract class PartialSegmentGenerateTask<T extends 
GeneratedPartitionsReport> e
                 "",
                 false, // not applicable for parallel subtask
                 segmentAvailabilityWaitTimeMs,
-                Collections.emptyMap()
+                Collections.emptyMap(),
+                segmentsRead,
+                null
             )
         )
     );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 8d9bd8f7b6d..bbd3f2964b6 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -644,7 +644,9 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
                 errorMsg,
                 false, // not applicable for parallel subtask
                 segmentAvailabilityWaitTimeMs,
-                Collections.emptyMap()
+                Collections.emptyMap(),
+                null,
+                null
             )
         )
     );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index c7f77cea566..09e89b1d601 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1130,7 +1130,9 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                 errorMsg,
                 errorMsg == null,
                 handoffWaitMs,
-                getPartitionStats()
+                getPartitionStats(),
+                null,
+                null
             )
         )
     );
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 12cf09d3317..9b68cebe9ee 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -250,6 +250,18 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
 
     List<IngestionStatsAndErrorsTaskReportData> reports = 
getIngestionReports();
     Assert.assertEquals(reports.size(), 3); // since three index tasks are run 
by single compaction task
+
+    // this test reads 3 segments and publishes 6 segments
+    Assert.assertEquals(
+        3,
+        
reports.stream().mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsRead).sum()
+    );
+    Assert.assertEquals(
+        6,
+        reports.stream()
+               
.mapToLong(IngestionStatsAndErrorsTaskReportData::getSegmentsPublished)
+               .sum()
+    );
   }
 
   @Test
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
index c0166d141b0..53b8ace317a 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
@@ -70,7 +70,9 @@ public class TaskReportSerdeTest
             "an error message",
             true,
             1000L,
-            ImmutableMap.of("PartitionA", 5000L)
+            ImmutableMap.of("PartitionA", 5000L),
+            5L,
+            10L
         )
     );
     String report1serialized = jsonMapper.writeValueAsString(report1);
@@ -127,6 +129,8 @@ public class TaskReportSerdeTest
             "an error message",
             true,
             1000L,
+            null,
+            null,
             null
         )
     );
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index 95b3e112970..296f7fe747a 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.tests.indexer;
 import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.GranularityType;
@@ -65,6 +66,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   private static final String SEGMENT_METADATA_QUERY_RESOURCE = 
"/indexer/segment_metadata_query.json";
 
   private static final String COMPACTION_TASK = 
"/indexer/wikipedia_compaction_task.json";
+  private static final String PARALLEL_COMPACTION_TASK = 
"/indexer/wikipedia_compaction_task_parallel.json";
   private static final String COMPACTION_TASK_WITH_SEGMENT_GRANULARITY = 
"/indexer/wikipedia_compaction_task_with_segment_granularity.json";
   private static final String COMPACTION_TASK_WITH_GRANULARITY_SPEC = 
"/indexer/wikipedia_compaction_task_with_granularity_spec.json";
 
@@ -138,6 +140,54 @@ public class ITCompactionTaskTest extends 
AbstractIndexerTest
     }
   }
 
+  @Test
+  public void testParallelHashedCompaction() throws Exception
+  {
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      loadData(INDEX_TASK, fullDatasourceName);
+      // 4 segments across 2 days
+      checkNumberOfSegments(4);
+      List<String> expectedIntervalAfterCompaction = 
coordinator.getSegmentIntervals(fullDatasourceName);
+      expectedIntervalAfterCompaction.sort(null);
+
+      checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, 
GranularityType.SECOND.name(), 4);
+      String queryResponseTemplate = 
getQueryResponseTemplate(INDEX_QUERIES_RESOURCE);
+
+      queryResponseTemplate = StringUtils.replace(
+          queryResponseTemplate,
+          "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
+          jsonMapper.writeValueAsString("0")
+      );
+
+      queryHelper.testQueriesFromString(queryResponseTemplate);
+      String taskId = compactData(PARALLEL_COMPACTION_TASK, null, null);
+
+      // The original 4 segments should be compacted into 2 new segments
+      checkNumberOfSegments(2);
+      queryHelper.testQueriesFromString(queryResponseTemplate);
+      checkQueryGranularity(SEGMENT_METADATA_QUERY_RESOURCE, 
GranularityType.SECOND.name(), 2);
+
+
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+      Map<String, IngestionStatsAndErrorsTaskReport> reports = 
indexer.getTaskReport(taskId);
+      Assert.assertTrue(reports != null && reports.size() > 0);
+
+      Assert.assertEquals(2,
+                          reports.values()
+                                 .stream()
+                                 .mapToLong(r -> 
((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsPublished())
+                                 .sum()
+      );
+      Assert.assertEquals(4,
+                          reports.values()
+                                 .stream()
+                                 .mapToLong(r -> 
((IngestionStatsAndErrorsTaskReportData) r.getPayload()).getSegmentsRead())
+                                 .sum()
+      );
+    }
+  }
+
   @Test
   public void 
testCompactionWithSegmentGranularityAndQueryGranularityInGranularitySpec() 
throws Exception
   {
diff --git 
a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_parallel.json
 
b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_parallel.json
new file mode 100644
index 00000000000..8616946ebba
--- /dev/null
+++ 
b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_parallel.json
@@ -0,0 +1,22 @@
+{
+  "type" : "compact",
+  "dataSource" : "%%DATASOURCE%%",
+  "ioConfig" : {
+    "type": "compact",
+    "inputSpec": {
+      "type": "interval",
+      "interval": "2013-08-31/2013-09-02"
+    }
+  },
+  "tuningConfig": {
+    "type": "index_parallel",
+    "partitionsSpec": {
+      "type": "hashed"
+    },
+    "forceGuaranteedRollup": true,
+    "maxNumConcurrentSubTasks": 3
+  },
+  "context" : {
+    "storeCompactionState" : true
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to