This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 16476107363 KAFKA-19882: Removing process id from default client level 
tags (#20939)
16476107363 is described below

commit 164761073638963cb028f0ffa0d6256c9c3a816b
Author: Genseric Ghiro <[email protected]>
AuthorDate: Fri Nov 21 17:41:06 2025 -0500

    KAFKA-19882: Removing process id from default client level tags (#20939)
    
    ## Summary
    As a follow-up to #20906, also removing the process-id tag from 4.1.
    This is because when working on
    KIP-1091, we mistakenly applied the process-id tag to all client-level
    metrics, rather than just the client-state, thread-state, and
    recording-level metrics as specified in the KIP.
    
    ## Tests
    Unit tests in `ClientMetricsTest.java` and `StreamsMetricsImplTest.java`
    
    Reviewers: Matthias Sax <[email protected]>, Bill 
Bejeck<[email protected]>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |   5 +-
 .../streams/internals/metrics/ClientMetrics.java   |  10 +-
 .../internals/metrics/StreamsMetricsImpl.java      |  40 +++-
 .../internals/metrics/ClientMetricsTest.java       |  29 ++-
 ...KStreamSessionWindowAggregateProcessorTest.java |   2 +-
 .../processor/internals/ActiveTaskCreatorTest.java |   2 +-
 .../internals/DefaultStateUpdaterTest.java         |   2 +-
 .../internals/GlobalStreamThreadTest.java          |   6 +-
 .../processor/internals/MockStreamsMetrics.java    |   2 +-
 .../processor/internals/ProcessorNodeTest.java     |   6 +-
 .../processor/internals/RecordQueueTest.java       |   2 +-
 .../processor/internals/SourceNodeTest.java        |   2 +-
 .../processor/internals/StandbyTaskTest.java       |   2 +-
 .../processor/internals/StreamTaskTest.java        |   8 +-
 .../processor/internals/StreamThreadTest.java      |  35 ++--
 .../internals/metrics/StreamsMetricsImplTest.java  | 219 ++++++++++++++-------
 ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java |   8 +-
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |   8 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java       |   2 +-
 .../internals/GlobalStateStoreProviderTest.java    |   2 +-
 .../state/internals/KeyValueSegmentTest.java       |   2 +-
 .../state/internals/MeteredKeyValueStoreTest.java  |   2 +-
 .../state/internals/MeteredSessionStoreTest.java   |   2 +-
 .../MeteredTimestampedKeyValueStoreTest.java       |   2 +-
 .../MeteredTimestampedWindowStoreTest.java         |   4 +-
 .../MeteredVersionedKeyValueStoreTest.java         |   2 +-
 .../state/internals/MeteredWindowStoreTest.java    |   2 +-
 .../streams/state/internals/RocksDBStoreTest.java  |   6 +-
 .../RocksDBTimeOrderedKeyValueBufferTest.java      |   2 +-
 .../state/internals/TimestampedSegmentTest.java    |   2 +-
 .../metrics/RocksDBMetricsRecorderGaugesTest.java  |   6 +-
 .../metrics/RocksDBMetricsRecorderTest.java        |   2 +-
 .../kafka/test/InternalMockProcessorContext.java   |  10 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |   1 -
 .../streams/processor/MockProcessorContext.java    |   1 -
 .../processor/api/MockProcessorContext.java        |   1 -
 .../kafka/streams/MockProcessorContextTest.java    |   1 -
 37 files changed, 273 insertions(+), 167 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 656efc8bfe9..85356241af4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -980,7 +980,6 @@ public class KafkaStreams implements AutoCloseable {
         streamsMetrics = new StreamsMetricsImpl(
             metrics,
             clientId,
-            processId.toString(),
             time
         );
 
@@ -989,8 +988,8 @@ public class KafkaStreams implements AutoCloseable {
         ClientMetrics.addApplicationIdMetric(streamsMetrics, 
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG));
         ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, 
(metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
         ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> 
state.name());
-        ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, 
(metricsConfig, now) -> state.ordinal());
-        ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, 
calculateMetricsRecordingLevel());
+        ClientMetrics.addClientStateTelemetryMetric(processId.toString(), 
streamsMetrics, (metricsConfig, now) -> state.ordinal());
+        ClientMetrics.addClientRecordingLevelMetric(processId.toString(), 
streamsMetrics, calculateMetricsRecordingLevel());
         threads = Collections.synchronizedList(new LinkedList<>());
         ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) -> numLiveStreamThreads());
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
 
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
index 21bac269d5a..3b88e240172 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
@@ -25,9 +25,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.InputStream;
+import java.util.Collections;
 import java.util.Properties;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESS_ID_TAG;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addSumMetricToSensor;
 
 public class ClientMetrics {
@@ -126,21 +128,25 @@ public class ClientMetrics {
         );
     }
 
-    public static void addClientStateTelemetryMetric(final StreamsMetricsImpl 
streamsMetrics,
+    public static void addClientStateTelemetryMetric(final String processId,
+                                                     final StreamsMetricsImpl 
streamsMetrics,
                                                      final Gauge<Integer> 
stateProvider) {
         streamsMetrics.addClientLevelMutableMetric(
             CLIENT_STATE,
             STATE_DESCRIPTION,
+            Collections.singletonMap(PROCESS_ID_TAG, processId),
             RecordingLevel.INFO,
             stateProvider
         );
     }
 
-    public static void addClientRecordingLevelMetric(final StreamsMetricsImpl 
streamsMetrics,
+    public static void addClientRecordingLevelMetric(final String processId,
+                                                     final StreamsMetricsImpl 
streamsMetrics,
                                                      final int recordingLevel) 
{
         streamsMetrics.addClientLevelImmutableMetric(
                 RECORDING_LEVEL,
                 RECORDING_LEVEL_DESCRIPTION,
+                Collections.singletonMap(PROCESS_ID_TAG, processId),
                 RecordingLevel.INFO,
                 recordingLevel
         );
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index a0999a36c60..f652059f8e0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -89,7 +89,6 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     private final Metrics metrics;
     private final Map<Sensor, Sensor> parentSensors;
     private final String clientId;
-    private final String processId;
 
     private final Version version;
     private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
@@ -167,12 +166,10 @@ public class StreamsMetricsImpl implements StreamsMetrics 
{
 
     public StreamsMetricsImpl(final Metrics metrics,
                               final String clientId,
-                              final String processId,
                               final Time time) {
         Objects.requireNonNull(metrics, "Metrics cannot be null");
         this.metrics = metrics;
         this.clientId = clientId;
-        this.processId = processId;
         version = Version.LATEST;
         rocksDBMetricsRecordingTrigger = new 
RocksDBMetricsRecordingTrigger(time);
 
@@ -195,7 +192,20 @@ public class StreamsMetricsImpl implements StreamsMetrics {
                                                   final String description,
                                                   final RecordingLevel 
recordingLevel,
                                                   final T value) {
-        final MetricName metricName = metrics.metricName(name, 
CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
+        addClientLevelImmutableMetric(name, description, 
Collections.emptyMap(), recordingLevel, value);
+    }
+
+    public <T> void addClientLevelImmutableMetric(final String name,
+                                                  final String description,
+                                                  final Map<String, String> 
additionalTags,
+                                                  final RecordingLevel 
recordingLevel,
+                                                  final T value) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            CLIENT_LEVEL_GROUP,
+            description,
+            clientLevelTagMap(additionalTags)
+        );
         final MetricConfig metricConfig = new 
MetricConfig().recordLevel(recordingLevel);
         synchronized (clientLevelMetrics) {
             metrics.addMetric(metricName, metricConfig, new 
ImmutableMetricValue<>(value));
@@ -207,7 +217,20 @@ public class StreamsMetricsImpl implements StreamsMetrics {
                                                 final String description,
                                                 final RecordingLevel 
recordingLevel,
                                                 final Gauge<T> valueProvider) {
-        final MetricName metricName = metrics.metricName(name, 
CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
+        addClientLevelMutableMetric(name, description, Collections.emptyMap(), 
recordingLevel, valueProvider);
+    }
+
+    public <T> void addClientLevelMutableMetric(final String name,
+                                                final String description,
+                                                final Map<String, String> 
additionalTags,
+                                                final RecordingLevel 
recordingLevel,
+                                                final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            CLIENT_LEVEL_GROUP,
+            description,
+            clientLevelTagMap(additionalTags)
+        );
         final MetricConfig metricConfig = new 
MetricConfig().recordLevel(recordingLevel);
         synchronized (clientLevelMetrics) {
             metrics.addMetric(metricName, metricConfig, valueProvider);
@@ -281,9 +304,12 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     }
 
     public Map<String, String> clientLevelTagMap() {
-        final Map<String, String> tagMap = new LinkedHashMap<>();
+        return clientLevelTagMap(Collections.emptyMap());
+    }
+
+    public Map<String, String> clientLevelTagMap(final Map<String, String> 
additionalTags) {
+        final Map<String, String> tagMap = new LinkedHashMap<>(additionalTags);
         tagMap.put(CLIENT_ID_TAG, clientId);
-        tagMap.put(PROCESS_ID_TAG, processId);
         return tagMap;
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
index 9142835b92c..2eee10f7854 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.when;
 
 public class ClientMetricsTest {
     private static final String COMMIT_ID = "test-commit-ID";
+    private static final String PROCESS_ID = "test-process-id";
     private static final String VERSION = "test-version";
 
     private final StreamsMetricsImpl streamsMetrics = 
mock(StreamsMetricsImpl.class);
@@ -116,11 +117,15 @@ public class ClientMetricsTest {
         final String name = "client-state";
         final String description = "The state of the Kafka Streams client";
         final Gauge<Integer> stateProvider = (config, now) -> 
State.RUNNING.ordinal();
-        setUpAndVerifyMutableMetric(
-                name,
-                description,
-                stateProvider,
-                () -> 
ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, stateProvider)
+
+        ClientMetrics.addClientStateTelemetryMetric(PROCESS_ID, 
streamsMetrics, stateProvider);
+
+        verify(streamsMetrics).addClientLevelMutableMetric(
+            eq(name),
+            eq(description),
+            eq(Collections.singletonMap("process-id", PROCESS_ID)),
+            eq(RecordingLevel.INFO),
+            eq(stateProvider)
         );
     }
 
@@ -129,11 +134,15 @@ public class ClientMetricsTest {
         final String name = "recording-level";
         final String description = "The metrics recording level of the Kafka 
Streams client";
         final int recordingLevel = 1;
-        setUpAndVerifyImmutableMetric(
-                name,
-                description,
-                recordingLevel,
-                () -> 
ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, recordingLevel)
+
+        ClientMetrics.addClientRecordingLevelMetric(PROCESS_ID, 
streamsMetrics, recordingLevel);
+
+        verify(streamsMetrics).addClientLevelImmutableMetric(
+            eq(name),
+            eq(description),
+            eq(Collections.singletonMap("process-id", PROCESS_ID)),
+            eq(RecordingLevel.INFO),
+            eq(recordingLevel)
         );
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index dd3960c17ec..f6658997b7b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     private final MockTime time = new MockTime();
     private final Metrics metrics = new Metrics();
-    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "test", "processId", time);
+    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "test", time);
     private final String threadId = Thread.currentThread().getName();
     private final Initializer<Long> initializer = () -> 0L;
     private final Aggregator<String, String, Long> aggregator = (aggKey, 
value, aggregate) -> aggregate + 1;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 17eb27bb3d8..dc2e8ffd513 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest {
     private ChangelogReader changeLogReader;
 
     private final MockClientSupplier mockClientSupplier = new 
MockClientSupplier();
-    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(new Metrics(), "clientId", "processId", new MockTime());
+    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(new Metrics(), "clientId", new MockTime());
     private final Map<String, Object> properties = mkMap(
         mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
         mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index b6d41966257..8f767af93e0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -106,7 +106,7 @@ class DefaultStateUpdaterTest {
 
     // need an auto-tick timer to work for draining with timeout
     private final Time time = new MockTime(1L);
-    private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new 
Metrics(time), "", "", time);
+    private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new 
Metrics(time), "", time);
     private final StreamsConfig config = new 
StreamsConfig(configProps(COMMIT_INTERVAL));
     private final ChangelogReader changelogReader = 
mock(ChangelogReader.class);
     private final TopologyMetadata topologyMetadata = 
unnamedTopology().build();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index d964b0c80b0..838bb7ab502 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -132,7 +132,7 @@ public class GlobalStreamThreadTest {
             mockConsumer,
             new StateDirectory(config, time, true, false),
             0,
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
time),
+            new StreamsMetricsImpl(new Metrics(), "test-client", time),
             time,
             "clientId",
             stateRestoreListener,
@@ -171,7 +171,7 @@ public class GlobalStreamThreadTest {
             mockConsumer,
             new StateDirectory(config, time, true, false),
             0,
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
time),
+            new StreamsMetricsImpl(new Metrics(), "test-client", time),
             time,
             "clientId",
             stateRestoreListener,
@@ -420,7 +420,7 @@ public class GlobalStreamThreadTest {
                 consumer,
                 new StateDirectory(config, time, true, false),
                 0,
-                new StreamsMetricsImpl(new Metrics(), "test-client", 
"processId", time),
+                new StreamsMetricsImpl(new Metrics(), "test-client", time),
                 time,
                 "clientId",
                 stateRestoreListener,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
index a2e6820f901..4ed68ef8150 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
@@ -23,6 +23,6 @@ import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 public class MockStreamsMetrics extends StreamsMetricsImpl {
 
     public MockStreamsMetrics(final Metrics metrics) {
-        super(metrics, "test", "processId", new MockTime());
+        super(metrics, "test", new MockTime());
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index ce5fddb870a..3a139b3233c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -219,7 +219,7 @@ public class ProcessorNodeTest {
     public void testMetricsWithBuiltInMetricsVersionLatest() {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-client", "processId", new 
MockTime());
+            new StreamsMetricsImpl(metrics, "test-client", new MockTime());
         final InternalMockProcessorContext<Object, Object> context = new 
InternalMockProcessorContext<>(streamsMetrics);
         final ProcessorNode<Object, Object, Object, Object> node =
             new ProcessorNode<>(NAME, new NoOpProcessor(), 
Collections.emptySet());
@@ -303,7 +303,7 @@ public class ProcessorNodeTest {
     public void testTopologyLevelClassCastExceptionDirect() {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-client", "processId", new 
MockTime());
+            new StreamsMetricsImpl(metrics, "test-client", new MockTime());
         final InternalMockProcessorContext<Object, Object> context = new 
InternalMockProcessorContext<>(streamsMetrics);
         final ProcessorNode<Object, Object, Object, Object> node =
             new ProcessorNode<>("pname", new ClassCastProcessor(), 
Collections.emptySet());
@@ -323,7 +323,7 @@ public class ProcessorNodeTest {
         final InternalProcessorContext<Object, Object> 
internalProcessorContext = mock(InternalProcessorContext.class, 
withSettings().strictness(Strictness.LENIENT));
 
         when(internalProcessorContext.taskId()).thenReturn(TASK_ID);
-        when(internalProcessorContext.metrics()).thenReturn(new 
StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()));
+        when(internalProcessorContext.metrics()).thenReturn(new 
StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()));
         when(internalProcessorContext.topic()).thenReturn(TOPIC);
         when(internalProcessorContext.partition()).thenReturn(PARTITION);
         when(internalProcessorContext.offset()).thenReturn(OFFSET);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 1dd19fb9cf7..1becc3d240d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -72,7 +72,7 @@ public class RecordQueueTest {
 
     private final Metrics metrics = new Metrics();
     private final StreamsMetricsImpl streamsMetrics =
-        new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime());
+        new StreamsMetricsImpl(metrics, "mock", new MockTime());
 
     final InternalMockProcessorContext<Integer, Integer> context = new 
InternalMockProcessorContext<>(
         StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
index a509d14c974..817ffe42f84 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -97,7 +97,7 @@ public class SourceNodeTest {
     public void shouldExposeProcessMetrics() {
         final Metrics metrics = new Metrics();
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-client", "processId", new 
MockTime());
+            new StreamsMetricsImpl(metrics, "test-client", new MockTime());
         final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(streamsMetrics);
         final SourceNode<String, String> node =
             new SourceNode<>(context.currentNode().name(), new 
TheDeserializer(), new TheDeserializer());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 6be27187f0f..9680dd1af1d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -113,7 +113,7 @@ public class StandbyTaskTest {
 
     private final MockTime time = new MockTime();
     private final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
-    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, threadName, "processId", time);
+    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, threadName, time);
 
     private File baseDir;
     private StreamsConfig config;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index fda9afa9a88..68ce72ff45c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -2604,7 +2604,7 @@ public class StreamTaskTest {
                 streamsMetrics,
                 null
         );
-        final StreamsMetricsImpl metrics = new 
StreamsMetricsImpl(this.metrics, "test", "processId", time);
+        final StreamsMetricsImpl metrics = new 
StreamsMetricsImpl(this.metrics, "test", time);
 
         // The processor topology is missing the topics
         final ProcessorTopology topology = withSources(emptyList(), mkMap());
@@ -3238,7 +3238,7 @@ public class StreamTaskTest {
             topology,
             consumer,
             new TopologyConfig(null,  config, new 
Properties()).getTaskConfig(),
-            new StreamsMetricsImpl(metrics, "test", "processId", time),
+            new StreamsMetricsImpl(metrics, "test", time),
             stateDirectory,
             cache,
             time,
@@ -3275,7 +3275,7 @@ public class StreamTaskTest {
             topology,
             consumer,
             new TopologyConfig(null,  config, new 
Properties()).getTaskConfig(),
-            new StreamsMetricsImpl(metrics, "test", "processId", time),
+            new StreamsMetricsImpl(metrics, "test", time),
             stateDirectory,
             cache,
             time,
@@ -3311,7 +3311,7 @@ public class StreamTaskTest {
             topology,
             consumer,
             new TopologyConfig(null,  config, new 
Properties()).getTaskConfig(),
-            new StreamsMetricsImpl(metrics, "test", "processId", time),
+            new StreamsMetricsImpl(metrics, "test", time),
             stateDirectory,
             cache,
             time,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 740d7d2c8f9..968a6c1f34f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -320,7 +320,6 @@ public class StreamThreadTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             APPLICATION_ID,
-            PROCESS_ID.toString(),
             time
         );
 
@@ -728,7 +727,6 @@ public class StreamThreadTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             APPLICATION_ID,
-            PROCESS_ID.toString(),
             mockTime
         );
 
@@ -793,7 +791,6 @@ public class StreamThreadTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             APPLICATION_ID,
-            PROCESS_ID.toString(),
             mockTime
         );
 
@@ -1158,7 +1155,7 @@ public class StreamThreadTest {
 
         final StreamsConfig config = new StreamsConfig(configProps(false, 
stateUpdaterEnabled, processingThreadsEnabled));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
 
@@ -1433,7 +1430,7 @@ public class StreamThreadTest {
         }
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
 
         final Properties props = configProps(false, stateUpdaterEnabled, 
processingThreadsEnabled);
         final StreamsConfig config = new StreamsConfig(props);
@@ -1899,7 +1896,6 @@ public class StreamThreadTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             APPLICATION_ID,
-            PROCESS_ID.toString(),
             mockTime
         );
 
@@ -2655,7 +2651,7 @@ public class StreamThreadTest {
         when(taskManager.handleCorruption(corruptedTasks)).thenReturn(true);
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -2716,7 +2712,7 @@ public class StreamThreadTest {
         doThrow(new 
TimeoutException()).when(taskManager).handleCorruption(corruptedTasks);
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -2786,7 +2782,7 @@ public class StreamThreadTest {
         doNothing().when(taskManager).handleLostAll();
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -2852,7 +2848,7 @@ public class StreamThreadTest {
         doNothing().when(consumer).enforceRebalance("Active tasks corrupted");
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -2915,7 +2911,7 @@ public class StreamThreadTest {
         when(taskManager.handleCorruption(corruptedTasks)).thenReturn(false);
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -3149,7 +3145,7 @@ public class StreamThreadTest {
         final TaskManager taskManager = mock(TaskManager.class);
 
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -3208,7 +3204,7 @@ public class StreamThreadTest {
         
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
         final TaskManager taskManager = mock(TaskManager.class);
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
         thread = new StreamThread(
@@ -3598,7 +3594,6 @@ public class StreamThreadTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             APPLICATION_ID,
-            PROCESS_ID.toString(),
             mockTime
         );
 
@@ -3657,7 +3652,6 @@ public class StreamThreadTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             APPLICATION_ID,
-            PROCESS_ID.toString(),
             mockTime
         );
 
@@ -3724,7 +3718,6 @@ public class StreamThreadTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             APPLICATION_ID,
-            PROCESS_ID.toString(),
             mockTime
         );
 
@@ -3820,7 +3813,7 @@ public class StreamThreadTest {
             null,
             mock(TaskManager.class),
             null,
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime),
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime),
             new TopologyMetadata(internalTopologyBuilder, config),
             PROCESS_ID,
             CLIENT_ID,
@@ -3879,7 +3872,7 @@ public class StreamThreadTest {
             null,
             mock(TaskManager.class),
             null,
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime),
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime),
             new TopologyMetadata(internalTopologyBuilder, config),
             PROCESS_ID,
             CLIENT_ID,
@@ -3944,7 +3937,7 @@ public class StreamThreadTest {
             "",
             taskManager,
             null,
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime),
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime),
             topologyMetadata,
             PROCESS_ID,
             "thread-id",
@@ -3997,7 +3990,7 @@ public class StreamThreadTest {
         final LogContext logContext = new LogContext("test");
         final Logger log = logContext.logger(StreamThreadTest.class);
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
             new TopologyMetadata(internalTopologyBuilder, config),
             config,
@@ -4056,7 +4049,7 @@ public class StreamThreadTest {
                                            final StreamsConfig config,
                                            final TopologyMetadata 
topologyMetadata) {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), 
mockTime);
+            new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
 
         return new StreamThread(
             mockTime,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index bdb9d028c54..63496800acb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ImmutableMetricValue;
 import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
 import org.apache.kafka.test.StreamsTestUtils;
 
@@ -54,7 +53,6 @@ import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.MAX_SUFFIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP;
-import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESS_ID_TAG;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP;
@@ -131,13 +129,13 @@ public class StreamsMetricsImplTest {
     private final String metricNamePrefix = "metric";
     private final String group = "group";
     private final Map<String, String> tags = mkMap(mkEntry("tag", "value"));
-    private final Map<String, String> clientLevelTags = 
mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID), mkEntry(PROCESS_ID_TAG, PROCESS_ID));
+    private final Map<String, String> clientLevelTags = 
mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID));
     private final MetricName metricName1 =
         new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, 
clientLevelTags);
     private final MetricName metricName2 =
         new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION2, 
clientLevelTags);
     private final MockTime time = new MockTime(0);
-    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
     private static MetricConfig eqMetricConfig(final MetricConfig 
metricConfig) {
         final StringBuffer message = new StringBuffer();
@@ -251,7 +249,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetNewSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = 
streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
 
@@ -263,7 +261,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = 
streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
 
@@ -275,7 +273,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetNewSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.taskLevelSensor(
             THREAD_ID1,
@@ -292,7 +290,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.taskLevelSensor(
             THREAD_ID1,
@@ -309,7 +307,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetNewSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.topicLevelSensor(
             THREAD_ID1,
@@ -328,7 +326,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.topicLevelSensor(
             THREAD_ID1,
@@ -347,7 +345,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         final ArgumentCaptor<String> sensorKeys = 
setupGetNewSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.storeLevelSensor(
             TASK_ID1,
@@ -365,7 +363,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.storeLevelSensor(
             TASK_ID1,
@@ -381,7 +379,7 @@ public class StreamsMetricsImplTest {
     public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() {
         final Metrics metrics = mock(Metrics.class);
         final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2, 
INFO_RECORDING_LEVEL);
@@ -393,7 +391,7 @@ public class StreamsMetricsImplTest {
     public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() {
         final Metrics metrics = mock(Metrics.class);
         final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
         streamsMetrics.storeLevelSensor(TASK_ID2, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
@@ -405,7 +403,7 @@ public class StreamsMetricsImplTest {
     public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() {
         final Metrics metrics = mock(Metrics.class);
         final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
@@ -417,7 +415,7 @@ public class StreamsMetricsImplTest {
     public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() 
throws InterruptedException {
         final Metrics metrics = mock(Metrics.class);
         final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
         final Thread otherThread =
@@ -432,7 +430,7 @@ public class StreamsMetricsImplTest {
     public void shouldUseSameStoreLevelSensorKeyWithSameSensorNames() {
         final Metrics metrics = mock(Metrics.class);
         final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
         streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, 
INFO_RECORDING_LEVEL);
@@ -456,7 +454,7 @@ public class StreamsMetricsImplTest {
             .thenReturn(metricName);
         when(metrics.metric(metricName)).thenReturn(null);
         when(metrics.addMetricIfAbsent(eq(metricName), 
eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).thenReturn(null);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.addStoreLevelMutableMetric(
             TASK_ID1,
@@ -489,7 +487,7 @@ public class StreamsMetricsImplTest {
         when(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, 
DESCRIPTION1, STORE_LEVEL_TAG_MAP))
             .thenReturn(metricName);
         when(metrics.metric(metricName)).thenReturn(null);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         streamsMetrics.addStoreLevelMutableMetric(
             TASK_ID1,
@@ -539,7 +537,7 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldRemoveStateStoreLevelSensors() {
         final Metrics metrics = mock(Metrics.class);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
         final MetricName metricName1 =
             new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, 
DESCRIPTION1, STORE_LEVEL_TAG_MAP);
         final MetricName metricName2 =
@@ -562,7 +560,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetNewSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
             THREAD_ID1,
@@ -580,7 +578,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
             THREAD_ID1,
@@ -599,7 +597,7 @@ public class StreamsMetricsImplTest {
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         final String processorCacheName = "processorNodeName";
         setupGetNewSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
             THREAD_ID1,
@@ -618,7 +616,7 @@ public class StreamsMetricsImplTest {
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         final String processorCacheName = "processorNodeName";
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
             THREAD_ID1, TASK_ID1,
@@ -635,7 +633,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetNewSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = 
streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
 
@@ -647,7 +645,7 @@ public class StreamsMetricsImplTest {
         final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
         setupGetExistingSensorTest(metrics);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
         final Sensor actualSensor = 
streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
 
@@ -656,31 +654,110 @@ public class StreamsMetricsImplTest {
 
     @Test
     public void shouldAddClientLevelImmutableMetric() {
-        final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
-        final MetricConfig metricConfig = new 
MetricConfig().recordLevel(recordingLevel);
         final String value = "immutable-value";
-        final ImmutableMetricValue immutableValue = new 
ImmutableMetricValue<>(value);
-        when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, 
DESCRIPTION1, clientLevelTags))
-            .thenReturn(metricName1);
-        doNothing().when(metrics).addMetric(eq(metricName1), 
eqMetricConfig(metricConfig), eq(immutableValue));
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
 
-        streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1, 
DESCRIPTION1, recordingLevel, value);
+        final StreamsMetricsImpl streamsMetrics
+            = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
+
+        streamsMetrics.addClientLevelImmutableMetric(
+            METRIC_NAME1,
+            DESCRIPTION1,
+            recordingLevel,
+            value
+        );
+
+        final MetricName name = metrics.metricName(
+            METRIC_NAME1,
+            CLIENT_LEVEL_GROUP,
+            mkMap(
+                mkEntry("client-id", CLIENT_ID)
+            )
+        );
+        assertThat(metrics.metric(name).metricName().name(), 
equalTo(METRIC_NAME1));
+        assertThat(metrics.metric(name).metricValue(), equalTo(value));
+    }
+
+    @Test
+    public void shouldAddClientLevelImmutableMetricWithAdditionalTags() {
+        final RecordingLevel recordingLevel = RecordingLevel.INFO;
+        final String value = "immutable-value";
+
+        final StreamsMetricsImpl streamsMetrics
+            = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
+
+        streamsMetrics.addClientLevelImmutableMetric(
+            METRIC_NAME1,
+            DESCRIPTION1,
+            Collections.singletonMap("additional-tag", "additional-value"),
+            recordingLevel,
+            value
+        );
+
+        final MetricName name = metrics.metricName(
+            METRIC_NAME1,
+            CLIENT_LEVEL_GROUP,
+            mkMap(
+                mkEntry("client-id", CLIENT_ID),
+                mkEntry("additional-tag", "additional-value")
+            )
+        );
+        assertThat(metrics.metric(name).metricName().name(), 
equalTo(METRIC_NAME1));
+        assertThat(metrics.metric(name).metricValue(), equalTo(value));
     }
 
     @Test
     public void shouldAddClientLevelMutableMetric() {
-        final Metrics metrics = mock(Metrics.class);
         final RecordingLevel recordingLevel = RecordingLevel.INFO;
-        final MetricConfig metricConfig = new 
MetricConfig().recordLevel(recordingLevel);
-        final Gauge<String> valueProvider = (config, now) -> "mutable-value";
-        when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, 
DESCRIPTION1, clientLevelTags))
-            .thenReturn(metricName1);
-        doNothing().when(metrics).addMetric(eq(metricName1), 
eqMetricConfig(metricConfig), eq(valueProvider));
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final String value = "mutable-value";
+
+        final StreamsMetricsImpl streamsMetrics
+            = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
 
-        streamsMetrics.addClientLevelMutableMetric(METRIC_NAME1, DESCRIPTION1, 
recordingLevel, valueProvider);
+        streamsMetrics.addClientLevelMutableMetric(
+            METRIC_NAME1,
+            DESCRIPTION1,
+            recordingLevel,
+            (c, t) -> value
+        );
+
+        final MetricName name = metrics.metricName(
+            METRIC_NAME1,
+            CLIENT_LEVEL_GROUP,
+            mkMap(
+                mkEntry("client-id", CLIENT_ID)
+            )
+        );
+        assertThat(metrics.metric(name).metricName().name(), 
equalTo(METRIC_NAME1));
+        assertThat(metrics.metric(name).metricValue(), equalTo(value));
+    }
+
+    @Test
+    public void shouldAddClientLevelMutableMetricWithAdditionalTags() {
+        final RecordingLevel recordingLevel = RecordingLevel.INFO;
+        final String value = "mutable-value";
+
+        final StreamsMetricsImpl streamsMetrics
+            = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
+
+        streamsMetrics.addClientLevelMutableMetric(
+            METRIC_NAME1,
+            DESCRIPTION1,
+            Collections.singletonMap("additional-tag", "additional-value"),
+            recordingLevel,
+            (c, t) -> value
+        );
+
+        final MetricName name = metrics.metricName(
+            METRIC_NAME1,
+            CLIENT_LEVEL_GROUP,
+            mkMap(
+                mkEntry("client-id", CLIENT_ID),
+                mkEntry("additional-tag", "additional-value")
+            )
+        );
+        assertThat(metrics.metric(name).metricName().name(), 
equalTo(METRIC_NAME1));
+        assertThat(metrics.metric(name).metricValue(), equalTo(value));
     }
 
     @Test
@@ -699,7 +776,7 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldRemoveClientLevelMetricsAndSensors() {
         final Metrics metrics = mock(Metrics.class);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
         final ArgumentCaptor<String> sensorKeys = 
addSensorsOnAllLevels(metrics, streamsMetrics);
 
         
doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(0));
@@ -712,7 +789,7 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldRemoveThreadLevelSensors() {
         final Metrics metrics = mock(Metrics.class);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
         addSensorsOnAllLevels(metrics, streamsMetrics);
         setupRemoveSensorsTest(metrics, THREAD_ID1);
 
@@ -721,7 +798,7 @@ public class StreamsMetricsImplTest {
 
     @Test
     public void testNullMetrics() {
-        assertThrows(NullPointerException.class, () -> new 
StreamsMetricsImpl(null, "", PROCESS_ID, time));
+        assertThrows(NullPointerException.class, () -> new 
StreamsMetricsImpl(null, "", time));
     }
 
     @Test
@@ -754,7 +831,7 @@ public class StreamsMetricsImplTest {
     @Test
     public void testMultiLevelSensorRemoval() {
         final Metrics registry = new Metrics();
-        final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, 
THREAD_ID1, PROCESS_ID, time);
+        final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, 
THREAD_ID1, time);
         for (final MetricName defaultMetric : registry.metrics().keySet()) {
             registry.removeMetric(defaultMetric);
         }
@@ -860,7 +937,7 @@ public class StreamsMetricsImplTest {
         final MockTime time = new MockTime(1);
         final MetricConfig config = new MetricConfig().timeWindow(1, 
TimeUnit.MILLISECONDS);
         final Metrics metrics = new Metrics(config, time);
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "", PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "", time);
 
         final String scope = "scope";
         final String entity = "entity";
@@ -894,7 +971,7 @@ public class StreamsMetricsImplTest {
 
     @Test
     public void shouldAddLatencyRateTotalSensor() {
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
         shouldAddCustomSensor(
             streamsMetrics.addLatencyRateTotalSensor(SCOPE_NAME, ENTITY_NAME, 
OPERATION_NAME, RecordingLevel.DEBUG),
             streamsMetrics,
@@ -909,7 +986,7 @@ public class StreamsMetricsImplTest {
 
     @Test
     public void shouldAddRateTotalSensor() {
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, time);
         shouldAddCustomSensor(
             streamsMetrics.addRateTotalSensor(SCOPE_NAME, ENTITY_NAME, 
OPERATION_NAME, RecordingLevel.DEBUG),
             streamsMetrics,
@@ -1035,9 +1112,8 @@ public class StreamsMetricsImplTest {
     public void shouldGetClientLevelTagMap() {
         final Map<String, String> tagMap = streamsMetrics.clientLevelTagMap();
 
-        assertThat(tagMap.size(), equalTo(2));
+        assertThat(tagMap.size(), equalTo(1));
         assertThat(tagMap.get(StreamsMetricsImpl.CLIENT_ID_TAG), 
equalTo(CLIENT_ID));
-        assertThat(tagMap.get(StreamsMetricsImpl.PROCESS_ID_TAG), 
equalTo(PROCESS_ID));
     }
 
     @Test
@@ -1045,7 +1121,7 @@ public class StreamsMetricsImplTest {
         final String taskName = "test-task";
         final String storeType = "remote-window";
         final String storeName = "window-keeper";
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, THREAD_ID1, time);
 
         final Map<String, String> tagMap = 
streamsMetrics.storeLevelTagMap(taskName, storeType, storeName);
 
@@ -1060,7 +1136,7 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldGetCacheLevelTagMap() {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+            new StreamsMetricsImpl(metrics, THREAD_ID1, time);
         final String taskName = "taskName";
         final String storeName = "storeName";
 
@@ -1077,7 +1153,7 @@ public class StreamsMetricsImplTest {
 
     @Test
     public void shouldGetThreadLevelTagMap() {
-        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, THREAD_ID1, time);
 
         final Map<String, String> tagMap = 
streamsMetrics.threadLevelTagMap(THREAD_ID1);
 
@@ -1210,7 +1286,7 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldReturnMetricsVersionCurrent() {
         assertThat(
-            new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, 
time).version(),
+            new StreamsMetricsImpl(metrics, THREAD_ID1, time).version(),
             equalTo(Version.LATEST)
         );
     }
@@ -1268,58 +1344,61 @@ public class StreamsMetricsImplTest {
     @Test
     public void shouldAddThreadLevelMutableMetric() {
         final int measuredValue = 123;
+        final String name = "foobar";
         final StreamsMetricsImpl streamsMetrics
-            = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
 
         streamsMetrics.addThreadLevelMutableMetric(
-            "foobar",
+            name,
             "test metric",
             "t1",
             (c, t) -> measuredValue
         );
 
-        final MetricName name = metrics.metricName(
-            "foobar",
+        final MetricName metricName = metrics.metricName(
+            name,
             THREAD_LEVEL_GROUP,
             mkMap(
                 mkEntry("thread-id", "t1")
             )
         );
-        assertThat(metrics.metric(name), notNullValue());
-        assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
+
+        assertThat(metrics.metric(metricName).metricName().name(), 
equalTo(name));
+        assertThat(metrics.metric(metricName).metricValue(), 
equalTo(measuredValue));
     }
 
     @Test
     public void shouldAddThreadLevelMutableMetricWithAdditionalTags() {
         final int measuredValue = 123;
+        final String name = "foobar";
         final StreamsMetricsImpl streamsMetrics
-            = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
 
         streamsMetrics.addThreadLevelMutableMetric(
-            "foobar",
+            name,
             "test metric",
             "t1",
             Collections.singletonMap("additional-tag", "additional-value"),
             (c, t) -> measuredValue
         );
 
-        final MetricName name = metrics.metricName(
-            "foobar",
+        final MetricName metricName = metrics.metricName(
+            name,
             THREAD_LEVEL_GROUP,
             mkMap(
                 mkEntry("thread-id", "t1"),
                 mkEntry("additional-tag", "additional-value")
             )
         );
-        assertThat(metrics.metric(name), notNullValue());
-        assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
+        assertThat(metrics.metric(metricName).metricName().name(), 
equalTo(name));
+        assertThat(metrics.metric(metricName).metricValue(), 
equalTo(measuredValue));
     }
 
     @Test
     public void shouldCleanupThreadLevelMutableMetric() {
         final int measuredValue = 123;
         final StreamsMetricsImpl streamsMetrics
-            = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
         streamsMetrics.addThreadLevelMutableMetric(
             "foobar",
             "test metric",
@@ -1341,7 +1420,7 @@ public class StreamsMetricsImplTest {
     public void shouldAddThreadLevelImmutableMetric() {
         final int measuredValue = 123;
         final StreamsMetricsImpl streamsMetrics
-            = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
 
         streamsMetrics.addThreadLevelImmutableMetric(
             "foobar",
@@ -1365,7 +1444,7 @@ public class StreamsMetricsImplTest {
     public void shouldCleanupThreadLevelImmutableMetric() {
         final int measuredValue = 123;
         final StreamsMetricsImpl streamsMetrics
-            = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+            = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
         streamsMetrics.addThreadLevelImmutableMetric(
             "foobar",
             "test metric",
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 0f8ce890b19..fd5bbef428a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1350,7 +1350,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
@@ -1386,7 +1386,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
@@ -1425,7 +1425,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
@@ -1466,7 +1466,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 7a78716530a..ede32caba90 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -572,7 +572,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
@@ -612,7 +612,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
@@ -654,7 +654,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
@@ -698,7 +698,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
                 dir,
                 Serdes.String(),
                 Serdes.String(),
-                new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
                 new StreamsConfig(props),
                 MockRecordCollector::new,
                 new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 154517d3b94..02de04a5247 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -97,7 +97,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
             streamsConfig,
             () -> collector,
             new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 4239e3e5000..0480fbc0721 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -112,7 +112,7 @@ public class GlobalStateStoreProviderTest {
         when(mockContext.applicationId()).thenReturn("appId");
         when(mockContext.metrics())
             .thenReturn(
-                new StreamsMetricsImpl(new Metrics(), "threadName", 
"processId", new MockTime())
+                new StreamsMetricsImpl(new Metrics(), "threadName", new 
MockTime())
             );
         when(mockContext.taskId()).thenReturn(new TaskId(0, 0));
         when(mockContext.appConfigs()).thenReturn(CONFIGS);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
index e71704f32af..fa0757ac62f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
@@ -54,7 +54,7 @@ public class KeyValueSegmentTest {
     @BeforeEach
     public void setUp() {
         metricsRecorder.init(
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "test-client", new 
MockTime()),
             new TaskId(0, 0)
         );
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 294af3944f2..1a85901ccc4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -123,7 +123,7 @@ public class MeteredKeyValueStoreTest {
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.metrics()).thenReturn(
-            new StreamsMetricsImpl(metrics, "test", "processId", mockTime)
+            new StreamsMetricsImpl(metrics, "test", mockTime)
         );
         when(context.taskId()).thenReturn(taskId);
         when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index ee1b686dade..e34c232075e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -126,7 +126,7 @@ public class MeteredSessionStoreTest {
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.metrics())
-                .thenReturn(new StreamsMetricsImpl(metrics, "test", 
"processId", mockTime));
+                .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
         when(context.taskId()).thenReturn(taskId);
         when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
         when(innerStore.name()).thenReturn(STORE_NAME);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 2e3c470387c..9b5d33db966 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -127,7 +127,7 @@ public class MeteredTimestampedKeyValueStoreTest {
         setUpWithoutContext();
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.metrics())
-            .thenReturn(new StreamsMetricsImpl(metrics, "test", "processId", 
mockTime));
+            .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
         when(context.taskId()).thenReturn(taskId);
         when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
         when(inner.name()).thenReturn(STORE_NAME);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index a3fe59c6e8b..6a4871bd3a3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -77,7 +77,7 @@ public class MeteredTimestampedWindowStoreTest {
 
     public void setUp() {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", "processId", new 
MockTime());
+            new StreamsMetricsImpl(metrics, "test", new MockTime());
 
         context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
@@ -105,7 +105,7 @@ public class MeteredTimestampedWindowStoreTest {
 
     public void setUpWithoutContextName() {
         final StreamsMetricsImpl streamsMetrics =
-                new StreamsMetricsImpl(metrics, "test", "processId", new 
MockTime());
+                new StreamsMetricsImpl(metrics, "test", new MockTime());
 
         context = new InternalMockProcessorContext<>(
                 TestUtils.tempDirectory(),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index 8e8e02b2722..d40f6947480 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -110,7 +110,7 @@ public class MeteredVersionedKeyValueStoreTest {
     @BeforeEach
     public void setUp() {
         when(inner.name()).thenReturn(STORE_NAME);
-        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, 
"test", "processId", mockTime));
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, 
"test", mockTime));
         when(context.applicationId()).thenReturn(APPLICATION_ID);
         when(context.taskId()).thenReturn(TASK_ID);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index ba557104ebd..d0efdb8f4b7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -116,7 +116,7 @@ public class MeteredWindowStoreTest {
     @BeforeEach
     public void setUp() {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", "processId", new 
MockTime());
+            new StreamsMetricsImpl(metrics, "test", new MockTime());
         context = new InternalMockProcessorContext<>(
             TestUtils.tempDirectory(),
             Serdes.String(),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 8a02289890e..7dd4caf24d2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -923,7 +923,7 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
 
         final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(RecordingLevel.DEBUG));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-application", "processId", 
time);
+            new StreamsMetricsImpl(metrics, "test-application", time);
 
         context = mock(InternalMockProcessorContext.class);
         when(context.metrics()).thenReturn(streamsMetrics);
@@ -956,7 +956,7 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
 
         final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(RecordingLevel.INFO));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-application", "processId", 
time);
+            new StreamsMetricsImpl(metrics, "test-application", time);
 
         context = mock(InternalMockProcessorContext.class);
         when(context.metrics()).thenReturn(streamsMetrics);
@@ -988,7 +988,7 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
 
         final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(RecordingLevel.INFO));
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test-application", "processId", 
time);
+            new StreamsMetricsImpl(metrics, "test-application", time);
 
         final Properties props = StreamsTestUtils.getStreamsConfig();
         context = mock(InternalMockProcessorContext.class);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
index 69e7d31e31b..e288c04517e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
@@ -63,7 +63,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
     public void setUp() {
         final Metrics metrics = new Metrics();
         offset = 0;
-        streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", 
"processId", new MockTime());
+        streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", new 
MockTime());
         context = new 
MockInternalProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new 
TaskId(0, 0), TestUtils.tempDirectory());
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
index 633d14c1e63..11f7e6c13cb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
@@ -54,7 +54,7 @@ public class TimestampedSegmentTest {
     @BeforeEach
     public void setUp() {
         metricsRecorder.init(
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
new MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "test-client", new 
MockTime()),
             new TaskId(0, 0)
         );
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
index 75c01cef3c9..113de5959a4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
@@ -202,7 +202,7 @@ public class RocksDBMetricsRecorderGaugesTest {
 
     private void runAndVerifySumOfProperties(final String propertyName) throws 
Exception {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
new MockTime());
+            new StreamsMetricsImpl(new Metrics(), "test-client", new 
MockTime());
         final RocksDBMetricsRecorder recorder = new 
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
 
         recorder.init(streamsMetrics, TASK_ID);
@@ -219,7 +219,7 @@ public class RocksDBMetricsRecorderGaugesTest {
 
     private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String 
propertyName) throws Exception {
         final StreamsMetricsImpl streamsMetrics =
-                new StreamsMetricsImpl(new Metrics(), "test-client", 
"processId", new MockTime());
+                new StreamsMetricsImpl(new Metrics(), "test-client", new 
MockTime());
         final RocksDBMetricsRecorder recorder = new 
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
 
         recorder.init(streamsMetrics, TASK_ID);
@@ -236,7 +236,7 @@ public class RocksDBMetricsRecorderGaugesTest {
 
     private void runAndVerifyBlockCacheMetricsWithSingleCache(final String 
propertyName) throws Exception {
         final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(new Metrics(), "test-client", "processId", 
new MockTime());
+            new StreamsMetricsImpl(new Metrics(), "test-client", new 
MockTime());
         final RocksDBMetricsRecorder recorder = new 
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
 
         recorder.init(streamsMetrics, TASK_ID);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
index a0c068b59ee..0436811db79 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
@@ -194,7 +194,7 @@ public class RocksDBMetricsRecorderTest {
         assertThrows(
             IllegalStateException.class,
             () -> recorder.init(
-                new StreamsMetricsImpl(new Metrics(), "test-client", 
"processId", new MockTime()),
+                new StreamsMetricsImpl(new Metrics(), "test-client", new 
MockTime()),
                 TASK_ID1
             )
         );
diff --git 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index ed68c86c490..219cc7a80fc 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -90,7 +90,7 @@ public class InternalMockProcessorContext<KOut, VOut>
         this(null,
             null,
             null,
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
             new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             null,
             null,
@@ -107,7 +107,6 @@ public class InternalMockProcessorContext<KOut, VOut>
             new StreamsMetricsImpl(
                 new Metrics(),
                 "mock",
-                "processId",
                 new MockTime()
             ),
             config,
@@ -140,7 +139,6 @@ public class InternalMockProcessorContext<KOut, VOut>
             new StreamsMetricsImpl(
                 new Metrics(),
                 "mock",
-                "processId",
                 new MockTime()
             ),
             config,
@@ -158,7 +156,7 @@ public class InternalMockProcessorContext<KOut, VOut>
             stateDir,
             keySerde,
             valueSerde,
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
             config,
             null,
             null,
@@ -178,7 +176,7 @@ public class InternalMockProcessorContext<KOut, VOut>
             null,
             serdes.keySerde(),
             serdes.valueSerde(),
-            new StreamsMetricsImpl(metrics, "mock", "processId", new 
MockTime()),
+            new StreamsMetricsImpl(metrics, "mock", new MockTime()),
             new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             () -> collector,
             null,
@@ -195,7 +193,7 @@ public class InternalMockProcessorContext<KOut, VOut>
             stateDir,
             keySerde,
             valueSerde,
-            new StreamsMetricsImpl(new Metrics(), "mock", "processId", new 
MockTime()),
+            new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
             new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
             () -> collector,
             cache,
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 81c90d043ce..a1cd78d6fc0 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -384,7 +384,6 @@ public class TopologyTestDriver implements Closeable {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
                 metrics,
                 "test-client",
-                "processId",
                 mockWallClockTime
         );
         TaskMetrics.droppedRecordsSensor(threadId, TASK_ID.toString(), 
streamsMetrics);
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index fc7d27a3bb7..a9d4e30bcfc 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -237,7 +237,6 @@ public class MockProcessorContext implements 
ProcessorContext, RecordCollector.S
         this.metrics = new StreamsMetricsImpl(
                 new Metrics(metricConfig),
                 threadId,
-                "processId",
                 Time.SYSTEM
         );
         TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
index 52a2308dafe..7d366cddfb8 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -255,7 +255,6 @@ public class MockProcessorContext<KForward, VForward> 
implements ProcessorContex
         metrics = new StreamsMetricsImpl(
             new Metrics(metricConfig),
             threadId,
-            "processId",
             Time.SYSTEM
         );
         TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index ebb38dd773a..72d125304f1 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -290,7 +290,6 @@ public class MockProcessorContextTest {
         when(mockInternalProcessorContext.metrics()).thenReturn(new 
StreamsMetricsImpl(
             new Metrics(new MetricConfig()),
             Thread.currentThread().getName(),
-            "processId",
             Time.SYSTEM
         ));
         when(mockInternalProcessorContext.taskId()).thenReturn(new TaskId(1, 
1));


Reply via email to