This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d307df6e96 Fix metric emission in the segment generation phase 
(#16146)
7d307df6e96 is described below

commit 7d307df6e96bc44b6675ef84986f4b44843f4581
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Mon Mar 18 14:38:18 2024 +0530

    Fix metric emission in the segment generation phase (#16146)
    
    Fix metric emission in the segment generation phase
---
 .../task/batch/parallel/PartialSegmentGenerateTask.java       | 11 ++++++-----
 .../common/task/batch/parallel/SinglePhaseSubTask.java        | 11 ++++++-----
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index 1cb3d36ad75..46be219d878 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -27,8 +27,10 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
 import org.apache.druid.indexing.common.task.BatchAppenderators;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.InputSourceProcessor;
@@ -40,7 +42,6 @@ import 
org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskIn
 import org.apache.druid.indexing.firehose.WindowedSegmentId;
 import org.apache.druid.indexing.input.DruidInputSource;
 import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
-import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
@@ -48,7 +49,6 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
-import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
 import org.apache.druid.segment.realtime.appenderator.Appenderator;
 import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
 import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
@@ -179,9 +179,10 @@ abstract class PartialSegmentGenerateTask<T extends 
GeneratedPartitionsReport> e
     final FireDepartmentMetrics fireDepartmentMetrics = 
fireDepartmentForMetrics.getMetrics();
     buildSegmentsMeters = 
toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
 
-    RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(
-        Collections.singletonList(fireDepartmentForMetrics),
-        Collections.singletonMap(DruidMetrics.TASK_ID, new String[]{getId()})
+    TaskRealtimeMetricsMonitor metricsMonitor = 
TaskRealtimeMetricsMonitorBuilder.build(
+        this,
+        fireDepartmentForMetrics,
+        buildSegmentsMeters
     );
     toolbox.addMonitor(metricsMonitor);
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index bbd3f2964b6..9e0c8d80c83 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -36,10 +36,12 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.BatchAppenderators;
@@ -53,7 +55,6 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
@@ -63,7 +64,6 @@ import 
org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
-import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
 import org.apache.druid.segment.realtime.appenderator.Appenderator;
 import 
org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
 import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
@@ -373,9 +373,10 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
         new FireDepartment(dataSchema, new RealtimeIOConfig(null, null), null);
     final FireDepartmentMetrics fireDepartmentMetrics = 
fireDepartmentForMetrics.getMetrics();
 
-    RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(
-        Collections.singletonList(fireDepartmentForMetrics),
-        Collections.singletonMap(DruidMetrics.TASK_ID, new String[]{getId()})
+    TaskRealtimeMetricsMonitor metricsMonitor = 
TaskRealtimeMetricsMonitorBuilder.build(
+        this,
+        fireDepartmentForMetrics,
+        rowIngestionMeters
     );
     toolbox.addMonitor(metricsMonitor);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to