This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 65a1f267c57 KAFKA-19882: Removing process id from default client level
tags (#20939)
65a1f267c57 is described below
commit 65a1f267c57d2f8d5cf6ad9f72993a4798a407ee
Author: Genseric Ghiro <[email protected]>
AuthorDate: Fri Nov 21 17:41:06 2025 -0500
KAFKA-19882: Removing process id from default client level tags (#20939)
As a follow-up to #20906, this change also removes the process-id tag
from 4.1. While working on KIP-1091, we mistakenly applied the
process-id tag to all client-level metrics, rather than only to the
client-state, thread-state, and recording-level metrics as specified
in the KIP.
The change is covered by unit tests in `ClientMetricsTest.java` and `StreamsMetricsImplTest.java`.
Reviewers: Matthias Sax <[email protected]>, Bill
Bejeck <[email protected]>
---
.../org/apache/kafka/streams/KafkaStreams.java | 5 +-
.../streams/internals/metrics/ClientMetrics.java | 10 +-
.../internals/metrics/StreamsMetricsImpl.java | 40 +++-
.../internals/metrics/ClientMetricsTest.java | 29 ++-
...KStreamSessionWindowAggregateProcessorTest.java | 2 +-
.../processor/internals/ActiveTaskCreatorTest.java | 2 +-
.../internals/DefaultStateUpdaterTest.java | 2 +-
.../internals/GlobalStreamThreadTest.java | 6 +-
.../processor/internals/MockStreamsMetrics.java | 2 +-
.../processor/internals/ProcessorNodeTest.java | 6 +-
.../processor/internals/RecordQueueTest.java | 2 +-
.../processor/internals/SourceNodeTest.java | 2 +-
.../processor/internals/StandbyTaskTest.java | 2 +-
.../processor/internals/StreamTaskTest.java | 8 +-
.../processor/internals/StreamThreadTest.java | 40 ++--
.../internals/metrics/StreamsMetricsImplTest.java | 219 ++++++++++++++-------
...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 8 +-
.../AbstractRocksDBSegmentedBytesStoreTest.java | 8 +-
.../ChangeLoggingKeyValueBytesStoreTest.java | 2 +-
.../internals/GlobalStateStoreProviderTest.java | 2 +-
.../state/internals/KeyValueSegmentTest.java | 2 +-
.../state/internals/MeteredKeyValueStoreTest.java | 2 +-
.../state/internals/MeteredSessionStoreTest.java | 2 +-
.../MeteredTimestampedKeyValueStoreTest.java | 2 +-
.../MeteredTimestampedWindowStoreTest.java | 4 +-
.../MeteredVersionedKeyValueStoreTest.java | 2 +-
.../state/internals/MeteredWindowStoreTest.java | 2 +-
.../streams/state/internals/RocksDBStoreTest.java | 6 +-
.../RocksDBTimeOrderedKeyValueBufferTest.java | 2 +-
.../state/internals/TimestampedSegmentTest.java | 2 +-
.../metrics/RocksDBMetricsRecorderGaugesTest.java | 6 +-
.../metrics/RocksDBMetricsRecorderTest.java | 2 +-
.../kafka/test/InternalMockProcessorContext.java | 10 +-
.../apache/kafka/streams/TopologyTestDriver.java | 1 -
.../streams/processor/MockProcessorContext.java | 1 -
.../processor/api/MockProcessorContext.java | 1 -
.../kafka/streams/MockProcessorContextTest.java | 1 -
37 files changed, 277 insertions(+), 168 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 41981e16f60..73e3468d0f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -970,7 +970,6 @@ public class KafkaStreams implements AutoCloseable {
streamsMetrics = new StreamsMetricsImpl(
metrics,
clientId,
- processId.toString(),
time
);
@@ -979,8 +978,8 @@ public class KafkaStreams implements AutoCloseable {
ClientMetrics.addApplicationIdMetric(streamsMetrics,
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG));
ClientMetrics.addTopologyDescriptionMetric(streamsMetrics,
(metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) ->
state.name());
- ClientMetrics.addClientStateTelemetryMetric(streamsMetrics,
(metricsConfig, now) -> state.ordinal());
- ClientMetrics.addClientRecordingLevelMetric(streamsMetrics,
calculateMetricsRecordingLevel());
+ ClientMetrics.addClientStateTelemetryMetric(processId.toString(),
streamsMetrics, (metricsConfig, now) -> state.ordinal());
+ ClientMetrics.addClientRecordingLevelMetric(processId.toString(),
streamsMetrics, calculateMetricsRecordingLevel());
threads = Collections.synchronizedList(new LinkedList<>());
ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics,
(metricsConfig, now) -> numLiveStreamThreads());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
index 21bac269d5a..3b88e240172 100644
---
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
+++
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
@@ -25,9 +25,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
+import java.util.Collections;
import java.util.Properties;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP;
+import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESS_ID_TAG;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addSumMetricToSensor;
public class ClientMetrics {
@@ -126,21 +128,25 @@ public class ClientMetrics {
);
}
- public static void addClientStateTelemetryMetric(final StreamsMetricsImpl
streamsMetrics,
+ public static void addClientStateTelemetryMetric(final String processId,
+ final StreamsMetricsImpl
streamsMetrics,
final Gauge<Integer>
stateProvider) {
streamsMetrics.addClientLevelMutableMetric(
CLIENT_STATE,
STATE_DESCRIPTION,
+ Collections.singletonMap(PROCESS_ID_TAG, processId),
RecordingLevel.INFO,
stateProvider
);
}
- public static void addClientRecordingLevelMetric(final StreamsMetricsImpl
streamsMetrics,
+ public static void addClientRecordingLevelMetric(final String processId,
+ final StreamsMetricsImpl
streamsMetrics,
final int recordingLevel)
{
streamsMetrics.addClientLevelImmutableMetric(
RECORDING_LEVEL,
RECORDING_LEVEL_DESCRIPTION,
+ Collections.singletonMap(PROCESS_ID_TAG, processId),
RecordingLevel.INFO,
recordingLevel
);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 0dd3f77f199..ae4979a6428 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -87,7 +87,6 @@ public class StreamsMetricsImpl implements StreamsMetrics {
private final Metrics metrics;
private final Map<Sensor, Sensor> parentSensors;
private final String clientId;
- private final String processId;
private final Version version;
private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
@@ -165,12 +164,10 @@ public class StreamsMetricsImpl implements StreamsMetrics
{
public StreamsMetricsImpl(final Metrics metrics,
final String clientId,
- final String processId,
final Time time) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
this.metrics = metrics;
this.clientId = clientId;
- this.processId = processId;
version = Version.LATEST;
rocksDBMetricsRecordingTrigger = new
RocksDBMetricsRecordingTrigger(time);
@@ -193,7 +190,20 @@ public class StreamsMetricsImpl implements StreamsMetrics {
final String description,
final RecordingLevel
recordingLevel,
final T value) {
- final MetricName metricName = metrics.metricName(name,
CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
+ addClientLevelImmutableMetric(name, description,
Collections.emptyMap(), recordingLevel, value);
+ }
+
+ public <T> void addClientLevelImmutableMetric(final String name,
+ final String description,
+ final Map<String, String>
additionalTags,
+ final RecordingLevel
recordingLevel,
+ final T value) {
+ final MetricName metricName = metrics.metricName(
+ name,
+ CLIENT_LEVEL_GROUP,
+ description,
+ clientLevelTagMap(additionalTags)
+ );
final MetricConfig metricConfig = new
MetricConfig().recordLevel(recordingLevel);
synchronized (clientLevelMetrics) {
metrics.addMetric(metricName, metricConfig, new
ImmutableMetricValue<>(value));
@@ -205,7 +215,20 @@ public class StreamsMetricsImpl implements StreamsMetrics {
final String description,
final RecordingLevel
recordingLevel,
final Gauge<T> valueProvider) {
- final MetricName metricName = metrics.metricName(name,
CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
+ addClientLevelMutableMetric(name, description, Collections.emptyMap(),
recordingLevel, valueProvider);
+ }
+
+ public <T> void addClientLevelMutableMetric(final String name,
+ final String description,
+ final Map<String, String>
additionalTags,
+ final RecordingLevel
recordingLevel,
+ final Gauge<T> valueProvider) {
+ final MetricName metricName = metrics.metricName(
+ name,
+ CLIENT_LEVEL_GROUP,
+ description,
+ clientLevelTagMap(additionalTags)
+ );
final MetricConfig metricConfig = new
MetricConfig().recordLevel(recordingLevel);
synchronized (clientLevelMetrics) {
metrics.addMetric(metricName, metricConfig, valueProvider);
@@ -279,9 +302,12 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
public Map<String, String> clientLevelTagMap() {
- final Map<String, String> tagMap = new LinkedHashMap<>();
+ return clientLevelTagMap(Collections.emptyMap());
+ }
+
+ public Map<String, String> clientLevelTagMap(final Map<String, String>
additionalTags) {
+ final Map<String, String> tagMap = new LinkedHashMap<>(additionalTags);
tagMap.put(CLIENT_ID_TAG, clientId);
- tagMap.put(PROCESS_ID_TAG, processId);
return tagMap;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
index 9142835b92c..2eee10f7854 100644
---
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.when;
public class ClientMetricsTest {
private static final String COMMIT_ID = "test-commit-ID";
+ private static final String PROCESS_ID = "test-process-id";
private static final String VERSION = "test-version";
private final StreamsMetricsImpl streamsMetrics =
mock(StreamsMetricsImpl.class);
@@ -116,11 +117,15 @@ public class ClientMetricsTest {
final String name = "client-state";
final String description = "The state of the Kafka Streams client";
final Gauge<Integer> stateProvider = (config, now) ->
State.RUNNING.ordinal();
- setUpAndVerifyMutableMetric(
- name,
- description,
- stateProvider,
- () ->
ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, stateProvider)
+
+ ClientMetrics.addClientStateTelemetryMetric(PROCESS_ID,
streamsMetrics, stateProvider);
+
+ verify(streamsMetrics).addClientLevelMutableMetric(
+ eq(name),
+ eq(description),
+ eq(Collections.singletonMap("process-id", PROCESS_ID)),
+ eq(RecordingLevel.INFO),
+ eq(stateProvider)
);
}
@@ -129,11 +134,15 @@ public class ClientMetricsTest {
final String name = "recording-level";
final String description = "The metrics recording level of the Kafka
Streams client";
final int recordingLevel = 1;
- setUpAndVerifyImmutableMetric(
- name,
- description,
- recordingLevel,
- () ->
ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, recordingLevel)
+
+ ClientMetrics.addClientRecordingLevelMetric(PROCESS_ID,
streamsMetrics, recordingLevel);
+
+ verify(streamsMetrics).addClientLevelImmutableMetric(
+ eq(name),
+ eq(description),
+ eq(Collections.singletonMap("process-id", PROCESS_ID)),
+ eq(RecordingLevel.INFO),
+ eq(recordingLevel)
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index dd3960c17ec..f6658997b7b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
private final MockTime time = new MockTime();
private final Metrics metrics = new Metrics();
- private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, "test", "processId", time);
+ private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, "test", time);
private final String threadId = Thread.currentThread().getName();
private final Initializer<Long> initializer = () -> 0L;
private final Aggregator<String, String, Long> aggregator = (aggKey,
value, aggregate) -> aggregate + 1;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 17eb27bb3d8..dc2e8ffd513 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest {
private ChangelogReader changeLogReader;
private final MockClientSupplier mockClientSupplier = new
MockClientSupplier();
- private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(new Metrics(), "clientId", "processId", new MockTime());
+ private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(new Metrics(), "clientId", new MockTime());
private final Map<String, Object> properties = mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 9f2a4c9356d..4f540fcba64 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -106,7 +106,7 @@ class DefaultStateUpdaterTest {
// need an auto-tick timer to work for draining with timeout
private final Time time = new MockTime(1L);
- private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new
Metrics(time), "", "", time);
+ private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new
Metrics(time), "", time);
private final StreamsConfig config = new
StreamsConfig(configProps(COMMIT_INTERVAL));
private final ChangelogReader changelogReader =
mock(ChangelogReader.class);
private final TopologyMetadata topologyMetadata =
unnamedTopology().build();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index cfc1486b4be..4c397c2a322 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -132,7 +132,7 @@ public class GlobalStreamThreadTest {
mockConsumer,
new StateDirectory(config, time, true, false),
0,
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
time),
+ new StreamsMetricsImpl(new Metrics(), "test-client", time),
time,
"clientId",
stateRestoreListener,
@@ -171,7 +171,7 @@ public class GlobalStreamThreadTest {
mockConsumer,
new StateDirectory(config, time, true, false),
0,
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
time),
+ new StreamsMetricsImpl(new Metrics(), "test-client", time),
time,
"clientId",
stateRestoreListener,
@@ -420,7 +420,7 @@ public class GlobalStreamThreadTest {
consumer,
new StateDirectory(config, time, true, false),
0,
- new StreamsMetricsImpl(new Metrics(), "test-client",
"processId", time),
+ new StreamsMetricsImpl(new Metrics(), "test-client", time),
time,
"clientId",
stateRestoreListener,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
index a2e6820f901..4ed68ef8150 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
@@ -23,6 +23,6 @@ import
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
public class MockStreamsMetrics extends StreamsMetricsImpl {
public MockStreamsMetrics(final Metrics metrics) {
- super(metrics, "test", "processId", new MockTime());
+ super(metrics, "test", new MockTime());
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index a16315d363b..5a786b7174a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -217,7 +217,7 @@ public class ProcessorNodeTest {
public void testMetricsWithBuiltInMetricsVersionLatest() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-client", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test-client", new MockTime());
final InternalMockProcessorContext<Object, Object> context = new
InternalMockProcessorContext<>(streamsMetrics);
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>(NAME, new NoOpProcessor(),
Collections.emptySet());
@@ -301,7 +301,7 @@ public class ProcessorNodeTest {
public void testTopologyLevelClassCastExceptionDirect() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-client", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test-client", new MockTime());
final InternalMockProcessorContext<Object, Object> context = new
InternalMockProcessorContext<>(streamsMetrics);
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>("pname", new ClassCastProcessor(),
Collections.emptySet());
@@ -321,7 +321,7 @@ public class ProcessorNodeTest {
final InternalProcessorContext<Object, Object>
internalProcessorContext = mock(InternalProcessorContext.class,
withSettings().strictness(Strictness.LENIENT));
when(internalProcessorContext.taskId()).thenReturn(TASK_ID);
- when(internalProcessorContext.metrics()).thenReturn(new
StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()));
+ when(internalProcessorContext.metrics()).thenReturn(new
StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()));
when(internalProcessorContext.topic()).thenReturn(TOPIC);
when(internalProcessorContext.partition()).thenReturn(PARTITION);
when(internalProcessorContext.offset()).thenReturn(OFFSET);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 1dd19fb9cf7..1becc3d240d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -72,7 +72,7 @@ public class RecordQueueTest {
private final Metrics metrics = new Metrics();
private final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime());
+ new StreamsMetricsImpl(metrics, "mock", new MockTime());
final InternalMockProcessorContext<Integer, Integer> context = new
InternalMockProcessorContext<>(
StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
index e0fa79fd450..90d88359042 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -97,7 +97,7 @@ public class SourceNodeTest {
public void shouldExposeProcessMetrics() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-client", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test-client", new MockTime());
final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(streamsMetrics);
final SourceNode<String, String> node =
new SourceNode<>(context.currentNode().name(), new
TheDeserializer(), new TheDeserializer());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index e953a61fc1f..320d6c44510 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -113,7 +113,7 @@ public class StandbyTaskTest {
private final MockTime time = new MockTime();
private final Metrics metrics = new Metrics(new
MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
- private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, threadName, "processId", time);
+ private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, threadName, time);
private File baseDir;
private StreamsConfig config;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 5dab5329026..e441bd71ef8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -2588,7 +2588,7 @@ public class StreamTaskTest {
streamsMetrics,
null
);
- final StreamsMetricsImpl metrics = new
StreamsMetricsImpl(this.metrics, "test", "processId", time);
+ final StreamsMetricsImpl metrics = new
StreamsMetricsImpl(this.metrics, "test", time);
// The processor topology is missing the topics
final ProcessorTopology topology = withSources(emptyList(), mkMap());
@@ -3222,7 +3222,7 @@ public class StreamTaskTest {
topology,
consumer,
new TopologyConfig(null, config, new
Properties()).getTaskConfig(),
- new StreamsMetricsImpl(metrics, "test", "processId", time),
+ new StreamsMetricsImpl(metrics, "test", time),
stateDirectory,
cache,
time,
@@ -3259,7 +3259,7 @@ public class StreamTaskTest {
topology,
consumer,
new TopologyConfig(null, config, new
Properties()).getTaskConfig(),
- new StreamsMetricsImpl(metrics, "test", "processId", time),
+ new StreamsMetricsImpl(metrics, "test", time),
stateDirectory,
cache,
time,
@@ -3295,7 +3295,7 @@ public class StreamTaskTest {
topology,
consumer,
new TopologyConfig(null, config, new
Properties()).getTaskConfig(),
- new StreamsMetricsImpl(metrics, "test", "processId", time),
+ new StreamsMetricsImpl(metrics, "test", time),
stateDirectory,
cache,
time,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e5b3c0b1194..3f90c642b17 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -309,7 +309,6 @@ public class StreamThreadTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
APPLICATION_ID,
- PROCESS_ID.toString(),
time
);
@@ -717,7 +716,6 @@ public class StreamThreadTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
APPLICATION_ID,
- PROCESS_ID.toString(),
mockTime
);
@@ -782,7 +780,6 @@ public class StreamThreadTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
APPLICATION_ID,
- PROCESS_ID.toString(),
mockTime
);
@@ -1147,7 +1144,7 @@ public class StreamThreadTest {
final StreamsConfig config = new StreamsConfig(configProps(false,
stateUpdaterEnabled, processingThreadsEnabled));
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
@@ -1370,7 +1367,7 @@ public class StreamThreadTest {
final StreamsConfig config = new StreamsConfig(configProps(false,
stateUpdaterEnabled, processingThreadsEnabled));
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = buildStreamThread(consumer, taskManager, config,
topologyMetadata)
@@ -1424,7 +1421,7 @@ public class StreamThreadTest {
}
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final Properties props = configProps(false, stateUpdaterEnabled,
processingThreadsEnabled);
final StreamsConfig config = new StreamsConfig(props);
@@ -1471,7 +1468,7 @@ public class StreamThreadTest {
final StreamsConfig config = new StreamsConfig(configProps(false,
stateUpdaterEnabled, processingThreadsEnabled));
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = buildStreamThread(consumer, taskManager, config,
topologyMetadata)
@@ -1491,7 +1488,7 @@ public class StreamThreadTest {
final StreamsConfig config = new StreamsConfig(configProps(false,
stateUpdaterEnabled, processingThreadsEnabled));
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = buildStreamThread(consumer, taskManager, config,
topologyMetadata)
@@ -1892,7 +1889,6 @@ public class StreamThreadTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
APPLICATION_ID,
- PROCESS_ID.toString(),
mockTime
);
@@ -2590,7 +2586,7 @@ public class StreamThreadTest {
doThrow(new TaskMigratedException("Task lost exception", new
RuntimeException())).when(taskManager).handleLostAll();
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = buildStreamThread(consumer, taskManager, config,
topologyMetadata)
@@ -2620,7 +2616,7 @@ public class StreamThreadTest {
doThrow(new TaskMigratedException("Revocation non fatal exception",
new RuntimeException())).when(taskManager).handleRevocation(assignedPartitions);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = buildStreamThread(consumer, taskManager, config,
topologyMetadata)
@@ -2652,7 +2648,7 @@ public class StreamThreadTest {
when(taskManager.handleCorruption(corruptedTasks)).thenReturn(true);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -2711,7 +2707,7 @@ public class StreamThreadTest {
doThrow(new
TimeoutException()).when(taskManager).handleCorruption(corruptedTasks);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -2779,7 +2775,7 @@ public class StreamThreadTest {
doNothing().when(taskManager).handleLostAll();
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -2843,7 +2839,7 @@ public class StreamThreadTest {
doNothing().when(consumer).enforceRebalance("Active tasks corrupted");
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -2904,7 +2900,7 @@ public class StreamThreadTest {
when(taskManager.handleCorruption(corruptedTasks)).thenReturn(false);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -3113,7 +3109,7 @@ public class StreamThreadTest {
when(taskManager.producerMetrics()).thenReturn(dummyProducerMetrics);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = buildStreamThread(consumer, taskManager, config,
topologyMetadata);
@@ -3138,7 +3134,7 @@ public class StreamThreadTest {
final TaskManager taskManager = mock(TaskManager.class);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -3195,7 +3191,7 @@ public class StreamThreadTest {
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
final TaskManager taskManager = mock(TaskManager.class);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -3588,7 +3584,7 @@ public class StreamThreadTest {
"",
taskManager,
null,
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime),
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime),
topologyMetadata,
PROCESS_ID,
"thread-id",
@@ -3639,7 +3635,7 @@ public class StreamThreadTest {
final LogContext logContext = new LogContext("test");
final Logger log = logContext.logger(StreamThreadTest.class);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
new TopologyMetadata(internalTopologyBuilder, config),
config,
@@ -3698,7 +3694,7 @@ public class StreamThreadTest {
final StreamsConfig config,
final TopologyMetadata
topologyMetadata) {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
return new StreamThread(
mockTime,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index c6bf68e4df5..907c6ec78a6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
-import
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ImmutableMetricValue;
import
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
import org.apache.kafka.test.StreamsTestUtils;
@@ -54,7 +53,6 @@ import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.MAX_SUFFIX;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP;
-import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESS_ID_TAG;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP;
@@ -133,13 +131,13 @@ public class StreamsMetricsImplTest {
private final String metricNamePrefix = "metric";
private final String group = "group";
private final Map<String, String> tags = mkMap(mkEntry("tag", "value"));
- private final Map<String, String> clientLevelTags =
mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID), mkEntry(PROCESS_ID_TAG, PROCESS_ID));
+ private final Map<String, String> clientLevelTags =
mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID));
private final MetricName metricName1 =
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1,
clientLevelTags);
private final MetricName metricName2 =
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION2,
clientLevelTags);
private final MockTime time = new MockTime(0);
- private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
private static MetricConfig eqMetricConfig(final MetricConfig
metricConfig) {
final StringBuffer message = new StringBuffer();
@@ -254,7 +252,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, recordingLevel);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor =
streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
@@ -266,7 +264,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor =
streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
@@ -278,7 +276,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, recordingLevel);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.taskLevelSensor(
THREAD_ID1,
@@ -295,7 +293,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.taskLevelSensor(
THREAD_ID1,
@@ -312,7 +310,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, recordingLevel);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.topicLevelSensor(
THREAD_ID1,
@@ -331,7 +329,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.topicLevelSensor(
THREAD_ID1,
@@ -350,7 +348,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final ArgumentCaptor<String> sensorKeys =
setupGetNewSensorTest(metrics, recordingLevel);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.storeLevelSensor(
TASK_ID1,
@@ -368,7 +366,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.storeLevelSensor(
TASK_ID1,
@@ -384,7 +382,7 @@ public class StreamsMetricsImplTest {
public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() {
final Metrics metrics = mock(Metrics.class);
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2,
INFO_RECORDING_LEVEL);
@@ -396,7 +394,7 @@ public class StreamsMetricsImplTest {
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() {
final Metrics metrics = mock(Metrics.class);
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID2, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
@@ -408,7 +406,7 @@ public class StreamsMetricsImplTest {
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() {
final Metrics metrics = mock(Metrics.class);
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
@@ -420,7 +418,7 @@ public class StreamsMetricsImplTest {
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds()
throws InterruptedException {
final Metrics metrics = mock(Metrics.class);
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
final Thread otherThread =
@@ -435,7 +433,7 @@ public class StreamsMetricsImplTest {
public void shouldUseSameStoreLevelSensorKeyWithSameSensorNames() {
final Metrics metrics = mock(Metrics.class);
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
@@ -459,7 +457,7 @@ public class StreamsMetricsImplTest {
.thenReturn(metricName);
when(metrics.metric(metricName)).thenReturn(null);
when(metrics.addMetricIfAbsent(eq(metricName),
eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).thenReturn(null);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
streamsMetrics.addStoreLevelMutableMetric(
TASK_ID1,
@@ -491,7 +489,7 @@ public class StreamsMetricsImplTest {
when(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP,
DESCRIPTION1, STORE_LEVEL_TAG_MAP))
.thenReturn(metricName);
when(metrics.metric(metricName)).thenReturn(null);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
streamsMetrics.addStoreLevelMutableMetric(
TASK_ID1,
@@ -539,7 +537,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldRemoveStateStoreLevelSensors() {
final Metrics metrics = mock(Metrics.class);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final MetricName metricName1 =
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP,
DESCRIPTION1, STORE_LEVEL_TAG_MAP);
final MetricName metricName2 =
@@ -562,7 +560,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, recordingLevel);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
THREAD_ID1,
@@ -580,7 +578,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
THREAD_ID1,
@@ -599,7 +597,7 @@ public class StreamsMetricsImplTest {
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorCacheName = "processorNodeName";
setupGetNewSensorTest(metrics, recordingLevel);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
THREAD_ID1,
@@ -618,7 +616,7 @@ public class StreamsMetricsImplTest {
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorCacheName = "processorNodeName";
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
THREAD_ID1, TASK_ID1,
@@ -635,7 +633,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, recordingLevel);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor =
streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
@@ -647,7 +645,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor =
streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
@@ -656,31 +654,110 @@ public class StreamsMetricsImplTest {
@Test
public void shouldAddClientLevelImmutableMetric() {
- final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
- final MetricConfig metricConfig = new
MetricConfig().recordLevel(recordingLevel);
final String value = "immutable-value";
- final ImmutableMetricValue immutableValue = new
ImmutableMetricValue<>(value);
- when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP,
DESCRIPTION1, clientLevelTags))
- .thenReturn(metricName1);
- doNothing().when(metrics).addMetric(eq(metricName1),
eqMetricConfig(metricConfig), eq(immutableValue));
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
- streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1,
DESCRIPTION1, recordingLevel, value);
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
+
+ streamsMetrics.addClientLevelImmutableMetric(
+ METRIC_NAME1,
+ DESCRIPTION1,
+ recordingLevel,
+ value
+ );
+
+ final MetricName name = metrics.metricName(
+ METRIC_NAME1,
+ CLIENT_LEVEL_GROUP,
+ mkMap(
+ mkEntry("client-id", CLIENT_ID)
+ )
+ );
+ assertThat(metrics.metric(name).metricName().name(),
equalTo(METRIC_NAME1));
+ assertThat(metrics.metric(name).metricValue(), equalTo(value));
+ }
+
+ @Test
+ public void shouldAddClientLevelImmutableMetricWithAdditionalTags() {
+ final RecordingLevel recordingLevel = RecordingLevel.INFO;
+ final String value = "immutable-value";
+
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
+
+ streamsMetrics.addClientLevelImmutableMetric(
+ METRIC_NAME1,
+ DESCRIPTION1,
+ Collections.singletonMap("additional-tag", "additional-value"),
+ recordingLevel,
+ value
+ );
+
+ final MetricName name = metrics.metricName(
+ METRIC_NAME1,
+ CLIENT_LEVEL_GROUP,
+ mkMap(
+ mkEntry("client-id", CLIENT_ID),
+ mkEntry("additional-tag", "additional-value")
+ )
+ );
+ assertThat(metrics.metric(name).metricName().name(),
equalTo(METRIC_NAME1));
+ assertThat(metrics.metric(name).metricValue(), equalTo(value));
}
@Test
public void shouldAddClientLevelMutableMetric() {
- final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
- final MetricConfig metricConfig = new
MetricConfig().recordLevel(recordingLevel);
- final Gauge<String> valueProvider = (config, now) -> "mutable-value";
- when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP,
DESCRIPTION1, clientLevelTags))
- .thenReturn(metricName1);
- doNothing().when(metrics).addMetric(eq(metricName1),
eqMetricConfig(metricConfig), eq(valueProvider));
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final String value = "mutable-value";
+
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
- streamsMetrics.addClientLevelMutableMetric(METRIC_NAME1, DESCRIPTION1,
recordingLevel, valueProvider);
+ streamsMetrics.addClientLevelMutableMetric(
+ METRIC_NAME1,
+ DESCRIPTION1,
+ recordingLevel,
+ (c, t) -> value
+ );
+
+ final MetricName name = metrics.metricName(
+ METRIC_NAME1,
+ CLIENT_LEVEL_GROUP,
+ mkMap(
+ mkEntry("client-id", CLIENT_ID)
+ )
+ );
+ assertThat(metrics.metric(name).metricName().name(),
equalTo(METRIC_NAME1));
+ assertThat(metrics.metric(name).metricValue(), equalTo(value));
+ }
+
+ @Test
+ public void shouldAddClientLevelMutableMetricWithAdditionalTags() {
+ final RecordingLevel recordingLevel = RecordingLevel.INFO;
+ final String value = "mutable-value";
+
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
+
+ streamsMetrics.addClientLevelMutableMetric(
+ METRIC_NAME1,
+ DESCRIPTION1,
+ Collections.singletonMap("additional-tag", "additional-value"),
+ recordingLevel,
+ (c, t) -> value
+ );
+
+ final MetricName name = metrics.metricName(
+ METRIC_NAME1,
+ CLIENT_LEVEL_GROUP,
+ mkMap(
+ mkEntry("client-id", CLIENT_ID),
+ mkEntry("additional-tag", "additional-value")
+ )
+ );
+ assertThat(metrics.metric(name).metricName().name(),
equalTo(METRIC_NAME1));
+ assertThat(metrics.metric(name).metricValue(), equalTo(value));
}
@Test
@@ -699,7 +776,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldRemoveClientLevelMetricsAndSensors() {
final Metrics metrics = mock(Metrics.class);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final ArgumentCaptor<String> sensorKeys =
addSensorsOnAllLevels(metrics, streamsMetrics);
doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(0));
@@ -712,7 +789,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldRemoveThreadLevelSensors() {
final Metrics metrics = mock(Metrics.class);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
addSensorsOnAllLevels(metrics, streamsMetrics);
setupRemoveSensorsTest(metrics, THREAD_ID1);
@@ -721,7 +798,7 @@ public class StreamsMetricsImplTest {
@Test
public void testNullMetrics() {
- assertThrows(NullPointerException.class, () -> new
StreamsMetricsImpl(null, "", PROCESS_ID, time));
+ assertThrows(NullPointerException.class, () -> new
StreamsMetricsImpl(null, "", time));
}
@Test
@@ -754,7 +831,7 @@ public class StreamsMetricsImplTest {
@Test
public void testMultiLevelSensorRemoval() {
final Metrics registry = new Metrics();
- final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry,
THREAD_ID1, PROCESS_ID, time);
+ final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry,
THREAD_ID1, time);
for (final MetricName defaultMetric : registry.metrics().keySet()) {
registry.removeMetric(defaultMetric);
}
@@ -860,7 +937,7 @@ public class StreamsMetricsImplTest {
final MockTime time = new MockTime(1);
final MetricConfig config = new MetricConfig().timeWindow(1,
TimeUnit.MILLISECONDS);
final Metrics metrics = new Metrics(config, time);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, "", PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, "", time);
final String scope = "scope";
final String entity = "entity";
@@ -894,7 +971,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldAddLatencyRateTotalSensor() {
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
shouldAddCustomSensor(
streamsMetrics.addLatencyRateTotalSensor(SCOPE_NAME, ENTITY_NAME,
OPERATION_NAME, RecordingLevel.DEBUG),
streamsMetrics,
@@ -909,7 +986,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldAddRateTotalSensor() {
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
shouldAddCustomSensor(
streamsMetrics.addRateTotalSensor(SCOPE_NAME, ENTITY_NAME,
OPERATION_NAME, RecordingLevel.DEBUG),
streamsMetrics,
@@ -1035,9 +1112,8 @@ public class StreamsMetricsImplTest {
public void shouldGetClientLevelTagMap() {
final Map<String, String> tagMap = streamsMetrics.clientLevelTagMap();
- assertThat(tagMap.size(), equalTo(2));
+ assertThat(tagMap.size(), equalTo(1));
assertThat(tagMap.get(StreamsMetricsImpl.CLIENT_ID_TAG),
equalTo(CLIENT_ID));
- assertThat(tagMap.get(StreamsMetricsImpl.PROCESS_ID_TAG),
equalTo(PROCESS_ID));
}
@Test
@@ -1045,7 +1121,7 @@ public class StreamsMetricsImplTest {
final String taskName = "test-task";
final String storeType = "remote-window";
final String storeName = "window-keeper";
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, THREAD_ID1, time);
final Map<String, String> tagMap =
streamsMetrics.storeLevelTagMap(taskName, storeType, storeName);
@@ -1060,7 +1136,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldGetCacheLevelTagMap() {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ new StreamsMetricsImpl(metrics, THREAD_ID1, time);
final String taskName = "taskName";
final String storeName = "storeName";
@@ -1077,7 +1153,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldGetThreadLevelTagMap() {
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, THREAD_ID1, time);
final Map<String, String> tagMap =
streamsMetrics.threadLevelTagMap(THREAD_ID1);
@@ -1210,7 +1286,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldReturnMetricsVersionCurrent() {
assertThat(
- new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID,
time).version(),
+ new StreamsMetricsImpl(metrics, THREAD_ID1, time).version(),
equalTo(Version.LATEST)
);
}
@@ -1268,58 +1344,61 @@ public class StreamsMetricsImplTest {
@Test
public void shouldAddThreadLevelMutableMetric() {
final int measuredValue = 123;
+ final String name = "foobar";
final StreamsMetricsImpl streamsMetrics
- = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
streamsMetrics.addThreadLevelMutableMetric(
- "foobar",
+ name,
"test metric",
"t1",
(c, t) -> measuredValue
);
- final MetricName name = metrics.metricName(
- "foobar",
+ final MetricName metricName = metrics.metricName(
+ name,
THREAD_LEVEL_GROUP,
mkMap(
mkEntry("thread-id", "t1")
)
);
- assertThat(metrics.metric(name), notNullValue());
- assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
+
+ assertThat(metrics.metric(metricName).metricName().name(),
equalTo(name));
+ assertThat(metrics.metric(metricName).metricValue(),
equalTo(measuredValue));
}
@Test
public void shouldAddThreadLevelMutableMetricWithAdditionalTags() {
final int measuredValue = 123;
+ final String name = "foobar";
final StreamsMetricsImpl streamsMetrics
- = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
streamsMetrics.addThreadLevelMutableMetric(
- "foobar",
+ name,
"test metric",
"t1",
Collections.singletonMap("additional-tag", "additional-value"),
(c, t) -> measuredValue
);
- final MetricName name = metrics.metricName(
- "foobar",
+ final MetricName metricName = metrics.metricName(
+ name,
THREAD_LEVEL_GROUP,
mkMap(
mkEntry("thread-id", "t1"),
mkEntry("additional-tag", "additional-value")
)
);
- assertThat(metrics.metric(name), notNullValue());
- assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
+ assertThat(metrics.metric(metricName).metricName().name(),
equalTo(name));
+ assertThat(metrics.metric(metricName).metricValue(),
equalTo(measuredValue));
}
@Test
public void shouldCleanupThreadLevelMutableMetric() {
final int measuredValue = 123;
final StreamsMetricsImpl streamsMetrics
- = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
streamsMetrics.addThreadLevelMutableMetric(
"foobar",
"test metric",
@@ -1341,7 +1420,7 @@ public class StreamsMetricsImplTest {
public void shouldAddThreadLevelImmutableMetric() {
final int measuredValue = 123;
final StreamsMetricsImpl streamsMetrics
- = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
streamsMetrics.addThreadLevelImmutableMetric(
"foobar",
@@ -1365,7 +1444,7 @@ public class StreamsMetricsImplTest {
public void shouldCleanupThreadLevelImmutableMetric() {
final int measuredValue = 123;
final StreamsMetricsImpl streamsMetrics
- = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
streamsMetrics.addThreadLevelImmutableMetric(
"foobar",
"test metric",
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 0f8ce890b19..fd5bbef428a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1350,7 +1350,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -1386,7 +1386,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -1425,7 +1425,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -1466,7 +1466,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 7a78716530a..ede32caba90 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -572,7 +572,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -612,7 +612,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -654,7 +654,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -698,7 +698,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 154517d3b94..02de04a5247 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -97,7 +97,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.Long(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
streamsConfig,
() -> collector,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 8c28a9eabec..e7f8c10e9bc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -112,7 +112,7 @@ public class GlobalStateStoreProviderTest {
when(mockContext.applicationId()).thenReturn("appId");
when(mockContext.metrics())
.thenReturn(
- new StreamsMetricsImpl(new Metrics(), "threadName",
"processId", new MockTime())
+ new StreamsMetricsImpl(new Metrics(), "threadName", new
MockTime())
);
when(mockContext.taskId()).thenReturn(new TaskId(0, 0));
when(mockContext.appConfigs()).thenReturn(CONFIGS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
index e71704f32af..fa0757ac62f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
@@ -54,7 +54,7 @@ public class KeyValueSegmentTest {
@BeforeEach
public void setUp() {
metricsRecorder.init(
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
new MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "test-client", new
MockTime()),
new TaskId(0, 0)
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 4a8c891355d..298944086a1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -123,7 +123,7 @@ public class MeteredKeyValueStoreTest {
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
when(context.applicationId()).thenReturn(APPLICATION_ID);
when(context.metrics()).thenReturn(
- new StreamsMetricsImpl(metrics, "test", "processId", mockTime)
+ new StreamsMetricsImpl(metrics, "test", mockTime)
);
when(context.taskId()).thenReturn(taskId);
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index ee1b686dade..e34c232075e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -126,7 +126,7 @@ public class MeteredSessionStoreTest {
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
when(context.applicationId()).thenReturn(APPLICATION_ID);
when(context.metrics())
- .thenReturn(new StreamsMetricsImpl(metrics, "test",
"processId", mockTime));
+ .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
when(context.taskId()).thenReturn(taskId);
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
when(innerStore.name()).thenReturn(STORE_NAME);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index fa42cb07283..d3e2c265c04 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -127,7 +127,7 @@ public class MeteredTimestampedKeyValueStoreTest {
setUpWithoutContext();
when(context.applicationId()).thenReturn(APPLICATION_ID);
when(context.metrics())
- .thenReturn(new StreamsMetricsImpl(metrics, "test", "processId",
mockTime));
+ .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
when(context.taskId()).thenReturn(taskId);
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
when(inner.name()).thenReturn(STORE_NAME);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index a3fe59c6e8b..6a4871bd3a3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -77,7 +77,7 @@ public class MeteredTimestampedWindowStoreTest {
public void setUp() {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test", new MockTime());
context = new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
@@ -105,7 +105,7 @@ public class MeteredTimestampedWindowStoreTest {
public void setUpWithoutContextName() {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test", new MockTime());
context = new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index f0a7f23c09c..813b860869b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -110,7 +110,7 @@ public class MeteredVersionedKeyValueStoreTest {
@BeforeEach
public void setUp() {
when(inner.name()).thenReturn(STORE_NAME);
- when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics,
"test", "processId", mockTime));
+ when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics,
"test", mockTime));
when(context.applicationId()).thenReturn(APPLICATION_ID);
when(context.taskId()).thenReturn(TASK_ID);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index ba557104ebd..d0efdb8f4b7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -116,7 +116,7 @@ public class MeteredWindowStoreTest {
@BeforeEach
public void setUp() {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test", new MockTime());
context = new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
Serdes.String(),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 8a02289890e..7dd4caf24d2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -923,7 +923,7 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
final Metrics metrics = new Metrics(new
MetricConfig().recordLevel(RecordingLevel.DEBUG));
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-application", "processId",
time);
+ new StreamsMetricsImpl(metrics, "test-application", time);
context = mock(InternalMockProcessorContext.class);
when(context.metrics()).thenReturn(streamsMetrics);
@@ -956,7 +956,7 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
final Metrics metrics = new Metrics(new
MetricConfig().recordLevel(RecordingLevel.INFO));
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-application", "processId",
time);
+ new StreamsMetricsImpl(metrics, "test-application", time);
context = mock(InternalMockProcessorContext.class);
when(context.metrics()).thenReturn(streamsMetrics);
@@ -988,7 +988,7 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
final Metrics metrics = new Metrics(new
MetricConfig().recordLevel(RecordingLevel.INFO));
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-application", "processId",
time);
+ new StreamsMetricsImpl(metrics, "test-application", time);
final Properties props = StreamsTestUtils.getStreamsConfig();
context = mock(InternalMockProcessorContext.class);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
index 9a92df55336..9ad53aab5d7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
@@ -64,7 +64,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
public void setUp() {
final Metrics metrics = new Metrics();
offset = 0;
- streamsMetrics = new StreamsMetricsImpl(metrics, "test-client",
"processId", new MockTime());
+ streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", new
MockTime());
context = new
MockInternalProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new
TaskId(0, 0), TestUtils.tempDirectory());
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
index 633d14c1e63..11f7e6c13cb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
@@ -54,7 +54,7 @@ public class TimestampedSegmentTest {
@BeforeEach
public void setUp() {
metricsRecorder.init(
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
new MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "test-client", new
MockTime()),
new TaskId(0, 0)
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
index 75c01cef3c9..113de5959a4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
@@ -202,7 +202,7 @@ public class RocksDBMetricsRecorderGaugesTest {
private void runAndVerifySumOfProperties(final String propertyName) throws
Exception {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
new MockTime());
+ new StreamsMetricsImpl(new Metrics(), "test-client", new
MockTime());
final RocksDBMetricsRecorder recorder = new
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
recorder.init(streamsMetrics, TASK_ID);
@@ -219,7 +219,7 @@ public class RocksDBMetricsRecorderGaugesTest {
private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String
propertyName) throws Exception {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(new Metrics(), "test-client",
"processId", new MockTime());
+ new StreamsMetricsImpl(new Metrics(), "test-client", new
MockTime());
final RocksDBMetricsRecorder recorder = new
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
recorder.init(streamsMetrics, TASK_ID);
@@ -236,7 +236,7 @@ public class RocksDBMetricsRecorderGaugesTest {
private void runAndVerifyBlockCacheMetricsWithSingleCache(final String
propertyName) throws Exception {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
new MockTime());
+ new StreamsMetricsImpl(new Metrics(), "test-client", new
MockTime());
final RocksDBMetricsRecorder recorder = new
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
recorder.init(streamsMetrics, TASK_ID);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
index a0c068b59ee..0436811db79 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
@@ -194,7 +194,7 @@ public class RocksDBMetricsRecorderTest {
assertThrows(
IllegalStateException.class,
() -> recorder.init(
- new StreamsMetricsImpl(new Metrics(), "test-client",
"processId", new MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "test-client", new
MockTime()),
TASK_ID1
)
);
diff --git
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 228df8d63a1..4ba7e565c6d 100644
---
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -89,7 +89,7 @@ public class InternalMockProcessorContext<KOut, VOut>
this(null,
null,
null,
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
null,
null,
@@ -106,7 +106,6 @@ public class InternalMockProcessorContext<KOut, VOut>
new StreamsMetricsImpl(
new Metrics(),
"mock",
- "processId",
new MockTime()
),
config,
@@ -139,7 +138,6 @@ public class InternalMockProcessorContext<KOut, VOut>
new StreamsMetricsImpl(
new Metrics(),
"mock",
- "processId",
new MockTime()
),
config,
@@ -157,7 +155,7 @@ public class InternalMockProcessorContext<KOut, VOut>
stateDir,
keySerde,
valueSerde,
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
config,
null,
null,
@@ -177,7 +175,7 @@ public class InternalMockProcessorContext<KOut, VOut>
null,
serdes.keySerde(),
serdes.valueSerde(),
- new StreamsMetricsImpl(metrics, "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(metrics, "mock", new MockTime()),
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
() -> collector,
null,
@@ -194,7 +192,7 @@ public class InternalMockProcessorContext<KOut, VOut>
stateDir,
keySerde,
valueSerde,
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
() -> collector,
cache,
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 2fc8400239d..dda783e03bd 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -382,7 +382,6 @@ public class TopologyTestDriver implements Closeable {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
"test-client",
- "processId",
mockWallClockTime
);
TaskMetrics.droppedRecordsSensor(threadId, TASK_ID.toString(),
streamsMetrics);
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index fc7d27a3bb7..a9d4e30bcfc 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -237,7 +237,6 @@ public class MockProcessorContext implements
ProcessorContext, RecordCollector.S
this.metrics = new StreamsMetricsImpl(
new Metrics(metricConfig),
threadId,
- "processId",
Time.SYSTEM
);
TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
index 5a506163bb2..146359bf25e 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -255,7 +255,6 @@ public class MockProcessorContext<KForward, VForward>
implements ProcessorContex
metrics = new StreamsMetricsImpl(
new Metrics(metricConfig),
threadId,
- "processId",
Time.SYSTEM
);
TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index ebb38dd773a..72d125304f1 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -290,7 +290,6 @@ public class MockProcessorContextTest {
when(mockInternalProcessorContext.metrics()).thenReturn(new
StreamsMetricsImpl(
new Metrics(new MetricConfig()),
Thread.currentThread().getName(),
- "processId",
Time.SYSTEM
));
when(mockInternalProcessorContext.taskId()).thenReturn(new TaskId(1,
1));