This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7301e60a9c5 Add metrics for number of segments generated per task in 
MSQ (#14980)
7301e60a9c5 is described below

commit 7301e60a9c506758d719cbb57426abcbf912b4b9
Author: YongGang <[email protected]>
AuthorDate: Mon Sep 25 14:16:33 2023 -0700

    Add metrics for number of segments generated per task in MSQ (#14980)
    
    Add ingest/tombstones/count and ingest/segments/count metrics in MSQ.
---
 .../src/main/resources/defaultMetricDimensions.json              | 1 +
 .../main/java/org/apache/druid/msq/exec/ControllerContext.java   | 3 +++
 .../src/main/java/org/apache/druid/msq/exec/ControllerImpl.java  | 6 ++++++
 .../org/apache/druid/msq/indexing/IndexerControllerContext.java  | 7 +++++++
 .../java/org/apache/druid/msq/test/MSQTestControllerContext.java | 9 +++++++++
 5 files changed, 26 insertions(+)

diff --git 
a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
 
b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index b5c5f8b1ce4..2e4aca39ab5 100644
--- 
a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ 
b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -52,6 +52,7 @@
   "ingest/handoff/failed" : { "dimensions" : ["dataSource"], "type" : "count" 
},
   "ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer" },
   "ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer" },
+  "ingest/segments/count" : { "dimensions" : ["dataSource"], "type" : "count" 
},
 
   "ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" : 
"gauge" },
   "ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" : 
"gauge" },
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
index e09ac9ebd6c..0a32158cf9b 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
@@ -25,6 +25,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.server.DruidNode;
 
 import java.util.Map;
@@ -36,6 +37,8 @@ import java.util.Map;
  */
 public interface ControllerContext
 {
+  ServiceEmitter emitter();
+
   ObjectMapper jsonMapper();
 
   /**
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index f81b1e8a827..5cdd7b0ffa6 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1331,6 +1331,7 @@ public class ControllerImpl implements Controller
     final DataSourceMSQDestination destination =
         (DataSourceMSQDestination) task.getQuerySpec().getDestination();
     final Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);
+    int numTombstones = 0;
 
     if (destination.isReplaceTimeChunks()) {
       final List<Interval> intervalsToDrop = 
findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments"));
@@ -1345,6 +1346,7 @@ public class ControllerImpl implements Controller
               destination.getSegmentGranularity()
           );
           segmentsWithTombstones.addAll(tombstones);
+          numTombstones = tombstones.size();
         }
         catch (IllegalStateException e) {
           throw new MSQException(e, InsertLockPreemptedFault.instance());
@@ -1392,6 +1394,10 @@ public class ControllerImpl implements Controller
           SegmentTransactionalInsertAction.appendAction(segments, null, null)
       );
     }
+
+    task.emitMetric(context.emitter(), "ingest/tombstones/count", 
numTombstones);
+    // Include tombstones in the reported segments count
+    task.emitMetric(context.emitter(), "ingest/segments/count", 
segmentsWithTombstones.size());
   }
 
   /**
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 5d17b005b94..401d6af7072 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.msq.exec.Controller;
 import org.apache.druid.msq.exec.ControllerContext;
 import org.apache.druid.msq.exec.WorkerClient;
@@ -67,6 +68,12 @@ public class IndexerControllerContext implements 
ControllerContext
     this.workerManager = new IndexerWorkerManagerClient(overlordClient);
   }
 
+  @Override
+  public ServiceEmitter emitter()
+  {
+    return toolbox.getEmitter();
+  }
+
   @Override
   public ObjectMapper jsonMapper()
   {
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 2ee2207fd83..027d2a913b2 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.msq.exec.Controller;
 import org.apache.druid.msq.exec.ControllerContext;
 import org.apache.druid.msq.exec.Worker;
@@ -48,6 +49,7 @@ import org.apache.druid.msq.indexing.MSQWorkerTask;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
@@ -82,6 +84,7 @@ public class MSQTestControllerContext implements 
ControllerContext
   );
   private final Injector injector;
   private final ObjectMapper mapper;
+  private final ServiceEmitter emitter = new NoopServiceEmitter();
 
   private Controller controller;
   private Map<String, TaskReport> report = null;
@@ -215,6 +218,12 @@ public class MSQTestControllerContext implements 
ControllerContext
     }
   };
 
+  @Override
+  public ServiceEmitter emitter()
+  {
+    return emitter;
+  }
+
   @Override
   public ObjectMapper jsonMapper()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to