This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b065c60f496 KAFKA-19734 Adding application tag to clientstate metric 
(#20766)
b065c60f496 is described below

commit b065c60f49629aaccaab0b3bf38c415c35685ef3
Author: Genseric Ghiro <[email protected]>
AuthorDate: Mon Oct 27 15:58:47 2025 -0400

    KAFKA-19734 Adding application tag to clientstate metric (#20766)
    
    ## Summary
    
    As a follow-on to the improvements introduced in
    [KIP-1091](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1091%3A+Improved+Kafka+Streams+operator+metrics),
    it would be useful to add the application-id as a tag to the
    `client-state` metric. This allows Kafka Streams developers and
    operators to connect metrics containing a `thread-id` (which embeds the
    `application-id`) across separate deployments of Kafka Streams
    instances, which are members of the same logical application.
    
    Reviewers: Bill Bejeck <[email protected]>, Matthias Sax <[email protected]>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |   1 +
 .../internals/metrics/StreamsMetricsImpl.java      |   5 +
 ...KStreamSessionWindowAggregateProcessorTest.java |   2 +-
 .../processor/internals/ActiveTaskCreatorTest.java |   2 +-
 .../internals/DefaultStateUpdaterTest.java         |   2 +-
 .../internals/GlobalStreamThreadTest.java          |   6 +-
 .../processor/internals/MockStreamsMetrics.java    |   2 +-
 .../processor/internals/ProcessorNodeTest.java     |   6 +-
 .../processor/internals/RecordQueueTest.java       |   2 +-
 .../processor/internals/SourceNodeTest.java        |   2 +-
 .../processor/internals/StandbyTaskTest.java       |   2 +-
 .../processor/internals/StateDirectoryTest.java    |   2 +-
 .../processor/internals/StreamTaskTest.java        |   8 +-
 .../processor/internals/StreamThreadTest.java      |  43 ++++---
 .../internals/metrics/StreamsMetricsImplTest.java  | 129 ++++++++++++++-------
 ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java |   8 +-
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |   8 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java       |   2 +-
 .../internals/GlobalStateStoreProviderTest.java    |   2 +-
 .../state/internals/KeyValueSegmentTest.java       |   2 +-
 .../state/internals/MeteredKeyValueStoreTest.java  |   2 +-
 .../state/internals/MeteredSessionStoreTest.java   |   2 +-
 .../MeteredTimestampedKeyValueStoreTest.java       |   2 +-
 .../MeteredTimestampedWindowStoreTest.java         |   4 +-
 .../MeteredVersionedKeyValueStoreTest.java         |   2 +-
 .../state/internals/MeteredWindowStoreTest.java    |   2 +-
 .../streams/state/internals/RocksDBStoreTest.java  |   6 +-
 .../RocksDBTimeOrderedKeyValueBufferTest.java      |   2 +-
 .../state/internals/TimestampedSegmentTest.java    |   2 +-
 .../metrics/RocksDBMetricsRecorderGaugesTest.java  |   6 +-
 .../metrics/RocksDBMetricsRecorderTest.java        |   2 +-
 .../kafka/test/InternalMockProcessorContext.java   |  10 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |   1 +
 .../streams/processor/MockProcessorContext.java    |   1 +
 .../processor/api/MockProcessorContext.java        |   1 +
 .../kafka/streams/MockProcessorContextTest.java    |   1 +
 36 files changed, 172 insertions(+), 110 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 69012f0c313..40b6fad5a3a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -983,6 +983,7 @@ public class KafkaStreams implements AutoCloseable {
             metrics,
             clientId,
             processId.toString(),
+            applicationId,
             time
         );
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index a0999a36c60..c56f079b064 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -90,6 +90,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     private final Map<Sensor, Sensor> parentSensors;
     private final String clientId;
     private final String processId;
+    private final String applicationId;
 
     private final Version version;
     private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
@@ -118,6 +119,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 
     public static final String CLIENT_ID_TAG = "client-id";
     public static final String PROCESS_ID_TAG = "process-id";
+    public static final String APPLICATION_ID_TAG = "application-id";
     public static final String THREAD_ID_TAG = "thread-id";
     public static final String TASK_ID_TAG = "task-id";
     public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
@@ -168,11 +170,13 @@ public class StreamsMetricsImpl implements StreamsMetrics 
{
     public StreamsMetricsImpl(final Metrics metrics,
                               final String clientId,
                               final String processId,
+                              final String applicationId,
                               final Time time) {
         Objects.requireNonNull(metrics, "Metrics cannot be null");
         this.metrics = metrics;
         this.clientId = clientId;
         this.processId = processId;
+        this.applicationId = applicationId;
         version = Version.LATEST;
         rocksDBMetricsRecordingTrigger = new 
RocksDBMetricsRecordingTrigger(time);
 
@@ -284,6 +288,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         final Map<String, String> tagMap = new LinkedHashMap<>();
         tagMap.put(CLIENT_ID_TAG, clientId);
         tagMap.put(PROCESS_ID_TAG, processId);
+        tagMap.put(APPLICATION_ID_TAG, applicationId);
         return tagMap;
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index dd3960c17ec..c2aaf7f7373 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     private final MockTime time = new MockTime();
     private final Metrics metrics = new Metrics();
-    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "test", "processId", time);
+    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time);
     private final String threadId = Thread.currentThread().getName();
     private final Initializer<Long> initializer = () -> 0L;
     private final Aggregator<String, String, Long> aggregator = (aggKey, 
value, aggregate) -> aggregate + 1;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 362c32592ca..2d8375939d6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest {
     private ChangelogReader changeLogReader;
 
     private final MockClientSupplier mockClientSupplier = new 
MockClientSupplier();
-    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(new Metrics(), "clientId", "processId", new MockTime());
+    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(new Metrics(), "clientId", "processId", "applicationId", new 
MockTime());
     private final Map<String, Object> properties = mkMap(
         mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
         mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index b6d41966257..cd10f2a7703 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -106,7 +106,7 @@ class DefaultStateUpdaterTest {
 
     // need an auto-tick timer to work for draining with timeout
     private final Time time = new MockTime(1L);
-    private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new 
Metrics(time), "", "", time);
+    private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new 
Metrics(time), "", "", "", time);
     private final StreamsConfig config = new 
StreamsConfig(configProps(COMMIT_INTERVAL));
     private final ChangelogReader changelogReader = 
mock(ChangelogReader.class);
     private final TopologyMetadata topologyMetadata = 
unnamedTopology().build();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index ff428828e05..6ec219a8cd9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -132,7 +132,7 @@ public class GlobalStreamThreadTest {
             mockConsumer,
             new StateDirectory(config, time, true, false),
             0,
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
time),
+            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
"applicationId", time),
             time,
             "clientId",
             stateRestoreListener,
@@ -169,7 +169,7 @@ public class GlobalStreamThreadTest {
             mockConsumer,
             new StateDirectory(config, time, true, false),
             0,
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
time),
+            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
"applicationId", time),
             time,
             "clientId",
             stateRestoreListener,
@@ -418,7 +418,7 @@ public class GlobalStreamThreadTest {
                 consumer,
                 new StateDirectory(config, time, true, false),
                 0,
-                new StreamsMetricsImpl(new Metrics(), "test-client", 
"processId", time),
+                new StreamsMetricsImpl(new Metrics(), "test-client", 
"processId", "applicationId", time),
                 time,
                 "clientId",
                 stateRestoreListener,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
index a2e6820f901..54238d8bd6b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
@@ -23,6 +23,6 @@ import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 public class MockStreamsMetrics extends StreamsMetricsImpl {
 
     public MockStreamsMetrics(final Metrics metrics) {
-        super(metrics, "test", "processId", new MockTime());
+        super(metrics, "test", "processId", "applicationId", new MockTime());
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 6b7c06580c0..ccd47e60d9f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -279,7 +279,7 @@ public class ProcessorNodeTest {
     public void testMetricsWithBuiltInMetricsVersionLatest() {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-client", "processId", new 
MockTime());
+            new StreamsMetricsImpl(metrics, "test-client", "processId", 
"applicationId", new MockTime());
         final InternalMockProcessorContext<Object, Object> context = new 
InternalMockProcessorContext<>(streamsMetrics);
         final ProcessorNode<Object, Object, Object, Object> node =
             new ProcessorNode<>(NAME, new NoOpProcessor(), 
Collections.emptySet());
@@ -363,7 +363,7 @@ public class ProcessorNodeTest {
     public void testTopologyLevelClassCastExceptionDirect() {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-client", "processId", new 
MockTime());
+            new StreamsMetricsImpl(metrics, "test-client", "processId", 
"applicationId", new MockTime());
         final InternalMockProcessorContext<Object, Object> context = new 
InternalMockProcessorContext<>(streamsMetrics);
         final ProcessorNode<Object, Object, Object, Object> node =
             new ProcessorNode<>("pname", new ClassCastProcessor(), 
Collections.emptySet());
@@ -441,7 +441,7 @@ public class ProcessorNodeTest {
         final InternalProcessorContext<Object, Object> 
internalProcessorContext = mock(InternalProcessorContext.class, 
withSettings().strictness(Strictness.LENIENT));
 
         when(internalProcessorContext.taskId()).thenReturn(TASK_ID);
-        when(internalProcessorContext.metrics()).thenReturn(new 
StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()));
+        when(internalProcessorContext.metrics()).thenReturn(new 
StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", 
new MockTime()));
         when(internalProcessorContext.topic()).thenReturn(TOPIC);
         when(internalProcessorContext.partition()).thenReturn(PARTITION);
         when(internalProcessorContext.offset()).thenReturn(OFFSET);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 1dd19fb9cf7..9cd4e5fbc63 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -72,7 +72,7 @@ public class RecordQueueTest {
 
     private final Metrics metrics = new Metrics();
     private final StreamsMetricsImpl streamsMetrics =
-        new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime());
+        new StreamsMetricsImpl(metrics, "mock", "processId", "applicationId", 
new MockTime());
 
     final InternalMockProcessorContext<Integer, Integer> context = new 
InternalMockProcessorContext<>(
         StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
index a509d14c974..d10b2800dc9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -97,7 +97,7 @@ public class SourceNodeTest {
     public void shouldExposeProcessMetrics() {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-client", "processId", new 
MockTime());
+            new StreamsMetricsImpl(metrics, "test-client", "processId", 
"applicationId", new MockTime());
         final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(streamsMetrics);
         final SourceNode<String, String> node =
             new SourceNode<>(context.currentNode().name(), new 
TheDeserializer(), new TheDeserializer());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 6be27187f0f..df7f83d05fe 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -113,7 +113,7 @@ public class StandbyTaskTest {
 
     private final MockTime time = new MockTime();
     private final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
-    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, threadName, "processId", time);
+    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, threadName, "processId", "applicationId", time);
 
     private File baseDir;
     private StreamsConfig config;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 5447b6f3976..a303e827a38 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -987,7 +987,7 @@ public class StateDirectoryTest {
         
Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology);
         
Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig());
 
-        directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new 
Metrics(), "test", "processId", time), new LogContext("test"));
+        directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new 
Metrics(), "test", "processId", "applicationId", time), new LogContext("test"));
 
         return store;
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 385719530b9..71fd00c77ce 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -2604,7 +2604,7 @@ public class StreamTaskTest {
                 streamsMetrics,
                 null
         );
-        final StreamsMetricsImpl metrics = new 
StreamsMetricsImpl(this.metrics, "test", "processId", time);
+        final StreamsMetricsImpl metrics = new 
StreamsMetricsImpl(this.metrics, "test", "processId", "applicationId", time);
 
         // The processor topology is missing the topics
         final ProcessorTopology topology = withSources(emptyList(), mkMap());
@@ -3238,7 +3238,7 @@ public class StreamTaskTest {
             topology,
             consumer,
             new TopologyConfig(null,  config, new 
Properties()).getTaskConfig(),
-            new StreamsMetricsImpl(metrics, "test", "processId", time),
+            new StreamsMetricsImpl(metrics, "test", "processId", 
"applicationId", time),
             stateDirectory,
             cache,
             time,
@@ -3275,7 +3275,7 @@ public class StreamTaskTest {
             topology,
             consumer,
             new TopologyConfig(null,  config, new 
Properties()).getTaskConfig(),
-            new StreamsMetricsImpl(metrics, "test", "processId", time),
+            new StreamsMetricsImpl(metrics, "test", "processId", 
"applicationId", time),
             stateDirectory,
             cache,
             time,
@@ -3311,7 +3311,7 @@ public class StreamTaskTest {
             topology,
             consumer,
             new TopologyConfig(null,  config, new 
Properties()).getTaskConfig(),
-            new StreamsMetricsImpl(metrics, "test", "processId", time),
+            new StreamsMetricsImpl(metrics, "test", "processId", 
"applicationId", time),
             stateDirectory,
             cache,
             time,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 66b90ffcc03..99453a8d2a0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -325,6 +325,7 @@ public class StreamThreadTest {
             metrics,
             APPLICATION_ID,
             PROCESS_ID.toString(),
+            APPLICATION_ID,
             time
         );
 
@@ -766,6 +767,7 @@ public class StreamThreadTest {
             metrics,
             APPLICATION_ID,
             PROCESS_ID.toString(),
+            APPLICATION_ID,
             mockTime
         );
 
@@ -831,6 +833,7 @@ public class StreamThreadTest {
             metrics,
             APPLICATION_ID,
             PROCESS_ID.toString(),
+            APPLICATION_ID,
             mockTime
         );
 
@@ -1196,7 +1199,7 @@ public class StreamThreadTest {
 
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
APPLICATION_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         stateDirectory = new StateDirectory(config, mockTime, true, false);
@@ -1473,7 +1476,7 @@ public class StreamThreadTest {
         }
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
APPLICATION_ID, mockTime);
 
         final Properties props = configProps(false, stateUpdaterEnabled, 
processingThreadsEnabled);
         final StreamsConfig config = new StreamsConfig(props);
@@ -1945,6 +1948,7 @@ public class StreamThreadTest {
             metrics,
             APPLICATION_ID,
             PROCESS_ID.toString(),
+            APPLICATION_ID,
             mockTime
         );
 
@@ -2705,7 +2709,7 @@ public class StreamThreadTest {
         when(taskManager.handleCorruption(corruptedTasks)).thenReturn(true);
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
APPLICATION_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -2766,7 +2770,7 @@ public class StreamThreadTest {
         doThrow(new 
TimeoutException()).when(taskManager).handleCorruption(corruptedTasks);
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
APPLICATION_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -2835,7 +2839,7 @@ public class StreamThreadTest {
         doNothing().when(taskManager).handleLostAll();
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
APPLICATION_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -2901,7 +2905,7 @@ public class StreamThreadTest {
         doNothing().when(consumer).enforceRebalance("Active tasks corrupted");
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
APPLICATION_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -2964,7 +2968,7 @@ public class StreamThreadTest {
         when(taskManager.handleCorruption(corruptedTasks)).thenReturn(false);
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
APPLICATION_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -3199,7 +3203,7 @@ public class StreamThreadTest {
         final TaskManager taskManager = mock(TaskManager.class);
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
APPLICATION_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -3258,7 +3262,7 @@ public class StreamThreadTest {
         
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
         final TaskManager taskManager = mock(TaskManager.class);
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
APPLICATION_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -3660,6 +3664,7 @@ public class StreamThreadTest {
             metrics,
             APPLICATION_ID,
             PROCESS_ID.toString(),
+            APPLICATION_ID,
             mockTime
         );
 
@@ -3720,6 +3725,7 @@ public class StreamThreadTest {
             metrics,
             APPLICATION_ID,
             PROCESS_ID.toString(),
+            APPLICATION_ID,
             mockTime
         );
 
@@ -3787,6 +3793,7 @@ public class StreamThreadTest {
             metrics,
             APPLICATION_ID,
             PROCESS_ID.toString(),
+            APPLICATION_ID,
             mockTime
         );
 
@@ -3882,7 +3889,7 @@ public class StreamThreadTest {
             null,
             mock(TaskManager.class),
             null,
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime),
+            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
APPLICATION_ID, mockTime),
             new TopologyMetadata(internalTopologyBuilder, config),
             PROCESS_ID,
             CLIENT_ID,
@@ -3942,7 +3949,7 @@ public class StreamThreadTest {
                 null,
                 mock(TaskManager.class),
                 null,
-                new StreamsMetricsImpl(metrics, CLIENT_ID, 
PROCESS_ID.toString(), mockTime),
+                new StreamsMetricsImpl(metrics, CLIENT_ID, 
PROCESS_ID.toString(), APPLICATION_ID, mockTime),
                 new TopologyMetadata(internalTopologyBuilder, config),
                 PROCESS_ID,
                 CLIENT_ID,
@@ -4011,7 +4018,7 @@ public class StreamThreadTest {
                 null,
                 mock(TaskManager.class),
                 null,
-                new StreamsMetricsImpl(metrics, CLIENT_ID, 
PROCESS_ID.toString(), mockTime),
+                new StreamsMetricsImpl(metrics, CLIENT_ID, 
PROCESS_ID.toString(), APPLICATION_ID, mockTime),
                 new TopologyMetadata(internalTopologyBuilder, config),
                 PROCESS_ID,
                 CLIENT_ID,
@@ -4071,7 +4078,7 @@ public class StreamThreadTest {
             null,
             mock(TaskManager.class),
             null,
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime),
+            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
APPLICATION_ID, mockTime),
             new TopologyMetadata(internalTopologyBuilder, config),
             PROCESS_ID,
             CLIENT_ID,
@@ -4131,7 +4138,7 @@ public class StreamThreadTest {
                 null,
                 mock(TaskManager.class),
                 null,
-                new StreamsMetricsImpl(metrics, CLIENT_ID, 
PROCESS_ID.toString(), mockTime),
+                new StreamsMetricsImpl(metrics, CLIENT_ID, 
PROCESS_ID.toString(), APPLICATION_ID, mockTime),
                 new TopologyMetadata(internalTopologyBuilder, config),
                 PROCESS_ID,
                 CLIENT_ID,
@@ -4200,7 +4207,7 @@ public class StreamThreadTest {
                 null,
                 mock(TaskManager.class),
                 null,
-                new StreamsMetricsImpl(metrics, CLIENT_ID, 
PROCESS_ID.toString(), mockTime),
+                new StreamsMetricsImpl(metrics, CLIENT_ID, 
PROCESS_ID.toString(), APPLICATION_ID, mockTime),
                 new TopologyMetadata(internalTopologyBuilder, config),
                 PROCESS_ID,
                 CLIENT_ID,
@@ -4296,7 +4303,7 @@ public class StreamThreadTest {
             "",
             taskManager,
             null,
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime),
+            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
APPLICATION_ID, mockTime),
             topologyMetadata,
             PROCESS_ID,
             "thread-id",
@@ -4348,7 +4355,7 @@ public class StreamThreadTest {
     private Collection<Task> createStandbyTask(final StreamsConfig config) {
         final LogContext logContext = new LogContext("test");
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
APPLICATION_ID, mockTime);
         final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
             new TopologyMetadata(internalTopologyBuilder, config),
             config,
@@ -4407,7 +4414,7 @@ public class StreamThreadTest {
                                            final StreamsConfig config,
                                            final TopologyMetadata 
topologyMetadata) {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
APPLICATION_ID, mockTime);
 
         return new StreamThread(
             mockTime,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index bdb9d028c54..7070bb4df18 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -48,6 +48,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.APPLICATION_ID_TAG;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.AVG_SUFFIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_ID_TAG;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP;
@@ -95,6 +96,7 @@ public class StreamsMetricsImplTest {
     private static final String INTERNAL_PREFIX = "internal";
     private static final String CLIENT_ID = "test-client";
     private static final String PROCESS_ID = "test-process";
+    private static final String APPLICATION_ID = "test-app";
     private static final String THREAD_ID1 = "test-thread-1";
     private static final String TASK_ID1 = "test-task-1";
     private static final String TASK_ID2 = "test-task-2";
@@ -131,13 +133,14 @@ public class StreamsMetricsImplTest {
     private final String metricNamePrefix = "metric";
     private final String group = "group";
     private final Map<String, String> tags = mkMap(mkEntry("tag", "value"));
-    private final Map<String, String> clientLevelTags = 
mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID), mkEntry(PROCESS_ID_TAG, PROCESS_ID));
+    private final Map<String, String> clientLevelTags = 
mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID),
+        mkEntry(PROCESS_ID_TAG, PROCESS_ID), mkEntry(APPLICATION_ID_TAG, 
APPLICATION_ID));
     private final MetricName metricName1 =
         new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, 
clientLevelTags);
     private final MetricName metricName2 =
         new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION2, 
clientLevelTags);
     private final MockTime time = new MockTime(0);
-    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, time);
 
     private static MetricConfig eqMetricConfig(final MetricConfig 
metricConfig) {
         final StringBuffer message = new StringBuffer();
@@ -251,7 +254,8 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetNewSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Sensor actualSensor = 
streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
 
@@ -263,7 +267,8 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Sensor actualSensor = 
streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
 
@@ -275,7 +280,8 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetNewSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Sensor actualSensor = streamsMetrics.taskLevelSensor(
             THREAD_ID1,
@@ -292,7 +298,8 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Sensor actualSensor = streamsMetrics.taskLevelSensor(
             THREAD_ID1,
@@ -309,7 +316,8 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetNewSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Sensor actualSensor = streamsMetrics.topicLevelSensor(
             THREAD_ID1,
@@ -328,7 +336,8 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Sensor actualSensor = streamsMetrics.topicLevelSensor(
             THREAD_ID1,
@@ -347,7 +356,8 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         final ArgumentCaptor<String> sensorKeys = 
setupGetNewSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Sensor actualSensor = streamsMetrics.storeLevelSensor(
             TASK_ID1,
@@ -365,7 +375,8 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Sensor actualSensor = streamsMetrics.storeLevelSensor(
             TASK_ID1,
@@ -381,7 +392,8 @@ public class StreamsMetricsImplTest {
     public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() {
         final Metrics metrics = mock(Metrics.class);
         final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2, 
INFO_RECORDING_LEVEL);
@@ -393,7 +405,8 @@ public class StreamsMetricsImplTest {
     public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() {
         final Metrics metrics = mock(Metrics.class);
         final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
         streamsMetrics.storeLevelSensor(TASK_ID2, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
@@ -405,7 +418,8 @@ public class StreamsMetricsImplTest {
     public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() {
         final Metrics metrics = mock(Metrics.class);
         final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
@@ -417,7 +431,8 @@ public class StreamsMetricsImplTest {
     public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() 
throws InterruptedException {
         final Metrics metrics = mock(Metrics.class);
         final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
         final Thread otherThread =
@@ -432,7 +447,8 @@ public class StreamsMetricsImplTest {
     public void shouldUseSameStoreLevelSensorKeyWithSameSensorNames() {
         final Metrics metrics = mock(Metrics.class);
         final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
@@ -456,7 +472,8 @@ public class StreamsMetricsImplTest {
             .thenReturn(metricName);
         when(metrics.metric(metricName)).thenReturn(null);
         when(metrics.addMetricIfAbsent(eq(metricName), 
eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).thenReturn(null);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         streamsMetrics.addStoreLevelMutableMetric(
             TASK_ID1,
@@ -489,7 +506,8 @@ public class StreamsMetricsImplTest {
         when(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, 
DESCRIPTION1, STORE_LEVEL_TAG_MAP))
             .thenReturn(metricName);
         when(metrics.metric(metricName)).thenReturn(null);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         streamsMetrics.addStoreLevelMutableMetric(
             TASK_ID1,
@@ -539,7 +557,8 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldRemoveStateStoreLevelSensors() {
         final Metrics metrics = mock(Metrics.class);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
         final MetricName metricName1 =
             new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, 
DESCRIPTION1, STORE_LEVEL_TAG_MAP);
         final MetricName metricName2 =
@@ -562,7 +581,8 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetNewSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
             THREAD_ID1,
@@ -580,7 +600,8 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
             THREAD_ID1,
@@ -599,7 +620,8 @@ public class StreamsMetricsImplTest {
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         final String processorCacheName = "processorNodeName";
         setupGetNewSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
             THREAD_ID1,
@@ -618,7 +640,8 @@ public class StreamsMetricsImplTest {
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         final String processorCacheName = "processorNodeName";
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
             THREAD_ID1, TASK_ID1,
@@ -635,7 +658,8 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetNewSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Sensor actualSensor = 
streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
 
@@ -647,7 +671,8 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Sensor actualSensor = 
streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
 
@@ -664,7 +689,8 @@ public class StreamsMetricsImplTest {
         when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, 
DESCRIPTION1, clientLevelTags))
             .thenReturn(metricName1);
         doNothing().when(metrics).addMetric(eq(metricName1), 
eqMetricConfig(metricConfig), eq(immutableValue));
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1, 
DESCRIPTION1, recordingLevel, value);
     }
@@ -678,7 +704,8 @@ public class StreamsMetricsImplTest {
         when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, 
DESCRIPTION1, clientLevelTags))
             .thenReturn(metricName1);
         doNothing().when(metrics).addMetric(eq(metricName1), 
eqMetricConfig(metricConfig), eq(valueProvider));
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
 
         streamsMetrics.addClientLevelMutableMetric(METRIC_NAME1, DESCRIPTION1, 
recordingLevel, valueProvider);
     }
@@ -699,7 +726,8 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldRemoveClientLevelMetricsAndSensors() {
         final Metrics metrics = mock(Metrics.class);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
         final ArgumentCaptor<String> sensorKeys = 
addSensorsOnAllLevels(metrics, streamsMetrics);
 
         
doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(0));
@@ -712,7 +740,8 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldRemoveThreadLevelSensors() {
         final Metrics metrics = mock(Metrics.class);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
         addSensorsOnAllLevels(metrics, streamsMetrics);
         setupRemoveSensorsTest(metrics, THREAD_ID1);
 
@@ -721,7 +750,8 @@ public class StreamsMetricsImplTest {
 
     @Test
     public void testNullMetrics() {
-        assertThrows(NullPointerException.class, () -> new 
StreamsMetricsImpl(null, "", PROCESS_ID, time));
+        assertThrows(NullPointerException.class, () -> new 
StreamsMetricsImpl(null, "", PROCESS_ID,
+            APPLICATION_ID, time));
     }
 
     @Test
@@ -754,7 +784,8 @@ public class StreamsMetricsImplTest {
     @Test
     public void testMultiLevelSensorRemoval() {
         final Metrics registry = new Metrics();
-        final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, 
THREAD_ID1, PROCESS_ID, time);
+        final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, 
THREAD_ID1, PROCESS_ID, APPLICATION_ID,
+            time);
         for (final MetricName defaultMetric : registry.metrics().keySet()) {
             registry.removeMetric(defaultMetric);
         }
@@ -860,7 +891,8 @@ public class StreamsMetricsImplTest {
         final MockTime time = new MockTime(1);
         final MetricConfig config = new MetricConfig().timeWindow(1, 
TimeUnit.MILLISECONDS);
         final Metrics metrics = new Metrics(config, time);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "", PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "", PROCESS_ID, APPLICATION_ID,
+            time);
 
         final String scope = "scope";
         final String entity = "entity";
@@ -894,7 +926,8 @@ public class StreamsMetricsImplTest {
 
     @Test
     public void shouldAddLatencyRateTotalSensor() {
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
         shouldAddCustomSensor(
             streamsMetrics.addLatencyRateTotalSensor(SCOPE_NAME, ENTITY_NAME, 
OPERATION_NAME, RecordingLevel.DEBUG),
             streamsMetrics,
@@ -909,7 +942,8 @@ public class StreamsMetricsImplTest {
 
     @Test
     public void shouldAddRateTotalSensor() {
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+            time);
         shouldAddCustomSensor(
             streamsMetrics.addRateTotalSensor(SCOPE_NAME, ENTITY_NAME, 
OPERATION_NAME, RecordingLevel.DEBUG),
             streamsMetrics,
@@ -1035,9 +1069,10 @@ public class StreamsMetricsImplTest {
     public void shouldGetClientLevelTagMap() {
         final Map<String, String> tagMap = streamsMetrics.clientLevelTagMap();
 
-        assertThat(tagMap.size(), equalTo(2));
+        assertThat(tagMap.size(), equalTo(3));
         assertThat(tagMap.get(StreamsMetricsImpl.CLIENT_ID_TAG), 
equalTo(CLIENT_ID));
         assertThat(tagMap.get(StreamsMetricsImpl.PROCESS_ID_TAG), 
equalTo(PROCESS_ID));
+        assertThat(tagMap.get(StreamsMetricsImpl.APPLICATION_ID_TAG), 
equalTo(APPLICATION_ID));
     }
 
     @Test
@@ -1045,7 +1080,8 @@ public class StreamsMetricsImplTest {
         final String taskName = "test-task";
         final String storeType = "remote-window";
         final String storeName = "window-keeper";
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Map<String, String> tagMap = 
streamsMetrics.storeLevelTagMap(taskName, storeType, storeName);
 
@@ -1060,7 +1096,8 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldGetCacheLevelTagMap() {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+            new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, 
APPLICATION_ID,
+            time);
         final String taskName = "taskName";
         final String storeName = "storeName";
 
@@ -1077,7 +1114,8 @@ public class StreamsMetricsImplTest {
 
     @Test
     public void shouldGetThreadLevelTagMap() {
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID,
+            time);
 
         final Map<String, String> tagMap = 
streamsMetrics.threadLevelTagMap(THREAD_ID1);
 
@@ -1210,7 +1248,7 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldReturnMetricsVersionCurrent() {
         assertThat(
-            new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, 
time).version(),
+            new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, 
APPLICATION_ID, time).version(),
             equalTo(Version.LATEST)
         );
     }
@@ -1269,7 +1307,8 @@ public class StreamsMetricsImplTest {
     public void shouldAddThreadLevelMutableMetric() {
         final int measuredValue = 123;
         final StreamsMetricsImpl streamsMetrics
-            = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, 
APPLICATION_ID,
+            time);
 
         streamsMetrics.addThreadLevelMutableMetric(
             "foobar",
@@ -1293,7 +1332,8 @@ public class StreamsMetricsImplTest {
     public void shouldAddThreadLevelMutableMetricWithAdditionalTags() {
         final int measuredValue = 123;
         final StreamsMetricsImpl streamsMetrics
-            = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, 
APPLICATION_ID,
+            time);
 
         streamsMetrics.addThreadLevelMutableMetric(
             "foobar",
@@ -1319,7 +1359,8 @@ public class StreamsMetricsImplTest {
     public void shouldCleanupThreadLevelMutableMetric() {
         final int measuredValue = 123;
         final StreamsMetricsImpl streamsMetrics
-            = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, 
APPLICATION_ID,
+            time);
         streamsMetrics.addThreadLevelMutableMetric(
             "foobar",
             "test metric",
@@ -1341,7 +1382,8 @@ public class StreamsMetricsImplTest {
     public void shouldAddThreadLevelImmutableMetric() {
         final int measuredValue = 123;
         final StreamsMetricsImpl streamsMetrics
-            = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, 
APPLICATION_ID,
+            time);
 
         streamsMetrics.addThreadLevelImmutableMetric(
             "foobar",
@@ -1365,7 +1407,8 @@ public class StreamsMetricsImplTest {
     public void shouldCleanupThreadLevelImmutableMetric() {
         final int measuredValue = 123;
         final StreamsMetricsImpl streamsMetrics
-            = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, 
APPLICATION_ID,
+            time);
         streamsMetrics.addThreadLevelImmutableMetric(
             "foobar",
             "test metric",
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 0f8ce890b19..902b362e153 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1350,7 +1350,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", "processId", 
"applicationId", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
@@ -1386,7 +1386,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", "processId", 
"applicationId", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
@@ -1425,7 +1425,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", "processId", 
"applicationId", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
@@ -1466,7 +1466,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", "processId", 
"applicationId", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 7a78716530a..77a5d9e8710 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -572,7 +572,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", "processId", 
"applicationId", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
@@ -612,7 +612,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", "processId", 
"applicationId", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
@@ -654,7 +654,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", "processId", 
"applicationId", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
@@ -698,7 +698,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", "processId", 
"applicationId", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 154517d3b94..e09a8ebe57b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -97,7 +97,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", "processId", 
"applicationId", new MockTime()),
             streamsConfig,
             () -> collector,
             new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 4239e3e5000..573cc02a438 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -112,7 +112,7 @@ public class GlobalStateStoreProviderTest {
         when(mockContext.applicationId()).thenReturn("appId");
         when(mockContext.metrics())
             .thenReturn(
-                new StreamsMetricsImpl(new Metrics(), "threadName", 
"processId", new MockTime())
+                new StreamsMetricsImpl(new Metrics(), "threadName", 
"processId", "applicationId", new MockTime())
             );
         when(mockContext.taskId()).thenReturn(new TaskId(0, 0));
         when(mockContext.appConfigs()).thenReturn(CONFIGS);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
index e71704f32af..27aa52b48c6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
@@ -54,7 +54,7 @@ public class KeyValueSegmentTest {
     @BeforeEach
     public void setUp() {
         metricsRecorder.init(
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
"applicationId", new MockTime()),
             new TaskId(0, 0)
         );
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 1ba655a75ce..0ecea105bad 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -122,7 +122,7 @@ public class MeteredKeyValueStoreTest {
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.metrics()).thenReturn(
-            new StreamsMetricsImpl(metrics, "test", "processId", mockTime)
+            new StreamsMetricsImpl(metrics, "test", "processId", 
"applicationId", mockTime)
         );
         when(context.taskId()).thenReturn(taskId);
         when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index f8b08a532d1..4546e6b7c1f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -126,7 +126,7 @@ public class MeteredSessionStoreTest {
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.metrics())
-                .thenReturn(new StreamsMetricsImpl(metrics, "test", 
"processId", mockTime));
+                .thenReturn(new StreamsMetricsImpl(metrics, "test", 
"processId", "applicationId", mockTime));
         when(context.taskId()).thenReturn(taskId);
         when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
         when(innerStore.name()).thenReturn(STORE_NAME);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 2e3c470387c..0b6556ac78b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -127,7 +127,7 @@ public class MeteredTimestampedKeyValueStoreTest {
         setUpWithoutContext();
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.metrics())
-            .thenReturn(new StreamsMetricsImpl(metrics, "test", "processId", 
mockTime));
+            .thenReturn(new StreamsMetricsImpl(metrics, "test", "processId", 
"applicationId", mockTime));
         when(context.taskId()).thenReturn(taskId);
         when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
         when(inner.name()).thenReturn(STORE_NAME);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index a3fe59c6e8b..a3ed1453454 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -77,7 +77,7 @@ public class MeteredTimestampedWindowStoreTest {
 
     public void setUp() {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", "processId", new 
MockTime());
+            new StreamsMetricsImpl(metrics, "test", "processId", 
"applicationId", new MockTime());
 
         context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
@@ -105,7 +105,7 @@ public class MeteredTimestampedWindowStoreTest {
 
     public void setUpWithoutContextName() {
         final StreamsMetricsImpl streamsMetrics =
-                new StreamsMetricsImpl(metrics, "test", "processId", new 
MockTime());
+                new StreamsMetricsImpl(metrics, "test", "processId", 
"applicationId", new MockTime());
 
         context = new InternalMockProcessorContext<>(
                 TestUtils.tempDirectory(),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index 5c4509bc7a3..1602f1a115b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -110,7 +110,7 @@ public class MeteredVersionedKeyValueStoreTest {
     @BeforeEach
     public void setUp() {
         when(inner.name()).thenReturn(STORE_NAME);
-        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, 
"test", "processId", mockTime));
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, 
"test", "processId", "applicationId", mockTime));
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.taskId()).thenReturn(TASK_ID);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 2726ce26aa7..40e4e52eafa 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -116,7 +116,7 @@ public class MeteredWindowStoreTest {
     @BeforeEach
     public void setUp() {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", "processId", new 
MockTime());
+            new StreamsMetricsImpl(metrics, "test", "processId", 
"applicationId", new MockTime());
         context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 70224c8013c..f56ac75a0c9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -923,7 +923,7 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
 
         final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(RecordingLevel.DEBUG));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-application", "processId", 
time);
+            new StreamsMetricsImpl(metrics, "test-application", "processId", 
"applicationId", time);
 
         context = mock(InternalMockProcessorContext.class);
         when(context.metrics()).thenReturn(streamsMetrics);
@@ -956,7 +956,7 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
 
         final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(RecordingLevel.INFO));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-application", "processId", 
time);
+            new StreamsMetricsImpl(metrics, "test-application", "processId", 
"applicationId", time);
 
         context = mock(InternalMockProcessorContext.class);
         when(context.metrics()).thenReturn(streamsMetrics);
@@ -988,7 +988,7 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
 
         final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(RecordingLevel.INFO));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-application", "processId", 
time);
+            new StreamsMetricsImpl(metrics, "test-application", "processId", 
"applicationId", time);
 
         final Properties props = StreamsTestUtils.getStreamsConfig();
         context = mock(InternalMockProcessorContext.class);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
index 69e7d31e31b..5f5495af425 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
@@ -63,7 +63,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
     public void setUp() {
         final Metrics metrics = new Metrics();
         offset = 0;
-        streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", 
"processId", new MockTime());
+        streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", 
"processId", "applicationId", new MockTime());
         context = new 
MockInternalProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new 
TaskId(0, 0), TestUtils.tempDirectory());
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
index 633d14c1e63..0acd2a330da 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
@@ -54,7 +54,7 @@ public class TimestampedSegmentTest {
     @BeforeEach
     public void setUp() {
         metricsRecorder.init(
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
"applicationId", new MockTime()),
             new TaskId(0, 0)
         );
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
index 75c01cef3c9..62a37ff8d0a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
@@ -202,7 +202,7 @@ public class RocksDBMetricsRecorderGaugesTest {
 
     private void runAndVerifySumOfProperties(final String propertyName) throws 
Exception {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
new MockTime());
+            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
"applicationId", new MockTime());
         final RocksDBMetricsRecorder recorder = new 
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
 
         recorder.init(streamsMetrics, TASK_ID);
@@ -219,7 +219,7 @@ public class RocksDBMetricsRecorderGaugesTest {
 
     private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String 
propertyName) throws Exception {
         final StreamsMetricsImpl streamsMetrics =
-                new StreamsMetricsImpl(new Metrics(), "test-client", 
"processId", new MockTime());
+                new StreamsMetricsImpl(new Metrics(), "test-client", 
"processId", "applicationId", new MockTime());
         final RocksDBMetricsRecorder recorder = new 
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
 
         recorder.init(streamsMetrics, TASK_ID);
@@ -236,7 +236,7 @@ public class RocksDBMetricsRecorderGaugesTest {
 
     private void runAndVerifyBlockCacheMetricsWithSingleCache(final String 
propertyName) throws Exception {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
new MockTime());
+            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
"applicationId", new MockTime());
         final RocksDBMetricsRecorder recorder = new 
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
 
         recorder.init(streamsMetrics, TASK_ID);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
index a0c068b59ee..b0ed10c45fe 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
@@ -194,7 +194,7 @@ public class RocksDBMetricsRecorderTest {
         assertThrows(
             IllegalStateException.class,
             () -> recorder.init(
-                new StreamsMetricsImpl(new Metrics(), "test-client", 
"processId", new MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "test-client", 
"processId", "applicationId", new MockTime()),
                 TASK_ID1
             )
         );
diff --git 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index ed68c86c490..1361df9a09e 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -90,7 +90,7 @@ public class InternalMockProcessorContext<KOut, VOut>
         this(null,
             null,
             null,
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", "processId", 
"applicationId", new MockTime()),
             new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             null,
             null,
@@ -108,6 +108,7 @@ public class InternalMockProcessorContext<KOut, VOut>
                 new Metrics(),
                 "mock",
                 "processId",
+                "applicationId",
                 new MockTime()
             ),
             config,
@@ -141,6 +142,7 @@ public class InternalMockProcessorContext<KOut, VOut>
                 new Metrics(),
                 "mock",
                 "processId",
+                "applicationId",
                 new MockTime()
             ),
             config,
@@ -158,7 +160,7 @@ public class InternalMockProcessorContext<KOut, VOut>
             stateDir,
             keySerde,
             valueSerde,
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", "processId", 
"applicationId", new MockTime()),
             config,
             null,
             null,
@@ -178,7 +180,7 @@ public class InternalMockProcessorContext<KOut, VOut>
             null,
             serdes.keySerde(),
             serdes.valueSerde(),
-            new StreamsMetricsImpl(metrics, "mock", "processId", new 
MockTime()),
+            new StreamsMetricsImpl(metrics, "mock", "processId", 
"applicationId", new MockTime()),
             new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             () -> collector,
             null,
@@ -195,7 +197,7 @@ public class InternalMockProcessorContext<KOut, VOut>
             stateDir,
             keySerde,
             valueSerde,
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", "processId", 
"applicationId", new MockTime()),
             new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             () -> collector,
             cache,
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 92976875b93..9b812b389fa 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -385,6 +385,7 @@ public class TopologyTestDriver implements Closeable {
                 metrics,
                 "test-client",
                 "processId",
+                "applicationId",
                 mockWallClockTime
         );
         TaskMetrics.droppedRecordsSensor(threadId, TASK_ID.toString(), 
streamsMetrics);
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 0ffea9b4916..08cf542305c 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -238,6 +238,7 @@ public class MockProcessorContext implements 
ProcessorContext, RecordCollector.S
                 new Metrics(metricConfig),
                 threadId,
                 "processId",
+                "applicationId",
                 Time.SYSTEM
         );
         TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
index 3ffa85a4503..25f728cd567 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -256,6 +256,7 @@ public class MockProcessorContext<KForward, VForward> 
implements ProcessorContex
             new Metrics(metricConfig),
             threadId,
             "processId",
+            "applicationId",
             Time.SYSTEM
         );
         TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index ebb38dd773a..c760e5dd1d6 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -291,6 +291,7 @@ public class MockProcessorContextTest {
             new Metrics(new MetricConfig()),
             Thread.currentThread().getName(),
             "processId",
+            "applicationId",
             Time.SYSTEM
         ));
         when(mockInternalProcessorContext.taskId()).thenReturn(new TaskId(1, 
1));

Reply via email to