This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 463010bb29c Populate segment stats for non-parallel compaction jobs (#16171)
463010bb29c is described below

commit 463010bb29cecdaad4203bedccc793171597c7d5
Author: Adithya Chakilam <35785271+adithyachaki...@users.noreply.github.com>
AuthorDate: Fri Mar 29 08:40:55 2024 -0500

    Populate segment stats for non-parallel compaction jobs (#16171)
    
    * Populate segment stats for non-parallel compaction jobs
    
    * fix
    
    * add-tests
    
    * comments
    
    * update-test
    
    * comments
---
 docs/ingestion/tasks.md                            |  4 ++--
 .../druid/indexing/common/task/IndexTask.java      | 18 +++++++++++++--
 .../druid/indexing/input/DruidInputSource.java     | 27 ++++++++++++++++++++--
 .../common/task/CompactionTaskRunTest.java         | 14 ++++++++++-
 .../indexing/common/task/IngestionTestBase.java    | 23 ++++++++++++++++++
 .../AbstractParallelIndexSupervisorTaskTest.java   | 24 -------------------
 6 files changed, 79 insertions(+), 31 deletions(-)

diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index 4b6153fa26a..ab206c75762 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -160,8 +160,8 @@ For some task types, the indexing task can wait for the newly ingested segments
 
 |Field|Description|
 |---|---|
-|`segmentsRead`|Number of segments read by compaction task with more than 1 subtask.|
-|`segmentsPublished`|Number of segments published by compaction task with more than 1 subtask.|
+|`segmentsRead`|Number of segments read by compaction task.|
+|`segmentsPublished`|Number of segments published by compaction task.|
 
 ### Live report
 
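For context, the two counters land in the ingestionStatsAndErrors payload of the task completion report. Below is a minimal sketch (not part of this commit) of reading them back from a report file; it leans only on names that appear in the diffs further down (IngestionStatsAndErrorsTaskReport.REPORT_KEY, getSegmentsRead, getSegmentsPublished), and everything else is illustrative:

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.indexing.common.IngestionStatsAndErrors;
    import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
    import org.apache.druid.indexing.common.TaskReport;

    import java.io.File;
    import java.io.IOException;
    import java.util.Map;

    // Sketch: deserialize a report file (as written by SingleFileTaskReportFileWriter)
    // and pull out the two counters documented above.
    public class SegmentStatsReader
    {
      public static void printSegmentStats(ObjectMapper mapper, File reportsFile) throws IOException
      {
        Map<String, TaskReport> reports = mapper.readValue(
            reportsFile,
            new TypeReference<Map<String, TaskReport>>() {}
        );
        for (Map.Entry<String, TaskReport> entry : reports.entrySet()) {
          if (entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY)) {
            IngestionStatsAndErrors stats = (IngestionStatsAndErrors) entry.getValue().getPayload();
            // segmentsRead stays null for anything other than compaction.
            System.out.println("segmentsRead=" + stats.getSegmentsRead()
                               + ", segmentsPublished=" + stats.getSegmentsPublished());
          }
        }
      }
    }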
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 49041ce4d70..5c538d4a0fc 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -61,6 +61,7 @@ import org.apache.druid.indexing.common.task.batch.partition.CompletePartitionAn
 import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
 import org.apache.druid.indexing.common.task.batch.partition.LinearPartitionAnalysis;
 import org.apache.druid.indexing.common.task.batch.partition.PartitionAnalysis;
+import org.apache.druid.indexing.input.DruidInputSource;
 import org.apache.druid.indexing.input.TaskInputSource;
 import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
 import org.apache.druid.java.util.common.IAE;
@@ -540,12 +541,18 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
 
   private void updateAndWriteCompletionReports(TaskToolbox toolbox)
   {
-    completionReports = buildIngestionStatsReport(ingestionState, errorMsg, null, null);
+    updateAndWriteCompletionReports(toolbox, null, null);
+  }
+
+  private void updateAndWriteCompletionReports(TaskToolbox toolbox, Long segmentsRead, Long segmentsPublished)
+  {
+    completionReports = buildIngestionStatsReport(ingestionState, errorMsg, segmentsRead, segmentsPublished);
     if (isStandAloneTask) {
       toolbox.getTaskReportFileWriter().write(getId(), completionReports);
     }
   }
 
+
   @Override
   protected Map<String, Object> getTaskCompletionUnparseableEvents()
   {
@@ -1004,7 +1011,14 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
 
         log.debugSegments(published.getSegments(), "Published segments");
 
-        updateAndWriteCompletionReports(toolbox);
+        updateAndWriteCompletionReports(
+            toolbox,
+            // only applicable to the compaction use cases
+            inputSource instanceof DruidInputSource
+            ? (long) ((DruidInputSource) inputSource).getNumberOfSegmentsRead()
+            : null,
+            (long) published.getSegments().size()
+        );
         return TaskStatus.success(getId());
       }
     }
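In short, the IndexTask change keeps the old single-argument report writer as a delegate that passes nulls, so non-compaction code paths are untouched, while the publish path threads real counts through. A compressed, illustrative view of that flow (not committed code):

    // Non-compaction callers: both counters stay null.
    //   updateAndWriteCompletionReports(toolbox);   // delegates to (toolbox, null, null)

    // Publish path: segmentsRead is only knowable when the task reads existing
    // Druid segments, i.e. compaction running through DruidInputSource.
    Long segmentsRead = inputSource instanceof DruidInputSource
        ? (long) ((DruidInputSource) inputSource).getNumberOfSegmentsRead()
        : null;
    Long segmentsPublished = (long) published.getSegments().size();
    updateAndWriteCompletionReports(toolbox, segmentsRead, segmentsPublished);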
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index 890a7c313fa..dd1998645b3 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -62,6 +62,7 @@ import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.partition.PartitionChunk;
@@ -78,6 +79,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -155,6 +157,8 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
 
   @Nullable
   private final TaskToolbox toolbox;
+  @Nullable
+  private Integer numSegmentsInTimeline;
 
   @JsonCreator
   public DruidInputSource(
@@ -362,11 +366,21 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
 
   private List<TimelineObjectHolder<String, DataSegment>> createTimeline()
   {
+    List<TimelineObjectHolder<String, DataSegment>> timeline;
     if (interval == null) {
-      return getTimelineForSegmentIds(coordinatorClient, dataSource, segmentIds);
+      timeline = getTimelineForSegmentIds(coordinatorClient, dataSource, segmentIds);
     } else {
-      return getTimelineForInterval(toolbox, coordinatorClient, dataSource, interval);
+      timeline = getTimelineForInterval(toolbox, coordinatorClient, dataSource, interval);
+    }
+
+    Set<SegmentId> ids = new HashSet<>();
+    for (TimelineObjectHolder<String, DataSegment> holder : timeline) {
+      for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
+        ids.add(chunk.getObject().getId());
+      }
     }
+    numSegmentsInTimeline = ids.size();
+    return timeline;
   }
 
   @Override
@@ -620,4 +634,13 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
 
     return new ArrayList<>(timeline.values());
   }
+
+  /**
+   * @return Number of segments read by this input source. This value is null until
+   *         the method {@link #fixedFormatReader} has been invoked on this input source.
+   */
+  public int getNumberOfSegmentsRead()
+  {
+    return numSegmentsInTimeline;
+  }
 }
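One behavioral note on the getter above: it returns a primitive int, while the backing numSegmentsInTimeline field is a nullable Integer that is assigned only inside createTimeline(), so a call made before the input source has built its reader auto-unboxes null and throws a NullPointerException. A usage sketch under that assumption (the reader arguments are placeholders):

    // Assumed call order: building the reader triggers createTimeline(), which
    // populates numSegmentsInTimeline; only then is the getter safe to call.
    DruidInputSource inputSource = /* configured with a datasource and an interval */;
    InputSourceReader reader = inputSource.reader(inputRowSchema, inputFormat, temporaryDirectory);
    int segmentsRead = inputSource.getNumberOfSegmentsRead();  // safe only after reader(...)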
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 6f6b3e8ef48..7c14bc96dff 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -42,10 +42,12 @@ import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.config.TaskConfig;
@@ -318,6 +320,16 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     List<String> rowsFromSegment = getCSVFormatRowsFromSegments(segments);
     Assert.assertEquals(TEST_ROWS, rowsFromSegment);
+
+    List<IngestionStatsAndErrors> reports = getIngestionReports();
+    Assert.assertEquals(
+        3L,
+        reports.stream().mapToLong(IngestionStatsAndErrors::getSegmentsPublished).sum()
+    );
+    Assert.assertEquals(
+        6L,
+        reports.stream().mapToLong(IngestionStatsAndErrors::getSegmentsRead).sum()
+    );
   }
 
   @Test
@@ -2019,7 +2031,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
         .taskWorkDir(temporaryFolder.newFolder())
         .indexIO(getIndexIO())
         .indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)))
-        .taskReportFileWriter(new NoopTestTaskReportFileWriter())
+        .taskReportFileWriter(new SingleFileTaskReportFileWriter(reportsFile))
         .authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
         .chatHandlerProvider(new NoopChatHandlerProvider())
         .rowIngestionMetersFactory(testUtils.getRowIngestionMetersFactory())
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index b82dafccc6b..2fe8790fb9c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.common.task;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
@@ -36,8 +37,11 @@ import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.RegexInputFormat;
 import org.apache.druid.data.input.impl.RegexParseSpec;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.IngestionStatsAndErrors;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.actions.SegmentInsertAction;
@@ -97,6 +101,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 public abstract class IngestionTestBase extends InitializedNullHandlingTest
 {
@@ -114,6 +119,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
   private SegmentsMetadataManager segmentsMetadataManager;
   private TaskLockbox lockbox;
   private File baseDir;
+  protected File reportsFile;
 
   @Before
   public void setUpIngestionTestBase() throws IOException
@@ -139,6 +145,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
     );
     lockbox = new TaskLockbox(taskStorage, storageCoordinator);
     segmentCacheManagerFactory = new SegmentCacheManagerFactory(getObjectMapper());
+    reportsFile = temporaryFolder.newFile();
   }
 
   @After
@@ -502,4 +509,20 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
       throw new UnsupportedOperationException();
     }
   }
+
+  public Map<String, TaskReport> getReports() throws IOException
+  {
+    return objectMapper.readValue(reportsFile, new TypeReference<Map<String, TaskReport>>()
+    {
+    });
+  }
+
+  public List<IngestionStatsAndErrors> getIngestionReports() throws IOException
+  {
+    return getReports().entrySet()
+                       .stream()
+                       .filter(entry -> entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY))
+                       .map(entry -> (IngestionStatsAndErrors) entry.getValue().getPayload())
+                       .collect(Collectors.toList());
+  }
 }
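With getReports() and getIngestionReports() hoisted into IngestionTestBase, single-phase tests such as CompactionTaskRunTest can make the same assertions the parallel tests already did. A typical usage, mirroring the test added earlier in this commit:

    // Usage sketch inside a test subclass: run a task wired with the
    // SingleFileTaskReportFileWriter from setUpIngestionTestBase(), then sum the
    // counters across all reports (getSegmentsRead is non-null for compaction).
    List<IngestionStatsAndErrors> reports = getIngestionReports();
    long published = reports.stream().mapToLong(IngestionStatsAndErrors::getSegmentsPublished).sum();
    long read = reports.stream().mapToLong(IngestionStatsAndErrors::getSegmentsRead).sum();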
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 08b0c584f76..a6a9f6ccdd7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
@@ -50,13 +49,10 @@ import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.indexing.common.IngestionStatsAndErrors;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
-import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -65,7 +61,6 @@ import org.apache.druid.indexing.common.config.TaskConfigBuilder;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.CompactionTask;
 import org.apache.druid.indexing.common.task.IngestionTestBase;
-import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
@@ -236,7 +231,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
   private CoordinatorClient coordinatorClient;
   // An executor that executes API calls using a different thread from the caller thread as if they were remote calls.
   private ExecutorService remoteApiExecutor;
-  private File reportsFile;
 
   protected AbstractParallelIndexSupervisorTaskTest(
       double transientTaskFailureRate,
@@ -262,7 +256,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
     remoteApiExecutor = Execs.singleThreaded("coordinator-api-executor");
     coordinatorClient = new LocalCoordinatorClient(remoteApiExecutor);
     prepareObjectMapper(objectMapper, getIndexIO());
-    reportsFile = temporaryFolder.newFile();
   }
 
   @After
@@ -701,7 +694,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
         .taskWorkDir(temporaryFolder.newFolder(task.getId()))
         .indexIO(getIndexIO())
         .indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)))
-        .taskReportFileWriter(new NoopTestTaskReportFileWriter())
         .intermediaryDataManager(intermediaryDataManager)
         .taskReportFileWriter(new SingleFileTaskReportFileWriter(reportsFile))
         .authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
@@ -1066,20 +1058,4 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
       throw new ISE("Can't find segment for id[%s]", segmentId);
     }
   }
-
-  public Map<String, TaskReport> getReports() throws IOException
-  {
-    return objectMapper.readValue(reportsFile, new TypeReference<Map<String, TaskReport>>()
-    {
-    });
-  }
-
-  public List<IngestionStatsAndErrors> getIngestionReports() throws IOException
-  {
-    return getReports().entrySet()
-                       .stream()
-                       .filter(entry -> entry.getKey().contains(IngestionStatsAndErrorsTaskReport.REPORT_KEY))
-                       .map(entry -> (IngestionStatsAndErrors) entry.getValue().getPayload())
-                       .collect(Collectors.toList());
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
