This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push: new 463010bb29c Populate segment stats for non-parallel compaction jobs (#16171) 463010bb29c is described below commit 463010bb29cecdaad4203bedccc793171597c7d5 Author: Adithya Chakilam <35785271+adithyachaki...@users.noreply.github.com> AuthorDate: Fri Mar 29 08:40:55 2024 -0500 Populate segment stats for non-parallel compaction jobs (#16171) * Populate segment stats for non-parallel compaction jobs * fix * add-tests * comments * update-test * comments --- docs/ingestion/tasks.md | 4 ++-- .../druid/indexing/common/task/IndexTask.java | 18 +++++++++++++-- .../druid/indexing/input/DruidInputSource.java | 27 ++++++++++++++++++++-- .../common/task/CompactionTaskRunTest.java | 14 ++++++++++- .../indexing/common/task/IngestionTestBase.java | 23 ++++++++++++++++++ .../AbstractParallelIndexSupervisorTaskTest.java | 24 ------------------- 6 files changed, 79 insertions(+), 31 deletions(-) diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index 4b6153fa26a..ab206c75762 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -160,8 +160,8 @@ For some task types, the indexing task can wait for the newly ingested segments |Field|Description| |---|---| -|`segmentsRead`|Number of segments read by compaction task with more than 1 subtask.| -|`segmentsPublished`|Number of segments published by compaction task with more than 1 subtask.| +|`segmentsRead`|Number of segments read by compaction task.| +|`segmentsPublished`|Number of segments published by compaction task.| ### Live report diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 49041ce4d70..5c538d4a0fc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -61,6 
+61,7 @@ import org.apache.druid.indexing.common.task.batch.partition.CompletePartitionAn import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis; import org.apache.druid.indexing.common.task.batch.partition.LinearPartitionAnalysis; import org.apache.druid.indexing.common.task.batch.partition.PartitionAnalysis; +import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.indexing.input.TaskInputSource; import org.apache.druid.indexing.overlord.sampler.InputSourceSampler; import org.apache.druid.java.util.common.IAE; @@ -540,12 +541,18 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler private void updateAndWriteCompletionReports(TaskToolbox toolbox) { - completionReports = buildIngestionStatsReport(ingestionState, errorMsg, null, null); + updateAndWriteCompletionReports(toolbox, null, null); + } + + private void updateAndWriteCompletionReports(TaskToolbox toolbox, Long segmentsRead, Long segmentsPublished) + { + completionReports = buildIngestionStatsReport(ingestionState, errorMsg, segmentsRead, segmentsPublished); if (isStandAloneTask) { toolbox.getTaskReportFileWriter().write(getId(), completionReports); } } + @Override protected Map<String, Object> getTaskCompletionUnparseableEvents() { @@ -1004,7 +1011,14 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler log.debugSegments(published.getSegments(), "Published segments"); - updateAndWriteCompletionReports(toolbox); + updateAndWriteCompletionReports( + toolbox, + // only applicable to the compaction use cases + inputSource instanceof DruidInputSource + ? 
(long) ((DruidInputSource) inputSource).getNumberOfSegmentsRead() + : null, + (long) published.getSegments().size() + ); return TaskStatus.success(getId()); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 890a7c313fa..dd1998645b3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -62,6 +62,7 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.partition.PartitionChunk; @@ -78,6 +79,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -155,6 +157,8 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI @Nullable private final TaskToolbox toolbox; + @Nullable + private Integer numSegmentsInTimeline; @JsonCreator public DruidInputSource( @@ -362,11 +366,21 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI private List<TimelineObjectHolder<String, DataSegment>> createTimeline() { + List<TimelineObjectHolder<String, DataSegment>> timeline; if (interval == null) { - return getTimelineForSegmentIds(coordinatorClient, dataSource, segmentIds); + timeline = getTimelineForSegmentIds(coordinatorClient, dataSource, segmentIds); } else { - return getTimelineForInterval(toolbox, coordinatorClient, dataSource, interval); + timeline = 
getTimelineForInterval(toolbox, coordinatorClient, dataSource, interval); + } + + Set<SegmentId> ids = new HashSet<>(); + for (TimelineObjectHolder<String, DataSegment> holder : timeline) { + for (PartitionChunk<DataSegment> chunk : holder.getObject()) { + ids.add(chunk.getObject().getId()); + } } + numSegmentsInTimeline = ids.size(); + return timeline; } @Override @@ -620,4 +634,13 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI return new ArrayList<>(timeline.values()); } + + /** + * @return Number of segments read by this input source. This value is null until + * the method {@link #fixedFormatReader} has been invoked on this input source. + */ + public int getNumberOfSegmentsRead() + { + return numSegmentsInTimeline; + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 6f6b3e8ef48..7c14bc96dff 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -42,10 +42,12 @@ import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexing.common.IngestionStatsAndErrors; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; +import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import 
org.apache.druid.indexing.common.config.TaskConfig; @@ -318,6 +320,16 @@ public class CompactionTaskRunTest extends IngestionTestBase List<String> rowsFromSegment = getCSVFormatRowsFromSegments(segments); Assert.assertEquals(TEST_ROWS, rowsFromSegment); + + List<IngestionStatsAndErrors> reports = getIngestionReports(); + Assert.assertEquals( + 3L, + reports.stream().mapToLong(IngestionStatsAndErrors::getSegmentsPublished).sum() + ); + Assert.assertEquals( + 6L, + reports.stream().mapToLong(IngestionStatsAndErrors::getSegmentsRead).sum() + ); } @Test @@ -2019,7 +2031,7 @@ public class CompactionTaskRunTest extends IngestionTestBase .taskWorkDir(temporaryFolder.newFolder()) .indexIO(getIndexIO()) .indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true))) - .taskReportFileWriter(new NoopTestTaskReportFileWriter()) + .taskReportFileWriter(new SingleFileTaskReportFileWriter(reportsFile)) .authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER) .chatHandlerProvider(new NoopChatHandlerProvider()) .rowIngestionMetersFactory(testUtils.getRowIngestionMetersFactory()) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index b82dafccc6b..2fe8790fb9c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.common.task; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; @@ -36,8 +37,11 @@ import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.RegexInputFormat; import 
org.apache.druid.data.input.impl.RegexParseSpec; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.IngestionStatsAndErrors; +import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; +import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.actions.SegmentInsertAction; @@ -97,6 +101,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.stream.Collectors; public abstract class IngestionTestBase extends InitializedNullHandlingTest { @@ -114,6 +119,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest private SegmentsMetadataManager segmentsMetadataManager; private TaskLockbox lockbox; private File baseDir; + protected File reportsFile; @Before public void setUpIngestionTestBase() throws IOException @@ -139,6 +145,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest ); lockbox = new TaskLockbox(taskStorage, storageCoordinator); segmentCacheManagerFactory = new SegmentCacheManagerFactory(getObjectMapper()); + reportsFile = temporaryFolder.newFile(); } @After @@ -502,4 +509,20 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest throw new UnsupportedOperationException(); } } + + public Map<String, TaskReport> getReports() throws IOException + { + return objectMapper.readValue(reportsFile, new TypeReference<Map<String, TaskReport>>() + { + }); + } + + public List<IngestionStatsAndErrors> getIngestionReports() throws IOException + { + return getReports().entrySet() + .stream() + .filter(entry -> entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY)) + .map(entry -> 
(IngestionStatsAndErrors) entry.getValue().getPayload()) + .collect(Collectors.toList()); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 08b0c584f76..a6a9f6ccdd7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.common.task.batch.parallel; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; @@ -50,13 +49,10 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.partitions.PartitionsSpec; -import org.apache.druid.indexing.common.IngestionStatsAndErrors; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; -import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -65,7 +61,6 @@ import org.apache.druid.indexing.common.config.TaskConfigBuilder; import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; import 
org.apache.druid.indexing.common.task.CompactionTask; import org.apache.druid.indexing.common.task.IngestionTestBase; -import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.Tasks; @@ -236,7 +231,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase private CoordinatorClient coordinatorClient; // An executor that executes API calls using a different thread from the caller thread as if they were remote calls. private ExecutorService remoteApiExecutor; - private File reportsFile; protected AbstractParallelIndexSupervisorTaskTest( double transientTaskFailureRate, @@ -262,7 +256,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase remoteApiExecutor = Execs.singleThreaded("coordinator-api-executor"); coordinatorClient = new LocalCoordinatorClient(remoteApiExecutor); prepareObjectMapper(objectMapper, getIndexIO()); - reportsFile = temporaryFolder.newFile(); } @After @@ -701,7 +694,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase .taskWorkDir(temporaryFolder.newFolder(task.getId())) .indexIO(getIndexIO()) .indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true))) - .taskReportFileWriter(new NoopTestTaskReportFileWriter()) .intermediaryDataManager(intermediaryDataManager) .taskReportFileWriter(new SingleFileTaskReportFileWriter(reportsFile)) .authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER) @@ -1066,20 +1058,4 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase throw new ISE("Can't find segment for id[%s]", segmentId); } } - - public Map<String, TaskReport> getReports() throws IOException - { - return objectMapper.readValue(reportsFile, new TypeReference<Map<String, TaskReport>>() - { - }); - } - - public 
List<IngestionStatsAndErrors> getIngestionReports() throws IOException - { - return getReports().entrySet() - .stream() - .filter(entry -> entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY)) - .map(entry -> (IngestionStatsAndErrors) entry.getValue().getPayload()) - .collect(Collectors.toList()); - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org