This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 463010bb29c Populate segment stats for non-parallel compaction jobs
(#16171)
463010bb29c is described below
commit 463010bb29cecdaad4203bedccc793171597c7d5
Author: Adithya Chakilam <[email protected]>
AuthorDate: Fri Mar 29 08:40:55 2024 -0500
Populate segment stats for non-parallel compaction jobs (#16171)
* Populate segment stats for non-parallel compaction jobs
* fix
* add-tests
* comments
* update-test
* comments
---
docs/ingestion/tasks.md | 4 ++--
.../druid/indexing/common/task/IndexTask.java | 18 +++++++++++++--
.../druid/indexing/input/DruidInputSource.java | 27 ++++++++++++++++++++--
.../common/task/CompactionTaskRunTest.java | 14 ++++++++++-
.../indexing/common/task/IngestionTestBase.java | 23 ++++++++++++++++++
.../AbstractParallelIndexSupervisorTaskTest.java | 24 -------------------
6 files changed, 79 insertions(+), 31 deletions(-)
diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index 4b6153fa26a..ab206c75762 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -160,8 +160,8 @@ For some task types, the indexing task can wait for the
newly ingested segments
|Field|Description|
|---|---|
-|`segmentsRead`|Number of segments read by compaction task with more than 1 subtask.|
-|`segmentsPublished`|Number of segments published by compaction task with more than 1 subtask.|
+|`segmentsRead`|Number of segments read by compaction task.|
+|`segmentsPublished`|Number of segments published by compaction task.|
### Live report
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 49041ce4d70..5c538d4a0fc 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -61,6 +61,7 @@ import
org.apache.druid.indexing.common.task.batch.partition.CompletePartitionAn
import
org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
import
org.apache.druid.indexing.common.task.batch.partition.LinearPartitionAnalysis;
import org.apache.druid.indexing.common.task.batch.partition.PartitionAnalysis;
+import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.indexing.input.TaskInputSource;
import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
import org.apache.druid.java.util.common.IAE;
@@ -540,12 +541,18 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
private void updateAndWriteCompletionReports(TaskToolbox toolbox)
{
- completionReports = buildIngestionStatsReport(ingestionState, errorMsg,
null, null);
+ updateAndWriteCompletionReports(toolbox, null, null);
+ }
+
+  private void updateAndWriteCompletionReports(TaskToolbox toolbox, Long segmentsRead, Long segmentsPublished)
+  {
+    completionReports = buildIngestionStatsReport(ingestionState, errorMsg, segmentsRead, segmentsPublished);
if (isStandAloneTask) {
toolbox.getTaskReportFileWriter().write(getId(), completionReports);
}
}
+
@Override
protected Map<String, Object> getTaskCompletionUnparseableEvents()
{
@@ -1004,7 +1011,14 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
log.debugSegments(published.getSegments(), "Published segments");
- updateAndWriteCompletionReports(toolbox);
+ updateAndWriteCompletionReports(
+ toolbox,
+ // only applicable to the compaction use cases
+ inputSource instanceof DruidInputSource
+ ? (long) ((DruidInputSource) inputSource).getNumberOfSegmentsRead()
+ : null,
+ (long) published.getSegments().size()
+ );
return TaskStatus.success(getId());
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index 890a7c313fa..dd1998645b3 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -62,6 +62,7 @@ import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.partition.PartitionChunk;
@@ -78,6 +79,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -155,6 +157,8 @@ public class DruidInputSource extends AbstractInputSource
implements SplittableI
@Nullable
private final TaskToolbox toolbox;
+ @Nullable
+ private Integer numSegmentsInTimeline;
@JsonCreator
public DruidInputSource(
@@ -362,11 +366,21 @@ public class DruidInputSource extends AbstractInputSource
implements SplittableI
private List<TimelineObjectHolder<String, DataSegment>> createTimeline()
{
+ List<TimelineObjectHolder<String, DataSegment>> timeline;
if (interval == null) {
- return getTimelineForSegmentIds(coordinatorClient, dataSource,
segmentIds);
+ timeline = getTimelineForSegmentIds(coordinatorClient, dataSource,
segmentIds);
} else {
- return getTimelineForInterval(toolbox, coordinatorClient, dataSource,
interval);
+ timeline = getTimelineForInterval(toolbox, coordinatorClient,
dataSource, interval);
+ }
+
+ Set<SegmentId> ids = new HashSet<>();
+ for (TimelineObjectHolder<String, DataSegment> holder : timeline) {
+ for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
+ ids.add(chunk.getObject().getId());
+ }
}
+ numSegmentsInTimeline = ids.size();
+ return timeline;
}
@Override
@@ -620,4 +634,13 @@ public class DruidInputSource extends AbstractInputSource
implements SplittableI
return new ArrayList<>(timeline.values());
}
+
+  /**
+   * @return Number of segments read by this input source. This value is null until
+   *         the method {@link #fixedFormatReader} has been invoked on this input source.
+   */
+  public int getNumberOfSegmentsRead()
+  {
+    return numSegmentsInTimeline;
+  }
 }
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 6f6b3e8ef48..7c14bc96dff 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -42,10 +42,12 @@ import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
@@ -318,6 +320,16 @@ public class CompactionTaskRunTest extends
IngestionTestBase
List<String> rowsFromSegment = getCSVFormatRowsFromSegments(segments);
Assert.assertEquals(TEST_ROWS, rowsFromSegment);
+
+ List<IngestionStatsAndErrors> reports = getIngestionReports();
+ Assert.assertEquals(
+ 3L,
+
reports.stream().mapToLong(IngestionStatsAndErrors::getSegmentsPublished).sum()
+ );
+ Assert.assertEquals(
+ 6L,
+
reports.stream().mapToLong(IngestionStatsAndErrors::getSegmentsRead).sum()
+ );
}
@Test
@@ -2019,7 +2031,7 @@ public class CompactionTaskRunTest extends
IngestionTestBase
.taskWorkDir(temporaryFolder.newFolder())
.indexIO(getIndexIO())
.indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY,
true)))
- .taskReportFileWriter(new NoopTestTaskReportFileWriter())
+ .taskReportFileWriter(new SingleFileTaskReportFileWriter(reportsFile))
.authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
.chatHandlerProvider(new NoopChatHandlerProvider())
.rowIngestionMetersFactory(testUtils.getRowIngestionMetersFactory())
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index b82dafccc6b..2fe8790fb9c 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.common.task;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
@@ -36,8 +37,11 @@ import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.RegexInputFormat;
import org.apache.druid.data.input.impl.RegexParseSpec;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
@@ -97,6 +101,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
public abstract class IngestionTestBase extends InitializedNullHandlingTest
{
@@ -114,6 +119,7 @@ public abstract class IngestionTestBase extends
InitializedNullHandlingTest
private SegmentsMetadataManager segmentsMetadataManager;
private TaskLockbox lockbox;
private File baseDir;
+ protected File reportsFile;
@Before
public void setUpIngestionTestBase() throws IOException
@@ -139,6 +145,7 @@ public abstract class IngestionTestBase extends
InitializedNullHandlingTest
);
lockbox = new TaskLockbox(taskStorage, storageCoordinator);
segmentCacheManagerFactory = new
SegmentCacheManagerFactory(getObjectMapper());
+ reportsFile = temporaryFolder.newFile();
}
@After
@@ -502,4 +509,20 @@ public abstract class IngestionTestBase extends
InitializedNullHandlingTest
throw new UnsupportedOperationException();
}
}
+
+ public Map<String, TaskReport> getReports() throws IOException
+ {
+ return objectMapper.readValue(reportsFile, new TypeReference<Map<String,
TaskReport>>()
+ {
+ });
+ }
+
+ public List<IngestionStatsAndErrors> getIngestionReports() throws IOException
+ {
+ return getReports().entrySet()
+ .stream()
+ .filter(entry ->
entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY))
+ .map(entry -> (IngestionStatsAndErrors)
entry.getValue().getPayload())
+ .collect(Collectors.toList());
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 08b0c584f76..a6a9f6ccdd7 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.common.task.batch.parallel;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
@@ -50,13 +49,10 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.indexing.common.IngestionStatsAndErrors;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
-import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -65,7 +61,6 @@ import
org.apache.druid.indexing.common.config.TaskConfigBuilder;
import
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.IngestionTestBase;
-import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
@@ -236,7 +231,6 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
private CoordinatorClient coordinatorClient;
// An executor that executes API calls using a different thread from the
caller thread as if they were remote calls.
private ExecutorService remoteApiExecutor;
- private File reportsFile;
protected AbstractParallelIndexSupervisorTaskTest(
double transientTaskFailureRate,
@@ -262,7 +256,6 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
remoteApiExecutor = Execs.singleThreaded("coordinator-api-executor");
coordinatorClient = new LocalCoordinatorClient(remoteApiExecutor);
prepareObjectMapper(objectMapper, getIndexIO());
- reportsFile = temporaryFolder.newFile();
}
@After
@@ -701,7 +694,6 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
.taskWorkDir(temporaryFolder.newFolder(task.getId()))
.indexIO(getIndexIO())
.indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY,
true)))
- .taskReportFileWriter(new NoopTestTaskReportFileWriter())
.intermediaryDataManager(intermediaryDataManager)
.taskReportFileWriter(new SingleFileTaskReportFileWriter(reportsFile))
.authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
@@ -1066,20 +1058,4 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
throw new ISE("Can't find segment for id[%s]", segmentId);
}
}
-
- public Map<String, TaskReport> getReports() throws IOException
- {
- return objectMapper.readValue(reportsFile, new TypeReference<Map<String,
TaskReport>>()
- {
- });
- }
-
- public List<IngestionStatsAndErrors> getIngestionReports() throws IOException
- {
- return getReports().entrySet()
- .stream()
- .filter(entry ->
entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY))
- .map(entry -> (IngestionStatsAndErrors)
entry.getValue().getPayload())
- .collect(Collectors.toList());
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]