This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 714ac07b52 Allow users to add additional metadata to ingestion metrics 
(#13760)
714ac07b52 is described below

commit 714ac07b524fcdc4d5a81a8f58af91ed03c78be3
Author: Suneet Saldanha <[email protected]>
AuthorDate: Wed Feb 8 18:07:23 2023 -0800

    Allow users to add additional metadata to ingestion metrics (#13760)
    
    * Allow users to add additional metadata to ingestion metrics
    
    When submitting an ingestion spec, users may pass a map of metadata
    in the ingestion spec config that will be added to ingestion metrics.
    
    This will make it possible for operators to tag metrics with other
    metadata that doesn't necessarily line up with the existing tags
    like taskId.
    
    Druid clusters that ingest these metrics can take advantage of the
    nested data columns feature to process this additional metadata.
    
    * rename to tags
    
    * docs
    
    * tests
    
    * fix test
    
    * make code cov happy
    
    * checkstyle
---
 .../util/emitter/service/ServiceMetricEvent.java   |   8 ++
 .../emitter/service/ServiceMetricEventTest.java    |  23 ++++
 docs/operations/metrics.md                         |  99 +++++++++---------
 .../common/TaskRealtimeMetricsMonitorBuilder.java  |   9 +-
 .../common/stats/TaskRealtimeMetricsMonitor.java   |   8 +-
 .../common/task/AbstractBatchIndexTask.java        |   2 +
 .../druid/indexing/common/task/IndexTaskUtils.java |  11 +-
 .../supervisor/SeekableStreamSupervisor.java       | 116 ++++++++++++---------
 .../supervisor/SeekableStreamSupervisorSpec.java   |   6 ++
 .../common/TaskRealtimeMetricsMonitorTest.java     |  97 +++++++++++++++++
 .../indexing/common/task/IndexTaskUtilsTest.java   |  82 +++++++++++++++
 .../SeekableStreamSupervisorSpecTest.java          |  97 +++++++++++++++--
 .../SeekableStreamSupervisorStateTest.java         |  15 +++
 .../java/org/apache/druid/query/DruidMetrics.java  |   2 +
 14 files changed, 466 insertions(+), 109 deletions(-)

diff --git 
a/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
 
b/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
index f40491ce70..1ba264b4e3 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
@@ -149,6 +149,14 @@ public class ServiceMetricEvent implements Event
       return this;
     }
 
+    public Builder setDimensionIfNotNull(String dim, Object value)
+    {
+      if (value != null) {
+        userDims.put(dim, value);
+      }
+      return this;
+    }
+
     public Builder setDimension(String dim, Object value)
     {
       userDims.put(dim, value);
diff --git 
a/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
 
b/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
index 5a97f76564..42f299f238 100644
--- 
a/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
+++ 
b/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
@@ -27,8 +27,10 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
 
 /**
+ *
  */
 public class ServiceMetricEventTest
 {
@@ -291,4 +293,25 @@ public class ServiceMetricEventTest
     ServiceMetricEvent.builder().build("foo", 0 / 0f);
   }
 
+  @Test
+  public void testSetDimensionIfNotNullSetsNonNullDimension()
+  {
+    Map<String, String> userDimMap = ImmutableMap.of("k1", "v1", "k2", "v2");
+    ServiceMetricEvent target = ServiceMetricEvent.builder()
+                                                  
.setDimensionIfNotNull("userDimMap", userDimMap)
+                                                  .build("foo", 1)
+                                                  .build("service", "host");
+    Assert.assertEquals(userDimMap, target.getUserDims().get("userDimMap"));
+  }
+
+  @Test
+  public void testSetDimensionIfNotNullShouldNotSetNullDimension()
+  {
+    ServiceMetricEvent target = ServiceMetricEvent.builder()
+                                                  
.setDimensionIfNotNull("userDimMap", null)
+                                                  .build("foo", 1)
+                                                  .build("service", "host");
+    Assert.assertTrue(target.getUserDims().isEmpty());
+    Assert.assertNull(target.getUserDims().get("userDimMap"));
+  }
 }
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 2d75b23e85..4d4f999b6b 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -152,11 +152,11 @@ If SQL is enabled, the Broker will emit the following 
metrics for SQL.
 
 ## General native ingestion metrics
 
-|Metric|Description|Dimensions|Normal Value|
-|------|-----------|----------|------------|
-|`ingest/count`|Count of `1` every time an ingestion job runs (includes 
compaction jobs). Aggregate using dimensions. |`dataSource`, `taskId`, 
`taskType`, `taskIngestionMode`|Always `1`.|
-|`ingest/segments/count`|Count of final segments created by job (includes 
tombstones). |`dataSource`, `taskId`, `taskType`, `taskIngestionMode`|At least 
`1`.|
-|`ingest/tombstones/count`|Count of tombstones created by job. |`dataSource`, 
`taskId`, `taskType`, `taskIngestionMode`|Zero or more for replace. Always zero 
for non-replace tasks (always zero for legacy replace, see below).|
+|Metric|Description| Dimensions                                              
|Normal Value|
+|------|-----------|---------------------------------------------------------|------------|
+|`ingest/count`|Count of `1` every time an ingestion job runs (includes 
compaction jobs). Aggregate using dimensions. | `dataSource`, `taskId`, 
`taskType`, `taskIngestionMode`, `tags` |Always `1`.|
+|`ingest/segments/count`|Count of final segments created by job (includes 
tombstones). | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` 
|At least `1`.|
+|`ingest/tombstones/count`|Count of tombstones created by job. | `dataSource`, 
`taskId`, `taskType`, `taskIngestionMode`, `tags` |Zero or more for replace. 
Always zero for non-replace tasks (always zero for legacy replace, see below).|
 
 The `taskIngestionMode` dimension includes the following modes: 
 * `APPEND`: a native ingestion job appending to existing segments 
@@ -167,12 +167,15 @@ The mode is decided using the values
 of the `isAppendToExisting` and `isDropExisting` flags in the
 task's `IOConfig` as follows:
 
-|`isAppendToExisting` | `isDropExisting` | mode |
-|---------------------|-------------------|------|
-`true` | `false` | `APPEND`|
-`true` | `true  ` | Invalid combination, exception thrown. |
-`false` | `false` | `REPLACE_LEGACY` (this is the default for native batch 
ingestion). |
-`false` | `true` | `REPLACE`|
+| `isAppendToExisting` | `isDropExisting` | mode |
+|----------------------|-------------------|------|
+| `true`               | `false` | `APPEND`|
+| `true`               | `true  ` | Invalid combination, exception thrown. |
+| `false`              | `false` | `REPLACE_LEGACY` (this is the default for 
native batch ingestion). |
+| `false`              | `true` | `REPLACE`|
+
+The `tags` dimension is reported only for metrics emitted from ingestion tasks 
whose ingestion spec sets the `tags`
+field in its `context`. `tags` is expected to be a 
map of string to object.
 
 ### Ingestion metrics for Kafka
 
@@ -180,10 +183,10 @@ These metrics apply to the [Kafka indexing 
service](../development/extensions-co
 
 |Metric|Description|Dimensions|Normal Value|
 |------|-----------|----------|------------|
-|`ingest/kafka/lag`|Total lag between the offsets consumed by the Kafka 
indexing tasks and latest offsets in Kafka brokers across all partitions. 
Minimum emission period for this metric is a minute.|`dataSource`, 
`stream`|Greater than 0, should not be a very high number. |
-|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka 
indexing tasks and latest offsets in Kafka brokers across all partitions. 
Minimum emission period for this metric is a minute.|`dataSource`, 
`stream`|Greater than 0, should not be a very high number. |
-|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka 
indexing tasks and latest offsets in Kafka brokers across all partitions. 
Minimum emission period for this metric is a minute.|`dataSource`, 
`stream`|Greater than 0, should not be a very high number. |
-|`ingest/kafka/partitionLag`|Partition-wise lag between the offsets consumed 
by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum 
emission period for this metric is a minute.|`dataSource`, `stream`, 
`partition`|Greater than 0, should not be a very high number. |
+|`ingest/kafka/lag`|Total lag between the offsets consumed by the Kafka 
indexing tasks and latest offsets in Kafka brokers across all partitions. 
Minimum emission period for this metric is a minute.|`dataSource`, `stream`, 
`tags`|Greater than 0, should not be a very high number. |
+|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka 
indexing tasks and latest offsets in Kafka brokers across all partitions. 
Minimum emission period for this metric is a minute.|`dataSource`, `stream`, 
`tags`|Greater than 0, should not be a very high number. |
+|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka 
indexing tasks and latest offsets in Kafka brokers across all partitions. 
Minimum emission period for this metric is a minute.|`dataSource`, `stream`, 
`tags`|Greater than 0, should not be a very high number. |
+|`ingest/kafka/partitionLag`|Partition-wise lag between the offsets consumed 
by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum 
emission period for this metric is a minute.|`dataSource`, `stream`, 
`partition`, `tags`|Greater than 0, should not be a very high number. |
 
 ### Ingestion metrics for Kinesis
 
@@ -191,10 +194,10 @@ These metrics apply to the [Kinesis indexing 
service](../development/extensions-
 
 |Metric|Description|Dimensions|Normal Value|
 |------|-----------|----------|------------|
-|`ingest/kinesis/lag/time`|Total lag time in milliseconds between the current 
message sequence number consumed by the Kinesis indexing tasks and latest 
sequence number in Kinesis across all shards. Minimum emission period for this 
metric is a minute.|`dataSource`, `stream`|Greater than 0, up to max Kinesis 
retention period in milliseconds. |
-|`ingest/kinesis/maxLag/time`|Max lag time in milliseconds between the current 
message sequence number consumed by the Kinesis indexing tasks and latest 
sequence number in Kinesis across all shards. Minimum emission period for this 
metric is a minute.|`dataSource`, `stream`|Greater than 0, up to max Kinesis 
retention period in milliseconds. |
-|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the 
current message sequence number consumed by the Kinesis indexing tasks and 
latest sequence number in Kinesis across all shards. Minimum emission period 
for this metric is a minute.|`dataSource`, `stream`|Greater than 0, up to max 
Kinesis retention period in milliseconds. |
-|`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds 
between the current message sequence number consumed by the Kinesis indexing 
tasks and latest sequence number in Kinesis. Minimum emission period for this 
metric is a minute.|`dataSource`, `stream`, `partition`|Greater than 0, up to 
max Kinesis retention period in milliseconds. |
+|`ingest/kinesis/lag/time`|Total lag time in milliseconds between the current 
message sequence number consumed by the Kinesis indexing tasks and latest 
sequence number in Kinesis across all shards. Minimum emission period for this 
metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up to max 
Kinesis retention period in milliseconds. |
+|`ingest/kinesis/maxLag/time`|Max lag time in milliseconds between the current 
message sequence number consumed by the Kinesis indexing tasks and latest 
sequence number in Kinesis across all shards. Minimum emission period for this 
metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up to max 
Kinesis retention period in milliseconds. |
+|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the 
current message sequence number consumed by the Kinesis indexing tasks and 
latest sequence number in Kinesis across all shards. Minimum emission period 
for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up 
to max Kinesis retention period in milliseconds. |
+|`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds 
between the current message sequence number consumed by the Kinesis indexing 
tasks and latest sequence number in Kinesis. Minimum emission period for this 
metric is a minute.|`dataSource`, `stream`, `partition`, `tags`|Greater than 0, 
up to max Kinesis retention period in milliseconds. |
 
 ### Other ingestion metrics
 
@@ -203,26 +206,26 @@ batch ingestion emit the following metrics. These metrics 
are deltas for each em
 
 |Metric|Description|Dimensions|Normal Value|
 |------|-----------|----------|------------|
-|`ingest/events/thrownAway`|Number of events rejected because they are either 
null, or filtered by the transform spec, or outside the 
windowPeriod.|`dataSource`, `taskId`, `taskType`|0|
-|`ingest/events/unparseable`|Number of events rejected because the events are 
unparseable.|`dataSource`, `taskId`, `taskType`|0|
-|`ingest/events/duplicate`|Number of events rejected because the events are 
duplicated.|`dataSource`, `taskId`, `taskType`|0|
-|`ingest/events/processed`|Number of events successfully processed per 
emission period.|`dataSource`, `taskId`, `taskType`|Equal to the number of 
events per emission period.|
+|`ingest/events/thrownAway`|Number of events rejected because they are either 
null, or filtered by the transform spec, or outside the 
windowPeriod.|`dataSource`, `taskId`, `taskType`, `tags`|0|
+|`ingest/events/unparseable`|Number of events rejected because the events are 
unparseable.|`dataSource`, `taskId`, `taskType`, `tags`|0|
+|`ingest/events/duplicate`|Number of events rejected because the events are 
duplicated.|`dataSource`, `taskId`, `taskType`, `tags`|0|
+|`ingest/events/processed`|Number of events successfully processed per 
emission period.|`dataSource`, `taskId`, `taskType`, `tags`|Equal to the number 
of events per emission period.|
 |`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, 
`taskType`|Your number of events with rollup.|
-|`ingest/persists/count`|Number of times persist occurred.|`dataSource`, 
`taskId`, `taskType`|Depends on configuration.|
-|`ingest/persists/time`|Milliseconds spent doing intermediate 
persist.|`dataSource`, `taskId`, `taskType`|Depends on configuration. Generally 
a few minutes at most.|
-|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate 
persist.|`dataSource`, `taskId`, `taskType`|Depends on configuration. Generally 
a few minutes at most.|
-|`ingest/persists/backPressure`|Milliseconds spent creating persist tasks and 
blocking waiting for them to finish.|`dataSource`, `taskId`, `taskType`|0 or 
very low|
-|`ingest/persists/failed`|Number of persists that failed.|`dataSource`, 
`taskId`, `taskType`|0|
-|`ingest/handoff/failed`|Number of handoffs that failed.|`dataSource`, 
`taskId`, `taskType`|0|
-|`ingest/merge/time`|Milliseconds spent merging intermediate 
segments.|`dataSource`, `taskId`, `taskType`|Depends on configuration. 
Generally a few minutes at most.|
-|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate 
segments.|`dataSource`, `taskId`, `taskType`|Depends on configuration. 
Generally a few minutes at most.|
-|`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, 
`taskId`, `taskType`|Varies. Generally greater than 0 once every segment 
granular period if cluster operating normally.|
-|`ingest/sink/count`|Number of sinks not handoffed.|`dataSource`, `taskId`, 
`taskType`|1~3|
-|`ingest/events/messageGap`|Time gap in milliseconds between the latest 
ingested event timestamp and the current system timestamp of metrics emission. 
|`dataSource`, `taskId`, `taskType`|Greater than 0, depends on the time carried 
in event. |
-|`ingest/notices/queueSize`|Number of pending notices to be processed by the 
coordinator.|`dataSource`|Typically 0 and occasionally in lower single digits. 
Should not be a very high number. |
-|`ingest/notices/time`|Milliseconds taken to process a notice by the 
supervisor.|`dataSource`| < 1s |
-|`ingest/pause/time`|Milliseconds spent by a task in a paused state without 
ingesting.|`dataSource`, `taskId`| < 10 seconds|
-|`ingest/handoff/time`|Total time taken for each set of segments handed 
off.|`dataSource`, `taskId`, `taskType`|Depends on coordinator cycle time.|
+|`ingest/persists/count`|Number of times persist occurred.|`dataSource`, 
`taskId`, `taskType`, `tags`|Depends on configuration.|
+|`ingest/persists/time`|Milliseconds spent doing intermediate 
persist.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. 
Generally a few minutes at most.|
+|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate 
persist.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. 
Generally a few minutes at most.|
+|`ingest/persists/backPressure`|Milliseconds spent creating persist tasks and 
blocking waiting for them to finish.|`dataSource`, `taskId`, `taskType`, 
`tags`|0 or very low|
+|`ingest/persists/failed`|Number of persists that failed.|`dataSource`, 
`taskId`, `taskType`, `tags`|0|
+|`ingest/handoff/failed`|Number of handoffs that failed.|`dataSource`, 
`taskId`, `taskType`, `tags`|0|
+|`ingest/merge/time`|Milliseconds spent merging intermediate 
segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. 
Generally a few minutes at most.|
+|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate 
segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. 
Generally a few minutes at most.|
+|`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, 
`taskId`, `taskType`, `tags`|Varies. Generally greater than 0 once every 
segment granular period if cluster operating normally.|
+|`ingest/sink/count`|Number of sinks not handoffed.|`dataSource`, `taskId`, 
`taskType`, `tags`|1~3|
+|`ingest/events/messageGap`|Time gap in milliseconds between the latest 
ingested event timestamp and the current system timestamp of metrics emission. 
|`dataSource`, `taskId`, `taskType`, `tags`|Greater than 0, depends on the time 
carried in event. |
+|`ingest/notices/queueSize`|Number of pending notices to be processed by the 
coordinator.|`dataSource`, `tags`|Typically 0 and occasionally in lower single 
digits. Should not be a very high number. |
+|`ingest/notices/time`|Milliseconds taken to process a notice by the 
supervisor.|`dataSource`, `tags`| < 1s |
+|`ingest/pause/time`|Milliseconds spent by a task in a paused state without 
ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
+|`ingest/handoff/time`|Total time taken for each set of segments handed 
off.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on coordinator cycle 
time.|
 
 Note: If the JVM does not support CPU time measurement for the current thread, 
`ingest/merge/cpu` and `ingest/persists/cpu` will be 0.
 
@@ -230,19 +233,20 @@ Note: If the JVM does not support CPU time measurement 
for the current thread, `
 
 |Metric|Description| Dimensions                                                
 |Normal Value|
 
|------|-----------|------------------------------------------------------------|------------|
-|`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, 
`taskType`, `taskStatus`|Varies|
-|`task/pending/time`|Milliseconds taken for a task to wait for running.| 
`dataSource`, `taskId`, `taskType`|Varies|
-|`task/action/log/time`|Milliseconds taken to log a task action to the audit 
log.| `dataSource`, `taskId`, `taskType`, `taskActionType`|< 1000 (subsecond)|
-|`task/action/run/time`|Milliseconds taken to execute a task action.| 
`dataSource`, `taskId`, `taskType`, `taskActionType`|Varies from subsecond to a 
few seconds, based on action type.|
-|`task/action/success/count`|Number of task actions that were executed 
successfully during the emission period. Currently only being emitted for 
[batched `segmentAllocate` 
actions](../ingestion/tasks.md#batching-segmentallocate-actions).| 
`dataSource`, `taskId`, `taskType`, `taskActionType`|Varies|
-|`task/action/failed/count`|Number of task actions that failed during the 
emission period. Currently only being emitted for [batched `segmentAllocate` 
actions](../ingestion/tasks.md#batching-segmentallocate-actions).| 
`dataSource`, `taskId`, `taskType`, `taskActionType`|Varies|
+|`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, 
`taskType`, `taskStatus`, `tags`|Varies|
+|`task/pending/time`|Milliseconds taken for a task to wait for running.| 
`dataSource`, `taskId`, `taskType`, `tags`|Varies|
+|`task/action/log/time`|Milliseconds taken to log a task action to the audit 
log.| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|< 1000 
(subsecond)|
+|`task/action/run/time`|Milliseconds taken to execute a task action.| 
`dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies from 
subsecond to a few seconds, based on action type.|
+|`task/action/success/count`|Number of task actions that were executed 
successfully during the emission period. Currently only being emitted for 
[batched `segmentAllocate` 
actions](../ingestion/tasks.md#batching-segmentallocate-actions).| 
`dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies|
+|`task/action/failed/count`|Number of task actions that failed during the 
emission period. Currently only being emitted for [batched `segmentAllocate` 
actions](../ingestion/tasks.md#batching-segmentallocate-actions).| 
`dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies|
 |`task/action/batch/queueTime`|Milliseconds spent by a batch of task actions 
in queue. Currently only being emitted for [batched `segmentAllocate` 
actions](../ingestion/tasks.md#batching-segmentallocate-actions).| 
`dataSource`, `taskActionType`, `interval`|Varies based on the 
`batchAllocationWaitTime` and number of batches in queue.|
 |`task/action/batch/runTime`|Milliseconds taken to execute a batch of task 
actions. Currently only being emitted for [batched `segmentAllocate` 
actions](../ingestion/tasks.md#batching-segmentallocate-actions).| 
`dataSource`, `taskActionType`, `interval`|Varies from subsecond to a few 
seconds, based on action type and batch size.|
 |`task/action/batch/size`|Number of task actions in a batch that was executed 
during the emission period. Currently only being emitted for [batched 
`segmentAllocate` 
actions](../ingestion/tasks.md#batching-segmentallocate-actions).| 
`dataSource`, `taskActionType`, `interval`|Varies based on number of concurrent 
task actions.|
 |`task/action/batch/attempts`|Number of execution attempts for a single batch 
of task actions. Currently only being emitted for [batched `segmentAllocate` 
actions](../ingestion/tasks.md#batching-segmentallocate-actions).| 
`dataSource`, `taskActionType`, `interval`|1 if there are no failures or 
retries.|
-|`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, 
`taskId`, `taskType`, `interval`|Varies|
-|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move 
Task.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
-|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| 
`dataSource`, `taskId`, `taskType`, `interval`|Varies|
+|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch 
indexing task waited for newly created segments to become available for 
querying.| `dataSource`, `taskType`, `taskId`, `segmentAvailabilityConfirmed`, 
`tags`|Varies|
+|`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, 
`taskId`, `taskType`, `interval`, `tags`|Varies|
+|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move 
Task.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies|
+|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| 
`dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies|
 |`task/success/count`|Number of successful tasks per emission period. This 
metric is only available if the TaskCountStatsMonitor module is included.| 
`dataSource`|Varies|
 |`task/failed/count`|Number of failed tasks per emission period. This metric 
is only available if the TaskCountStatsMonitor module is 
included.|`dataSource`|Varies|
 |`task/running/count`|Number of current running tasks. This metric is only 
available if the `TaskCountStatsMonitor` module is 
included.|`dataSource`|Varies|
@@ -253,7 +257,6 @@ Note: If the JVM does not support CPU time measurement for 
the current thread, `
 |`taskSlot/used/count`|Number of busy task slots per emission period. This 
metric is only available if the `TaskSlotCountStatsMonitor` module is 
included.| `category`|Varies|
 |`taskSlot/lazy/count`|Number of total task slots in lazy marked 
MiddleManagers and Indexers per emission period. This metric is only available 
if the `TaskSlotCountStatsMonitor` module is included.| `category`|Varies|
 |`taskSlot/blacklisted/count`|Number of total task slots in blacklisted 
MiddleManagers and Indexers per emission period. This metric is only available 
if the `TaskSlotCountStatsMonitor` module is included.| `category`|Varies|
-|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch 
indexing task waited for newly created segments to become available for 
querying.| `dataSource`, `taskType`, `taskId`, `segmentAvailabilityConfirmed` 
|Varies|
 |`worker/task/failed/count`|Number of failed tasks run on the reporting worker 
per emission period. This metric is only available if the 
`WorkerTaskCountStatsMonitor` module is included, and is only supported for 
middleManager nodes.| `category`, `workerVersion`|Varies|
 |`worker/task/success/count`|Number of successful tasks run on the reporting 
worker per emission period. This metric is only available if the 
`WorkerTaskCountStatsMonitor` module is included, and is only supported for 
middleManager nodes.| `category`,`workerVersion`|Varies|
 |`worker/taskSlot/idle/count`|Number of idle task slots on the reporting 
worker per emission period. This metric is only available if the 
`WorkerTaskCountStatsMonitor` module is included, and is only supported for 
middleManager nodes.| `category`, `workerVersion`|Varies|
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
index 03452ac355..a07ad4eaad 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
@@ -45,7 +45,11 @@ public class TaskRealtimeMetricsMonitorBuilder
     );
   }
 
-  public static TaskRealtimeMetricsMonitor build(Task task, FireDepartment 
fireDepartment, RowIngestionMeters meters)
+  public static TaskRealtimeMetricsMonitor build(
+      Task task,
+      FireDepartment fireDepartment,
+      RowIngestionMeters meters
+  )
   {
     return new TaskRealtimeMetricsMonitor(
         fireDepartment,
@@ -53,7 +57,8 @@ public class TaskRealtimeMetricsMonitorBuilder
         ImmutableMap.of(
             DruidMetrics.TASK_ID, new String[]{task.getId()},
             DruidMetrics.TASK_TYPE, new String[]{task.getType()}
-        )
+            ),
+        task.getContextValue(DruidMetrics.TAGS)
     );
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
index 91c842a05f..f708bf95d8 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
@@ -31,6 +31,7 @@ import 
org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 
+import javax.annotation.Nullable;
 import java.util.Map;
 
 /**
@@ -46,6 +47,8 @@ public class TaskRealtimeMetricsMonitor extends 
AbstractMonitor
   private final FireDepartment fireDepartment;
   private final RowIngestionMeters rowIngestionMeters;
   private final Map<String, String[]> dimensions;
+  @Nullable
+  private final Map<String, Object> metricTags;
 
   private FireDepartmentMetrics previousFireDepartmentMetrics;
   private RowIngestionMetersTotals previousRowIngestionMetersTotals;
@@ -53,12 +56,14 @@ public class TaskRealtimeMetricsMonitor extends 
AbstractMonitor
   public TaskRealtimeMetricsMonitor(
       FireDepartment fireDepartment,
       RowIngestionMeters rowIngestionMeters,
-      Map<String, String[]> dimensions
+      Map<String, String[]> dimensions,
+      @Nullable Map<String, Object> metricTags
   )
   {
     this.fireDepartment = fireDepartment;
     this.rowIngestionMeters = rowIngestionMeters;
     this.dimensions = ImmutableMap.copyOf(dimensions);
+    this.metricTags = metricTags;
     previousFireDepartmentMetrics = new FireDepartmentMetrics();
     previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 
0, 0);
   }
@@ -80,6 +85,7 @@ public class TaskRealtimeMetricsMonitor extends 
AbstractMonitor
           thrownAway
       );
     }
+    builder.setDimensionIfNotNull(DruidMetrics.TAGS, metricTags);
     emitter.emit(builder.build("ingest/events/thrownAway", thrownAway));
 
     final long unparseable = rowIngestionMetersTotals.getUnparseable()
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 6e6e12f2ad..2f19c1a5a0 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -60,6 +60,7 @@ import 
org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -691,6 +692,7 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
               .setDimension("dataSource", getDataSource())
               .setDimension("taskType", getType())
               .setDimension("taskId", getId())
+              .setDimensionIfNotNull(DruidMetrics.TAGS, 
getContextValue(DruidMetrics.TAGS))
               .setDimension("segmentAvailabilityConfirmed", 
segmentAvailabilityConfirmationCompleted)
               .build("task/segmentAvailability/wait/time", 
segmentAvailabilityWaitTimeMs)
       );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
index 44b838f69a..05be8c1941 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
@@ -40,6 +40,7 @@ import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 public class IndexTaskUtils
 {
@@ -112,6 +113,10 @@ public class IndexTaskUtils
     metricBuilder.setDimension(DruidMetrics.TASK_ID, task.getId());
     metricBuilder.setDimension(DruidMetrics.TASK_TYPE, task.getType());
     metricBuilder.setDimension(DruidMetrics.DATASOURCE, task.getDataSource());
+    metricBuilder.setDimensionIfNotNull(
+        DruidMetrics.TAGS,
+        task.<Map<String, Object>>getContextValue(DruidMetrics.TAGS)
+    );
   }
 
   public static void setTaskDimensions(final ServiceMetricEvent.Builder 
metricBuilder, final AbstractTask task)
@@ -119,7 +124,11 @@ public class IndexTaskUtils
     metricBuilder.setDimension(DruidMetrics.TASK_ID, task.getId());
     metricBuilder.setDimension(DruidMetrics.TASK_TYPE, task.getType());
     metricBuilder.setDimension(DruidMetrics.DATASOURCE, task.getDataSource());
-    metricBuilder.setDimension(DruidMetrics.TASK_INGESTION_MODE, 
((AbstractTask) task).getIngestionMode());
+    metricBuilder.setDimension(DruidMetrics.TASK_INGESTION_MODE, 
task.getIngestionMode());
+    metricBuilder.setDimensionIfNotNull(
+        DruidMetrics.TAGS,
+        task.<Map<String, Object>>getContextValue(DruidMetrics.TAGS)
+    );
   }
 
   public static void setTaskStatusDimensions(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 68cf3bf796..0ae9aad963 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -143,7 +143,8 @@ import java.util.stream.Stream;
  * @param <PartitionIdType>    the type of the partition id, for example, 
partitions in Kafka are int type while partitions in Kinesis are String type
  * @param <SequenceOffsetType> the type of the sequence number or offsets, for 
example, Kafka uses long offsets while Kinesis uses String sequence numbers
  */
-public abstract class SeekableStreamSupervisor<PartitionIdType, 
SequenceOffsetType, RecordType extends ByteEntity> implements Supervisor
+public abstract class SeekableStreamSupervisor<PartitionIdType, 
SequenceOffsetType, RecordType extends ByteEntity>
+    implements Supervisor
 {
   public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
 
@@ -433,27 +434,31 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         try {
           long nowTime = System.currentTimeMillis();
           if (spec.isSuspended()) {
-            log.info("Skipping DynamicAllocationTasksNotice execution because 
[%s] supervisor is suspended",
-                    dataSource
+            log.info(
+                "Skipping DynamicAllocationTasksNotice execution because [%s] 
supervisor is suspended",
+                dataSource
             );
             return;
           }
           log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]", 
pendingCompletionTaskGroups,
-                  dataSource
+                    dataSource
           );
           for (CopyOnWriteArrayList<TaskGroup> list : 
pendingCompletionTaskGroups.values()) {
             if (!list.isEmpty()) {
               log.info(
-                      "Skipping DynamicAllocationTasksNotice execution for 
datasource [%s] because following tasks are pending [%s]",
-                      dataSource, pendingCompletionTaskGroups
+                  "Skipping DynamicAllocationTasksNotice execution for 
datasource [%s] because following tasks are pending [%s]",
+                  dataSource,
+                  pendingCompletionTaskGroups
               );
               return;
             }
           }
           if (nowTime - dynamicTriggerLastRunTime < 
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
             log.info(
-                    "DynamicAllocationTasksNotice submitted again in [%d] 
millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
-                    nowTime - dynamicTriggerLastRunTime, 
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), dataSource
+                "DynamicAllocationTasksNotice submitted again in [%d] millis, 
minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
+                nowTime - dynamicTriggerLastRunTime,
+                autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(),
+                dataSource
             );
             return;
           }
@@ -479,18 +484,20 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   /**
    * This method determines how to do scale actions based on collected lag 
points.
    * If scale action is triggered :
-   *    First of all, call gracefulShutdownInternal() which will change the 
state of current datasource ingest tasks from reading to publishing.
-   *    Secondly, clear all the stateful data structures: 
activelyReadingTaskGroups, partitionGroups, partitionOffsets, 
pendingCompletionTaskGroups, partitionIds. These structures will be rebuiled in 
the next 'RunNotice'.
-   *    Finally, change the taskCount in SeekableStreamSupervisorIOConfig and 
sync it to MetadataStorage.
+   * First of all, call gracefulShutdownInternal() which will change the state 
of current datasource ingest tasks from reading to publishing.
+   * Secondly, clear all the stateful data structures: 
activelyReadingTaskGroups, partitionGroups, partitionOffsets, 
pendingCompletionTaskGroups, partitionIds. These structures will be rebuilt in 
the next 'RunNotice'.
+   * Finally, change the taskCount in SeekableStreamSupervisorIOConfig and 
sync it to MetadataStorage.
    * After the taskCount is changed in SeekableStreamSupervisorIOConfig, next 
RunNotice will create scaled number of ingest tasks without resubmitting the 
supervisor.
+   *
    * @param desiredActiveTaskCount desired taskCount computed from AutoScaler
    * @return Boolean flag indicating if scale action was executed or not. If 
true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 
'changeTaskCount'.
-   *         If false, it will do 'changeTaskCount' again after 
'scaleActionPeriodMillis' millis.
+   * If false, it will do 'changeTaskCount' again after 
'scaleActionPeriodMillis' millis.
    * @throws InterruptedException
    * @throws ExecutionException
    * @throws TimeoutException
    */
-  private boolean changeTaskCount(int desiredActiveTaskCount) throws 
InterruptedException, ExecutionException, TimeoutException
+  private boolean changeTaskCount(int desiredActiveTaskCount)
+      throws InterruptedException, ExecutionException, TimeoutException
   {
     int currentActiveTaskCount;
     Collection<TaskGroup> activeTaskGroups = 
activelyReadingTaskGroups.values();
@@ -500,8 +507,10 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       return false;
     } else {
       log.info(
-              "Starting scale action, current active task count is [%d] and 
desired task count is [%d] for dataSource [%s].",
-              currentActiveTaskCount, desiredActiveTaskCount, dataSource
+          "Starting scale action, current active task count is [%d] and 
desired task count is [%d] for dataSource [%s].",
+          currentActiveTaskCount,
+          desiredActiveTaskCount,
+          dataSource
       );
       gracefulShutdownInternal();
       changeTaskCountInIOConfig(desiredActiveTaskCount);
@@ -796,14 +805,14 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       log.info("Running Task autoscaler for datasource [%s]", dataSource);
 
       workerThreads = (this.tuningConfig.getWorkerThreads() != null
-              ? this.tuningConfig.getWorkerThreads()
-              : Math.min(10, autoScalerConfig.getTaskCountMax()));
+                       ? this.tuningConfig.getWorkerThreads()
+                       : Math.min(10, autoScalerConfig.getTaskCountMax()));
 
       maxNumTasks = autoScalerConfig.getTaskCountMax() * 
this.ioConfig.getReplicas();
     } else {
       workerThreads = (this.tuningConfig.getWorkerThreads() != null
-              ? this.tuningConfig.getWorkerThreads()
-              : Math.min(10, this.ioConfig.getTaskCount()));
+                       ? this.tuningConfig.getWorkerThreads()
+                       : Math.min(10, this.ioConfig.getTaskCount()));
 
       maxNumTasks = this.ioConfig.getTaskCount() * this.ioConfig.getReplicas();
     }
@@ -1246,7 +1255,6 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * Collect row ingestion stats from all tasks managed by this supervisor.
    *
    * @return A map of groupId->taskId->task row stats
-   *
    * @throws InterruptedException
    * @throws ExecutionException
    * @throws TimeoutException
@@ -1321,7 +1329,6 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * Collect parse errors from all tasks managed by this supervisor.
    *
    * @return A list of parse error strings
-   *
    * @throws InterruptedException
    * @throws ExecutionException
    * @throws TimeoutException
@@ -1975,12 +1982,12 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   /**
    * Returns a Pair of information about a task:
-   *
+   * <p>
    * Left-hand side: Status of the task from {@link 
SeekableStreamIndexTaskClient#getStatusAsync}.
-   *
+   * <p>
    * Right-hand side: If status is {@link 
SeekableStreamIndexTaskRunner.Status#PUBLISHING}, end offsets from
    * {@link SeekableStreamIndexTaskClient#getEndOffsetsAsync}. Otherwise, null.
-   *
+   * <p>
    * Used by {@link #discoverTasks()}.
    */
   private ListenableFuture<Pair<SeekableStreamIndexTaskRunner.Status, 
Map<PartitionIdType, SequenceOffsetType>>> getStatusAndPossiblyEndOffsets(
@@ -2049,14 +2056,18 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                   log.info("Resumed task [%s] in first supervisor run.", 
taskId);
                 } else {
                   log.warn("Failed to resume task [%s] in first supervisor 
run.", taskId);
-                  killTask(taskId,
-                           "Killing forcefully as task could not be resumed in 
the first supervisor run after Overlord change.");
+                  killTask(
+                      taskId,
+                      "Killing forcefully as task could not be resumed in the 
first supervisor run after Overlord change."
+                  );
                 }
               }
               catch (Exception e) {
                 log.warn(e, "Failed to resume task [%s] in first supervisor 
run.", taskId);
-                killTask(taskId,
-                         "Killing forcefully as task could not be resumed in 
the first supervisor run after Overlord change.");
+                killTask(
+                    taskId,
+                    "Killing forcefully as task could not be resumed in the 
first supervisor run after Overlord change."
+                );
               }
             }
           },
@@ -2341,8 +2352,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * Determines whether a given task was created by the current version of the 
supervisor.
    * Uses the Task object mapped to this taskId in the {@code activeTaskMap}.
    * If not found in the map, fetch it from the metadata store.
-   * @param taskGroupId task group id
-   * @param taskId task id
+   *
+   * @param taskGroupId   task group id
+   * @param taskId        task id
    * @param activeTaskMap Set of active tasks that were pre-fetched
    * @return true if the task was created by the current supervisor
    */
@@ -2628,8 +2640,11 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     recordSupplierLock.lock();
     try {
       final Set<StreamPartition<PartitionIdType>> partitions = 
partitionIds.stream()
-                                         .map(partitionId -> new 
StreamPartition<>(ioConfig.getStream(), partitionId))
-                                         .collect(Collectors.toSet());
+                                                                           
.map(partitionId -> new StreamPartition<>(
+                                                                               
ioConfig.getStream(),
+                                                                               
partitionId
+                                                                           ))
+                                                                           
.collect(Collectors.toSet());
       if (!recordSupplier.getAssignment().containsAll(partitions)) {
         recordSupplier.assign(partitions);
         try {
@@ -2740,7 +2755,6 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * by this method.
    *
    * @param availablePartitions
-   *
    * @return a remapped copy of partitionGroups, containing only the 
partitions in availablePartitions
    */
   protected Map<Integer, Set<PartitionIdType>> 
recomputePartitionGroupsForExpiration(
@@ -2757,7 +2771,6 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    *
    * @param currentMetadata     The current DataSourceMetadata from metadata 
storage
    * @param expiredPartitionIds The set of expired partition IDs.
-   *
    * @return currentMetadata but with any expired partitions removed.
    */
   protected SeekableStreamDataSourceMetadata<PartitionIdType, 
SequenceOffsetType> createDataSourceMetadataWithExpiredPartitions(
@@ -3396,7 +3409,6 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * should be removed from the starting offsets sent to the tasks.
    *
    * @param startingOffsets
-   *
    * @return startingOffsets with entries for expired partitions removed
    */
   protected Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> 
filterExpiredPartitionsFromStartingOffsets(
@@ -3821,7 +3833,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                 pendingCompletionTaskGroups
                     .values()
                     .stream()
-                    .flatMap(taskGroups -> 
taskGroups.stream().flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()))
+                    .flatMap(taskGroups -> taskGroups.stream()
+                                                     .flatMap(taskGroup -> 
taskGroup.tasks.entrySet().stream()))
                     .flatMap(taskData -> 
taskData.getValue().currentSequences.entrySet().stream())
             ).collect(Collectors.toMap(
                 Entry::getKey,
@@ -3829,7 +3842,10 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                 (v1, v2) -> 
makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
             ));
 
-        partitionIds.forEach(partitionId -> 
currentOffsets.putIfAbsent(partitionId, 
offsetsFromMetadataStorage.get(partitionId)));
+        partitionIds.forEach(partitionId -> currentOffsets.putIfAbsent(
+            partitionId,
+            offsetsFromMetadataStorage.get(partitionId)
+        ));
         return currentOffsets;
       }
     }
@@ -3935,6 +3951,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   /**
    * Get all active tasks from metadata storage
+   *
    * @return map from taskId to Task
    */
   private Map<String, Task> getActiveTaskMap()
@@ -3969,7 +3986,6 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * the given replicas count
    *
    * @return list of specific kafka/kinesis index tasks
-   *
    * @throws JsonProcessingException
    */
   protected abstract List<SeekableStreamIndexTask<PartitionIdType, 
SequenceOffsetType, RecordType>> createIndexTasks(
@@ -3987,7 +4003,6 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * different between Kafka/Kinesis since Kinesis uses String as partition id
    *
    * @param partition partition id
-   *
    * @return taskgroup id
    */
   protected abstract int getTaskGroupIdForPartition(PartitionIdType partition);
@@ -3997,7 +4012,6 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * of [kafka/kinesis]DataSourceMetadata
    *
    * @param metadata datasource metadata
-   *
    * @return true if isInstance else false
    */
   protected abstract boolean checkSourceMetadataMatch(DataSourceMetadata 
metadata);
@@ -4007,7 +4021,6 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * [Kafka/Kinesis]IndexTask
    *
    * @param task task
-   *
    * @return true if isInstance else false
    */
   protected abstract boolean doesTaskTypeMatchSupervisor(Task task);
@@ -4017,7 +4030,6 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    *
    * @param stream stream name
    * @param map    partitionId -> sequence
-   *
    * @return specific instance of datasource metadata
    */
   protected abstract SeekableStreamDataSourceMetadata<PartitionIdType, 
SequenceOffsetType> createDataSourceMetaDataForReset(
@@ -4152,9 +4164,10 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     try {
       emitter.emit(
           ServiceMetricEvent.builder()
-              .setDimension("noticeType", noticeType)
-              .setDimension("dataSource", dataSource)
-              .build("ingest/notices/time", timeInMillis)
+                            .setDimension("noticeType", noticeType)
+                            .setDimension("dataSource", dataSource)
+                            .setDimensionIfNotNull(DruidMetrics.TAGS, 
spec.getContextValue(DruidMetrics.TAGS))
+                            .build("ingest/notices/time", timeInMillis)
       );
     }
     catch (Exception e) {
@@ -4171,8 +4184,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     try {
       emitter.emit(
           ServiceMetricEvent.builder()
-              .setDimension("dataSource", dataSource)
-              .build("ingest/notices/queueSize", getNoticesQueueSize())
+                            .setDimension("dataSource", dataSource)
+                            .setDimensionIfNotNull(DruidMetrics.TAGS, 
spec.getContextValue(DruidMetrics.TAGS))
+                            .build("ingest/notices/queueSize", 
getNoticesQueueSize())
       );
     }
     catch (Exception e) {
@@ -4207,12 +4221,14 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         }
 
         LagStats lagStats = computeLags(partitionLags);
+        Map<String, Object> metricTags = 
spec.getContextValue(DruidMetrics.TAGS);
         for (Map.Entry<PartitionIdType, Long> entry : 
partitionLags.entrySet()) {
           emitter.emit(
               ServiceMetricEvent.builder()
                                 .setDimension(DruidMetrics.DATASOURCE, 
dataSource)
                                 .setDimension(DruidMetrics.STREAM, 
getIoConfig().getStream())
                                 .setDimension(DruidMetrics.PARTITION, 
entry.getKey())
+                                .setDimensionIfNotNull(DruidMetrics.TAGS, 
metricTags)
                                 .build(
                                     
StringUtils.format("ingest/%s/partitionLag%s", type, suffix),
                                     entry.getValue()
@@ -4223,18 +4239,21 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
             ServiceMetricEvent.builder()
                               .setDimension(DruidMetrics.DATASOURCE, 
dataSource)
                               .setDimension(DruidMetrics.STREAM, 
getIoConfig().getStream())
+                              .setDimensionIfNotNull(DruidMetrics.TAGS, 
metricTags)
                               .build(StringUtils.format("ingest/%s/lag%s", 
type, suffix), lagStats.getTotalLag())
         );
         emitter.emit(
             ServiceMetricEvent.builder()
                               .setDimension(DruidMetrics.DATASOURCE, 
dataSource)
                               .setDimension(DruidMetrics.STREAM, 
getIoConfig().getStream())
+                              .setDimensionIfNotNull(DruidMetrics.TAGS, 
metricTags)
                               .build(StringUtils.format("ingest/%s/maxLag%s", 
type, suffix), lagStats.getMaxLag())
         );
         emitter.emit(
             ServiceMetricEvent.builder()
                               .setDimension(DruidMetrics.DATASOURCE, 
dataSource)
                               .setDimension(DruidMetrics.STREAM, 
getIoConfig().getStream())
+                              .setDimensionIfNotNull(DruidMetrics.TAGS, 
metricTags)
                               .build(StringUtils.format("ingest/%s/avgLag%s", 
type, suffix), lagStats.getAvgLag())
         );
       };
@@ -4250,7 +4269,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
 
   /**
-   *  This method computes maxLag, totalLag and avgLag
+   * This method computes maxLag, totalLag and avgLag
+   *
    * @param partitionLags lags per partition
    */
   protected LagStats computeLags(Map<PartitionIdType, Long> partitionLags)
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 0ac57f8df7..90b0b70563 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -132,6 +132,12 @@ public abstract class SeekableStreamSupervisorSpec 
implements SupervisorSpec
     return context;
   }
 
+  /**
+   * Looks up a single entry of this spec's context.
+   *
+   * The cast to the caller's expected type is unchecked, so a value of a
+   * different type only fails later, where the caller uses it.
+   *
+   * @param key context key to look up
+   * @return whatever the context holds for {@code key}, or null when this
+   *         spec has no context at all
+   */
+  @Nullable
+  public <ContextValueType> ContextValueType getContextValue(String key)
+  {
+    if (context == null) {
+      return null;
+    }
+    return (ContextValueType) context.get(key);
+  }
+
   public ServiceEmitter getEmitter()
   {
     return emitter;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
new file mode 100644
index 0000000000..b50e68d13c
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.realtime.FireDepartment;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Checks that {@link TaskRealtimeMetricsMonitor} puts the user-provided tags
+ * under {@link DruidMetrics#TAGS} on the events it emits, and leaves that
+ * dimension out when no tags were given.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TaskRealtimeMetricsMonitorTest
+{
+  private static final Map<String, String[]> DIMENSIONS = ImmutableMap.of(
+      "dim1",
+      new String[]{"v1", "v2"},
+      "dim2",
+      new String[]{"vv"}
+  );
+
+  private static final Map<String, Object> TAGS = ImmutableMap.of("author", 
"Author Name", "version", 10);
+
+  @Mock(answer = Answers.RETURNS_MOCKS)
+  private FireDepartment fireDepartment;
+  @Mock(answer = Answers.RETURNS_MOCKS)
+  private RowIngestionMeters rowIngestionMeters;
+  @Mock
+  private ServiceEmitter emitter;
+  // Events captured from the mocked emitter, keyed by metric name.
+  private Map<String, ServiceMetricEvent> emittedEvents;
+  private TaskRealtimeMetricsMonitor target;
+
+  @Before
+  public void setUp()
+  {
+    emittedEvents = new HashMap<>();
+    // The builder overload runs its real code (presumably it delegates to
+    // emit(Event) -- confirm against ServiceEmitter).
+    
Mockito.doCallRealMethod().when(emitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+    // Record every emitted event instead of sending it anywhere.
+    Mockito
+        .doAnswer(invocation -> {
+          ServiceMetricEvent e = invocation.getArgument(0);
+          emittedEvents.put(e.getMetric(), e);
+          return null;
+        })
+        .when(emitter).emit(ArgumentMatchers.any(Event.class));
+    target = new TaskRealtimeMetricsMonitor(fireDepartment, 
rowIngestionMeters, DIMENSIONS, TAGS);
+  }
+
+  @Test
+  public void testdoMonitorShouldEmitUserProvidedTags()
+  {
+    target.doMonitor(emitter);
+    for (ServiceMetricEvent sme : emittedEvents.values()) {
+      Assert.assertEquals(TAGS, sme.getUserDims().get(DruidMetrics.TAGS));
+    }
+  }
+
+  @Test
+  public void testdoMonitorWithoutTagsShouldNotEmitTags()
+  {
+    target = new TaskRealtimeMetricsMonitor(fireDepartment, 
rowIngestionMeters, DIMENSIONS, null);
+    // Run the monitor so events are actually captured; without this call
+    // emittedEvents stays empty and the loop below asserts nothing.
+    target.doMonitor(emitter);
+    for (ServiceMetricEvent sme : emittedEvents.values()) {
+      Assert.assertFalse(sme.getUserDims().containsKey(DruidMetrics.TAGS));
+    }
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskUtilsTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskUtilsTest.java
new file mode 100644
index 0000000000..8543f893fd
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskUtilsTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Map;
+
+/**
+ * Checks that both {@code IndexTaskUtils.setTaskDimensions} overloads (for
+ * {@link Task} and for {@link AbstractTask}) copy the tags found in the task
+ * context onto the metric builder, and set nothing when the context has no
+ * tags.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class IndexTaskUtilsTest
+{
+  private static final Map<String, Object> METRIC_TAGS = ImmutableMap.of("k1", 
"v1", "k2", 20);
+  @Mock
+  private Task task;
+  @Mock
+  private AbstractTask abstractTask;
+  private ServiceMetricEvent.Builder metricBuilder;
+
+  @Before
+  public void setUp()
+  {
+    metricBuilder = ServiceMetricEvent.builder();
+    // By default both mocks report METRIC_TAGS as their context tags; the
+    // "WithoutTags" tests override this with null.
+    
Mockito.when(task.getContextValue(DruidMetrics.TAGS)).thenReturn(METRIC_TAGS);
+    
Mockito.when(abstractTask.getContextValue(DruidMetrics.TAGS)).thenReturn(METRIC_TAGS);
+  }
+
+  @Test
+  public void testSetTaskDimensionsWithContextTagsShouldSetTags()
+  {
+    IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+    Assert.assertEquals(METRIC_TAGS, 
metricBuilder.getDimension(DruidMetrics.TAGS));
+  }
+
+  @Test
+  public void 
testSetTaskDimensionsForAbstractTaskWithContextTagsShouldSetTags()
+  {
+    IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask);
+    Assert.assertEquals(METRIC_TAGS, 
metricBuilder.getDimension(DruidMetrics.TAGS));
+  }
+
+  @Test
+  public void testSetTaskDimensionsWithoutTagsShouldNotSetTags()
+  {
+    // Replace the default stub: this task has no tags in its context.
+    Mockito.when(task.getContextValue(DruidMetrics.TAGS)).thenReturn(null);
+    IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+    Assert.assertNull(metricBuilder.getDimension(DruidMetrics.TAGS));
+  }
+
+  @Test
+  public void testSetTaskDimensionsForAbstractTaskWithoutTagsShouldNotSetTags()
+  {
+    // Replace the default stub: this task has no tags in its context.
+    
Mockito.when(abstractTask.getContextValue(DruidMetrics.TAGS)).thenReturn(null);
+    IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask);
+    Assert.assertNull(metricBuilder.getDimension(DruidMetrics.TAGS));
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index b449867546..52e1600585 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -674,14 +674,15 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
     EasyMock.replay(ingestionSchema);
 
     EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
-            .andReturn(mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis",
-                                                           "1",
-                                                           "enableTaskAutoScaler",
-                                                           true,
-                                                           "taskCountMax",
-                                                           "4",
-                                                           "taskCountMin",
-                                                           "1"
+            .andReturn(mapper.convertValue(ImmutableMap.of(
+                "lagCollectionIntervalMillis",
+                "1",
+                "enableTaskAutoScaler",
+                true,
+                "taskCountMax",
+                "4",
+                "taskCountMin",
+                "1"
             ), AutoScalerConfig.class))
             .anyTimes();
     EasyMock.replay(seekableStreamSupervisorIOConfig);
@@ -931,7 +932,8 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
         null,
         null,
         new IdleConfig(true, null)
-    ){
+    )
+    {
     };
 
     EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
@@ -981,6 +983,83 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
     Assert.assertTrue(Objects.requireNonNull(spec.getIoConfig().getIdleConfig()).isEnabled());
   }
 
+  @Test
+  public void testGetContextVauleWithNullContextShouldReturnNull()
+  {
+    mockIngestionSchema();
+    TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        null,
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    );
+    Assert.assertNull(spec.getContextValue("key"));
+  }
+
+  @Test
+  public void testGetContextVauleForNonExistentKeyShouldReturnNull()
+  {
+    mockIngestionSchema();
+    TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        ImmutableMap.of("key", "value"),
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    );
+    Assert.assertNull(spec.getContextValue("key_not_exists"));
+  }
+
+  @Test
+  public void testGetContextVauleForKeyShouldReturnValue()
+  {
+    mockIngestionSchema();
+    TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        ImmutableMap.of("key", "value"),
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    );
+    Assert.assertEquals("value", spec.getContextValue("key"));
+  }
+
+  private void mockIngestionSchema()
+  {
+    EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+  }
+
   private static DataSchema getDataSchema()
   {
     List<DimensionSchema> dimensions = new ArrayList<>();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 61260221bb..d622984b0b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -68,6 +68,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
@@ -110,6 +111,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
   private static final String SHARD_ID = "0";
   private static final StreamPartition<String> SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID);
   private static final String EXCEPTION_MSG = "I had an exception";
+  private static final Map<String, Object> METRIC_TAGS = ImmutableMap.of("k1", "v1", "k2", 20);
 
   private TaskStorage taskStorage;
   private TaskMaster taskMaster;
@@ -151,6 +153,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig()).anyTimes();
     EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
 
     EasyMock.expect(taskClientFactory.build(
         EasyMock.anyString(),
@@ -792,16 +795,22 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     Assert.assertEquals(6, events.size());
     Assert.assertEquals("ingest/test/lag", events.get(0).toMap().get("metric"));
     Assert.assertEquals(850L, events.get(0).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(0).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("ingest/test/maxLag", events.get(1).toMap().get("metric"));
     Assert.assertEquals(500L, events.get(1).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(1).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("ingest/test/avgLag", events.get(2).toMap().get("metric"));
     Assert.assertEquals(283L, events.get(2).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(2).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("ingest/test/lag/time", events.get(3).toMap().get("metric"));
     Assert.assertEquals(45000L, events.get(3).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(3).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("ingest/test/maxLag/time", events.get(4).toMap().get("metric"));
     Assert.assertEquals(20000L, events.get(4).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(4).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("ingest/test/avgLag/time", events.get(5).toMap().get("metric"));
     Assert.assertEquals(15000L, events.get(5).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(5).toMap().get(DruidMetrics.TAGS));
     verifyAll();
   }
 
@@ -872,10 +881,13 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     Assert.assertEquals(3, events.size());
     Assert.assertEquals("ingest/test/lag/time", events.get(0).toMap().get("metric"));
     Assert.assertEquals(45000L, events.get(0).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(0).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("ingest/test/maxLag/time", events.get(1).toMap().get("metric"));
     Assert.assertEquals(20000L, events.get(1).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(1).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("ingest/test/avgLag/time", events.get(2).toMap().get("metric"));
     Assert.assertEquals(15000L, events.get(2).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(2).toMap().get(DruidMetrics.TAGS));
     verifyAll();
   }
 
@@ -909,6 +921,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     Assert.assertEquals(1, events.size());
     Assert.assertEquals("ingest/notices/queueSize", events.get(0).toMap().get("metric"));
     Assert.assertEquals(0, events.get(0).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(0).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
     verifyAll();
   }
@@ -936,6 +949,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     events = filterMetrics(events, whitelist);
     Assert.assertEquals(1, events.size());
     Assert.assertEquals("ingest/notices/time", events.get(0).toMap().get("metric"));
+    Assert.assertEquals(METRIC_TAGS, events.get(0).toMap().get(DruidMetrics.TAGS));
     Assert.assertTrue(String.valueOf(events.get(0).toMap().get("value")), (long) events.get(0).toMap().get("value") > 0);
     Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
     Assert.assertEquals("run_notice", events.get(0).toMap().get("noticeType"));
@@ -1063,6 +1077,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     }).anyTimes();
     EasyMock.expect(spec.isSuspended()).andReturn(suspended).anyTimes();
     EasyMock.expect(spec.getType()).andReturn("test").anyTimes();
+    EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
 
     EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
     EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
index b3858258e1..50482ce7f4 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
@@ -52,6 +52,8 @@ public class DruidMetrics
 
   public static final String PARTITION = "partition";
 
+  public static final String TAGS = "tags";
+
   public static int findNumComplexAggs(List<AggregatorFactory> aggs)
   {
     int retVal = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to