This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7d307df6e96 Fix metric emission in the segment generation phase (#16146)
7d307df6e96 is described below
commit 7d307df6e96bc44b6675ef84986f4b44843f4581
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Mon Mar 18 14:38:18 2024 +0530
Fix metric emission in the segment generation phase (#16146)
Fix metric emission in the segment generation phase
---
.../task/batch/parallel/PartialSegmentGenerateTask.java | 11 ++++++-----
.../common/task/batch/parallel/SinglePhaseSubTask.java | 11 ++++++-----
2 files changed, 12 insertions(+), 10 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index 1cb3d36ad75..46be219d878 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -27,8 +27,10 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.BatchAppenderators;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.InputSourceProcessor;
@@ -40,7 +42,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskIn
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
-import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
@@ -48,7 +49,6 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
-import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
@@ -179,9 +179,10 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
- RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(
- Collections.singletonList(fireDepartmentForMetrics),
- Collections.singletonMap(DruidMetrics.TASK_ID, new String[]{getId()})
+ TaskRealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(
+ this,
+ fireDepartmentForMetrics,
+ buildSegmentsMeters
);
toolbox.addMonitor(metricsMonitor);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index bbd3f2964b6..9e0c8d80c83 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -36,10 +36,12 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.BatchAppenderators;
@@ -53,7 +55,6 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
@@ -63,7 +64,6 @@ import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
-import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
@@ -373,9 +373,10 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
new FireDepartment(dataSchema, new RealtimeIOConfig(null, null), null);
final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
- RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(
- Collections.singletonList(fireDepartmentForMetrics),
- Collections.singletonMap(DruidMetrics.TASK_ID, new String[]{getId()})
+ TaskRealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(
+ this,
+ fireDepartmentForMetrics,
+ rowIngestionMeters
);
toolbox.addMonitor(metricsMonitor);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]