This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7301e60a9c5 Add metrics for number of segments generated per task in
MSQ (#14980)
7301e60a9c5 is described below
commit 7301e60a9c506758d719cbb57426abcbf912b4b9
Author: YongGang <[email protected]>
AuthorDate: Mon Sep 25 14:16:33 2023 -0700
Add metrics for number of segments generated per task in MSQ (#14980)
Add ingest/tombstones/count and ingest/segments/count metrics in MSQ.
---
.../src/main/resources/defaultMetricDimensions.json | 1 +
.../main/java/org/apache/druid/msq/exec/ControllerContext.java | 3 +++
.../src/main/java/org/apache/druid/msq/exec/ControllerImpl.java | 6 ++++++
.../org/apache/druid/msq/indexing/IndexerControllerContext.java | 7 +++++++
.../java/org/apache/druid/msq/test/MSQTestControllerContext.java | 9 +++++++++
5 files changed, 26 insertions(+)
diff --git
a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index b5c5f8b1ce4..2e4aca39ab5 100644
---
a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++
b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -52,6 +52,7 @@
"ingest/handoff/failed" : { "dimensions" : ["dataSource"], "type" : "count"
},
"ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer" },
"ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer" },
+ "ingest/segments/count" : { "dimensions" : ["dataSource"], "type" : "count"
},
"ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" :
"gauge" },
"ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" :
"gauge" },
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
index e09ac9ebd6c..0a32158cf9b 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
@@ -25,6 +25,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.DruidNode;
import java.util.Map;
@@ -36,6 +37,8 @@ import java.util.Map;
*/
public interface ControllerContext
{
+ ServiceEmitter emitter();
+
ObjectMapper jsonMapper();
/**
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index f81b1e8a827..5cdd7b0ffa6 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1331,6 +1331,7 @@ public class ControllerImpl implements Controller
final DataSourceMSQDestination destination =
(DataSourceMSQDestination) task.getQuerySpec().getDestination();
final Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);
+ int numTombstones = 0;
if (destination.isReplaceTimeChunks()) {
final List<Interval> intervalsToDrop =
findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments"));
@@ -1345,6 +1346,7 @@ public class ControllerImpl implements Controller
destination.getSegmentGranularity()
);
segmentsWithTombstones.addAll(tombstones);
+ numTombstones = tombstones.size();
}
catch (IllegalStateException e) {
throw new MSQException(e, InsertLockPreemptedFault.instance());
@@ -1392,6 +1394,10 @@ public class ControllerImpl implements Controller
SegmentTransactionalInsertAction.appendAction(segments, null, null)
);
}
+
+ task.emitMetric(context.emitter(), "ingest/tombstones/count",
numTombstones);
+ // Include tombstones in the reported segments count
+ task.emitMetric(context.emitter(), "ingest/segments/count",
segmentsWithTombstones.size());
}
/**
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 5d17b005b94..401d6af7072 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.WorkerClient;
@@ -67,6 +68,12 @@ public class IndexerControllerContext implements
ControllerContext
this.workerManager = new IndexerWorkerManagerClient(overlordClient);
}
+ @Override
+ public ServiceEmitter emitter()
+ {
+ return toolbox.getEmitter();
+ }
+
@Override
public ObjectMapper jsonMapper()
{
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 2ee2207fd83..027d2a913b2 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.Worker;
@@ -48,6 +49,7 @@ import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
@@ -82,6 +84,7 @@ public class MSQTestControllerContext implements
ControllerContext
);
private final Injector injector;
private final ObjectMapper mapper;
+ private final ServiceEmitter emitter = new NoopServiceEmitter();
private Controller controller;
private Map<String, TaskReport> report = null;
@@ -215,6 +218,12 @@ public class MSQTestControllerContext implements
ControllerContext
}
};
+ @Override
+ public ServiceEmitter emitter()
+ {
+ return emitter;
+ }
+
@Override
public ObjectMapper jsonMapper()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]