This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 16476107363 KAFKA-19882: Removing process id from default client level
tags (#20939)
16476107363 is described below
commit 164761073638963cb028f0ffa0d6256c9c3a816b
Author: Genseric Ghiro <[email protected]>
AuthorDate: Fri Nov 21 17:41:06 2025 -0500
KAFKA-19882: Removing process id from default client level tags (#20939)
## Summary
As a follow-up to #20906, this commit also removes the process-id tag
from the default client-level tags on the 4.1 branch. While working on
KIP-1091, we mistakenly applied the process-id tag to all client-level
metrics, rather than just the client-state, thread-state, and
recording-level metrics as specified in the KIP.
## Tests
Unit tests in `ClientMetricsTest.java` and `StreamsMetricsImplTest.java`
Reviewers: Matthias Sax <[email protected]>,
Bill Bejeck <[email protected]>
---
.../org/apache/kafka/streams/KafkaStreams.java | 5 +-
.../streams/internals/metrics/ClientMetrics.java | 10 +-
.../internals/metrics/StreamsMetricsImpl.java | 40 +++-
.../internals/metrics/ClientMetricsTest.java | 29 ++-
...KStreamSessionWindowAggregateProcessorTest.java | 2 +-
.../processor/internals/ActiveTaskCreatorTest.java | 2 +-
.../internals/DefaultStateUpdaterTest.java | 2 +-
.../internals/GlobalStreamThreadTest.java | 6 +-
.../processor/internals/MockStreamsMetrics.java | 2 +-
.../processor/internals/ProcessorNodeTest.java | 6 +-
.../processor/internals/RecordQueueTest.java | 2 +-
.../processor/internals/SourceNodeTest.java | 2 +-
.../processor/internals/StandbyTaskTest.java | 2 +-
.../processor/internals/StreamTaskTest.java | 8 +-
.../processor/internals/StreamThreadTest.java | 35 ++--
.../internals/metrics/StreamsMetricsImplTest.java | 219 ++++++++++++++-------
...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 8 +-
.../AbstractRocksDBSegmentedBytesStoreTest.java | 8 +-
.../ChangeLoggingKeyValueBytesStoreTest.java | 2 +-
.../internals/GlobalStateStoreProviderTest.java | 2 +-
.../state/internals/KeyValueSegmentTest.java | 2 +-
.../state/internals/MeteredKeyValueStoreTest.java | 2 +-
.../state/internals/MeteredSessionStoreTest.java | 2 +-
.../MeteredTimestampedKeyValueStoreTest.java | 2 +-
.../MeteredTimestampedWindowStoreTest.java | 4 +-
.../MeteredVersionedKeyValueStoreTest.java | 2 +-
.../state/internals/MeteredWindowStoreTest.java | 2 +-
.../streams/state/internals/RocksDBStoreTest.java | 6 +-
.../RocksDBTimeOrderedKeyValueBufferTest.java | 2 +-
.../state/internals/TimestampedSegmentTest.java | 2 +-
.../metrics/RocksDBMetricsRecorderGaugesTest.java | 6 +-
.../metrics/RocksDBMetricsRecorderTest.java | 2 +-
.../kafka/test/InternalMockProcessorContext.java | 10 +-
.../apache/kafka/streams/TopologyTestDriver.java | 1 -
.../streams/processor/MockProcessorContext.java | 1 -
.../processor/api/MockProcessorContext.java | 1 -
.../kafka/streams/MockProcessorContextTest.java | 1 -
37 files changed, 273 insertions(+), 167 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 656efc8bfe9..85356241af4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -980,7 +980,6 @@ public class KafkaStreams implements AutoCloseable {
streamsMetrics = new StreamsMetricsImpl(
metrics,
clientId,
- processId.toString(),
time
);
@@ -989,8 +988,8 @@ public class KafkaStreams implements AutoCloseable {
ClientMetrics.addApplicationIdMetric(streamsMetrics,
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG));
ClientMetrics.addTopologyDescriptionMetric(streamsMetrics,
(metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) ->
state.name());
- ClientMetrics.addClientStateTelemetryMetric(streamsMetrics,
(metricsConfig, now) -> state.ordinal());
- ClientMetrics.addClientRecordingLevelMetric(streamsMetrics,
calculateMetricsRecordingLevel());
+ ClientMetrics.addClientStateTelemetryMetric(processId.toString(),
streamsMetrics, (metricsConfig, now) -> state.ordinal());
+ ClientMetrics.addClientRecordingLevelMetric(processId.toString(),
streamsMetrics, calculateMetricsRecordingLevel());
threads = Collections.synchronizedList(new LinkedList<>());
ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics,
(metricsConfig, now) -> numLiveStreamThreads());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
index 21bac269d5a..3b88e240172 100644
---
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
+++
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
@@ -25,9 +25,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
+import java.util.Collections;
import java.util.Properties;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP;
+import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESS_ID_TAG;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addSumMetricToSensor;
public class ClientMetrics {
@@ -126,21 +128,25 @@ public class ClientMetrics {
);
}
- public static void addClientStateTelemetryMetric(final StreamsMetricsImpl
streamsMetrics,
+ public static void addClientStateTelemetryMetric(final String processId,
+ final StreamsMetricsImpl
streamsMetrics,
final Gauge<Integer>
stateProvider) {
streamsMetrics.addClientLevelMutableMetric(
CLIENT_STATE,
STATE_DESCRIPTION,
+ Collections.singletonMap(PROCESS_ID_TAG, processId),
RecordingLevel.INFO,
stateProvider
);
}
- public static void addClientRecordingLevelMetric(final StreamsMetricsImpl
streamsMetrics,
+ public static void addClientRecordingLevelMetric(final String processId,
+ final StreamsMetricsImpl
streamsMetrics,
final int recordingLevel)
{
streamsMetrics.addClientLevelImmutableMetric(
RECORDING_LEVEL,
RECORDING_LEVEL_DESCRIPTION,
+ Collections.singletonMap(PROCESS_ID_TAG, processId),
RecordingLevel.INFO,
recordingLevel
);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index a0999a36c60..f652059f8e0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -89,7 +89,6 @@ public class StreamsMetricsImpl implements StreamsMetrics {
private final Metrics metrics;
private final Map<Sensor, Sensor> parentSensors;
private final String clientId;
- private final String processId;
private final Version version;
private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
@@ -167,12 +166,10 @@ public class StreamsMetricsImpl implements StreamsMetrics
{
public StreamsMetricsImpl(final Metrics metrics,
final String clientId,
- final String processId,
final Time time) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
this.metrics = metrics;
this.clientId = clientId;
- this.processId = processId;
version = Version.LATEST;
rocksDBMetricsRecordingTrigger = new
RocksDBMetricsRecordingTrigger(time);
@@ -195,7 +192,20 @@ public class StreamsMetricsImpl implements StreamsMetrics {
final String description,
final RecordingLevel
recordingLevel,
final T value) {
- final MetricName metricName = metrics.metricName(name,
CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
+ addClientLevelImmutableMetric(name, description,
Collections.emptyMap(), recordingLevel, value);
+ }
+
+ public <T> void addClientLevelImmutableMetric(final String name,
+ final String description,
+ final Map<String, String>
additionalTags,
+ final RecordingLevel
recordingLevel,
+ final T value) {
+ final MetricName metricName = metrics.metricName(
+ name,
+ CLIENT_LEVEL_GROUP,
+ description,
+ clientLevelTagMap(additionalTags)
+ );
final MetricConfig metricConfig = new
MetricConfig().recordLevel(recordingLevel);
synchronized (clientLevelMetrics) {
metrics.addMetric(metricName, metricConfig, new
ImmutableMetricValue<>(value));
@@ -207,7 +217,20 @@ public class StreamsMetricsImpl implements StreamsMetrics {
final String description,
final RecordingLevel
recordingLevel,
final Gauge<T> valueProvider) {
- final MetricName metricName = metrics.metricName(name,
CLIENT_LEVEL_GROUP, description, clientLevelTagMap());
+ addClientLevelMutableMetric(name, description, Collections.emptyMap(),
recordingLevel, valueProvider);
+ }
+
+ public <T> void addClientLevelMutableMetric(final String name,
+ final String description,
+ final Map<String, String>
additionalTags,
+ final RecordingLevel
recordingLevel,
+ final Gauge<T> valueProvider) {
+ final MetricName metricName = metrics.metricName(
+ name,
+ CLIENT_LEVEL_GROUP,
+ description,
+ clientLevelTagMap(additionalTags)
+ );
final MetricConfig metricConfig = new
MetricConfig().recordLevel(recordingLevel);
synchronized (clientLevelMetrics) {
metrics.addMetric(metricName, metricConfig, valueProvider);
@@ -281,9 +304,12 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
public Map<String, String> clientLevelTagMap() {
- final Map<String, String> tagMap = new LinkedHashMap<>();
+ return clientLevelTagMap(Collections.emptyMap());
+ }
+
+ public Map<String, String> clientLevelTagMap(final Map<String, String>
additionalTags) {
+ final Map<String, String> tagMap = new LinkedHashMap<>(additionalTags);
tagMap.put(CLIENT_ID_TAG, clientId);
- tagMap.put(PROCESS_ID_TAG, processId);
return tagMap;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
index 9142835b92c..2eee10f7854 100644
---
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.when;
public class ClientMetricsTest {
private static final String COMMIT_ID = "test-commit-ID";
+ private static final String PROCESS_ID = "test-process-id";
private static final String VERSION = "test-version";
private final StreamsMetricsImpl streamsMetrics =
mock(StreamsMetricsImpl.class);
@@ -116,11 +117,15 @@ public class ClientMetricsTest {
final String name = "client-state";
final String description = "The state of the Kafka Streams client";
final Gauge<Integer> stateProvider = (config, now) ->
State.RUNNING.ordinal();
- setUpAndVerifyMutableMetric(
- name,
- description,
- stateProvider,
- () ->
ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, stateProvider)
+
+ ClientMetrics.addClientStateTelemetryMetric(PROCESS_ID,
streamsMetrics, stateProvider);
+
+ verify(streamsMetrics).addClientLevelMutableMetric(
+ eq(name),
+ eq(description),
+ eq(Collections.singletonMap("process-id", PROCESS_ID)),
+ eq(RecordingLevel.INFO),
+ eq(stateProvider)
);
}
@@ -129,11 +134,15 @@ public class ClientMetricsTest {
final String name = "recording-level";
final String description = "The metrics recording level of the Kafka
Streams client";
final int recordingLevel = 1;
- setUpAndVerifyImmutableMetric(
- name,
- description,
- recordingLevel,
- () ->
ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, recordingLevel)
+
+ ClientMetrics.addClientRecordingLevelMetric(PROCESS_ID,
streamsMetrics, recordingLevel);
+
+ verify(streamsMetrics).addClientLevelImmutableMetric(
+ eq(name),
+ eq(description),
+ eq(Collections.singletonMap("process-id", PROCESS_ID)),
+ eq(RecordingLevel.INFO),
+ eq(recordingLevel)
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index dd3960c17ec..f6658997b7b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
private final MockTime time = new MockTime();
private final Metrics metrics = new Metrics();
- private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, "test", "processId", time);
+ private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, "test", time);
private final String threadId = Thread.currentThread().getName();
private final Initializer<Long> initializer = () -> 0L;
private final Aggregator<String, String, Long> aggregator = (aggKey,
value, aggregate) -> aggregate + 1;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 17eb27bb3d8..dc2e8ffd513 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest {
private ChangelogReader changeLogReader;
private final MockClientSupplier mockClientSupplier = new
MockClientSupplier();
- private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(new Metrics(), "clientId", "processId", new MockTime());
+ private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(new Metrics(), "clientId", new MockTime());
private final Map<String, Object> properties = mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index b6d41966257..8f767af93e0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -106,7 +106,7 @@ class DefaultStateUpdaterTest {
// need an auto-tick timer to work for draining with timeout
private final Time time = new MockTime(1L);
- private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new
Metrics(time), "", "", time);
+ private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new
Metrics(time), "", time);
private final StreamsConfig config = new
StreamsConfig(configProps(COMMIT_INTERVAL));
private final ChangelogReader changelogReader =
mock(ChangelogReader.class);
private final TopologyMetadata topologyMetadata =
unnamedTopology().build();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index d964b0c80b0..838bb7ab502 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -132,7 +132,7 @@ public class GlobalStreamThreadTest {
mockConsumer,
new StateDirectory(config, time, true, false),
0,
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
time),
+ new StreamsMetricsImpl(new Metrics(), "test-client", time),
time,
"clientId",
stateRestoreListener,
@@ -171,7 +171,7 @@ public class GlobalStreamThreadTest {
mockConsumer,
new StateDirectory(config, time, true, false),
0,
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
time),
+ new StreamsMetricsImpl(new Metrics(), "test-client", time),
time,
"clientId",
stateRestoreListener,
@@ -420,7 +420,7 @@ public class GlobalStreamThreadTest {
consumer,
new StateDirectory(config, time, true, false),
0,
- new StreamsMetricsImpl(new Metrics(), "test-client",
"processId", time),
+ new StreamsMetricsImpl(new Metrics(), "test-client", time),
time,
"clientId",
stateRestoreListener,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
index a2e6820f901..4ed68ef8150 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
@@ -23,6 +23,6 @@ import
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
public class MockStreamsMetrics extends StreamsMetricsImpl {
public MockStreamsMetrics(final Metrics metrics) {
- super(metrics, "test", "processId", new MockTime());
+ super(metrics, "test", new MockTime());
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index ce5fddb870a..3a139b3233c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -219,7 +219,7 @@ public class ProcessorNodeTest {
public void testMetricsWithBuiltInMetricsVersionLatest() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-client", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test-client", new MockTime());
final InternalMockProcessorContext<Object, Object> context = new
InternalMockProcessorContext<>(streamsMetrics);
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>(NAME, new NoOpProcessor(),
Collections.emptySet());
@@ -303,7 +303,7 @@ public class ProcessorNodeTest {
public void testTopologyLevelClassCastExceptionDirect() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-client", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test-client", new MockTime());
final InternalMockProcessorContext<Object, Object> context = new
InternalMockProcessorContext<>(streamsMetrics);
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>("pname", new ClassCastProcessor(),
Collections.emptySet());
@@ -323,7 +323,7 @@ public class ProcessorNodeTest {
final InternalProcessorContext<Object, Object>
internalProcessorContext = mock(InternalProcessorContext.class,
withSettings().strictness(Strictness.LENIENT));
when(internalProcessorContext.taskId()).thenReturn(TASK_ID);
- when(internalProcessorContext.metrics()).thenReturn(new
StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()));
+ when(internalProcessorContext.metrics()).thenReturn(new
StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()));
when(internalProcessorContext.topic()).thenReturn(TOPIC);
when(internalProcessorContext.partition()).thenReturn(PARTITION);
when(internalProcessorContext.offset()).thenReturn(OFFSET);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 1dd19fb9cf7..1becc3d240d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -72,7 +72,7 @@ public class RecordQueueTest {
private final Metrics metrics = new Metrics();
private final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime());
+ new StreamsMetricsImpl(metrics, "mock", new MockTime());
final InternalMockProcessorContext<Integer, Integer> context = new
InternalMockProcessorContext<>(
StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
index a509d14c974..817ffe42f84 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -97,7 +97,7 @@ public class SourceNodeTest {
public void shouldExposeProcessMetrics() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-client", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test-client", new MockTime());
final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(streamsMetrics);
final SourceNode<String, String> node =
new SourceNode<>(context.currentNode().name(), new
TheDeserializer(), new TheDeserializer());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 6be27187f0f..9680dd1af1d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -113,7 +113,7 @@ public class StandbyTaskTest {
private final MockTime time = new MockTime();
private final Metrics metrics = new Metrics(new
MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
- private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, threadName, "processId", time);
+ private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, threadName, time);
private File baseDir;
private StreamsConfig config;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index fda9afa9a88..68ce72ff45c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -2604,7 +2604,7 @@ public class StreamTaskTest {
streamsMetrics,
null
);
- final StreamsMetricsImpl metrics = new
StreamsMetricsImpl(this.metrics, "test", "processId", time);
+ final StreamsMetricsImpl metrics = new
StreamsMetricsImpl(this.metrics, "test", time);
// The processor topology is missing the topics
final ProcessorTopology topology = withSources(emptyList(), mkMap());
@@ -3238,7 +3238,7 @@ public class StreamTaskTest {
topology,
consumer,
new TopologyConfig(null, config, new
Properties()).getTaskConfig(),
- new StreamsMetricsImpl(metrics, "test", "processId", time),
+ new StreamsMetricsImpl(metrics, "test", time),
stateDirectory,
cache,
time,
@@ -3275,7 +3275,7 @@ public class StreamTaskTest {
topology,
consumer,
new TopologyConfig(null, config, new
Properties()).getTaskConfig(),
- new StreamsMetricsImpl(metrics, "test", "processId", time),
+ new StreamsMetricsImpl(metrics, "test", time),
stateDirectory,
cache,
time,
@@ -3311,7 +3311,7 @@ public class StreamTaskTest {
topology,
consumer,
new TopologyConfig(null, config, new
Properties()).getTaskConfig(),
- new StreamsMetricsImpl(metrics, "test", "processId", time),
+ new StreamsMetricsImpl(metrics, "test", time),
stateDirectory,
cache,
time,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 740d7d2c8f9..968a6c1f34f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -320,7 +320,6 @@ public class StreamThreadTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
APPLICATION_ID,
- PROCESS_ID.toString(),
time
);
@@ -728,7 +727,6 @@ public class StreamThreadTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
APPLICATION_ID,
- PROCESS_ID.toString(),
mockTime
);
@@ -793,7 +791,6 @@ public class StreamThreadTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
APPLICATION_ID,
- PROCESS_ID.toString(),
mockTime
);
@@ -1158,7 +1155,7 @@ public class StreamThreadTest {
final StreamsConfig config = new StreamsConfig(configProps(false,
stateUpdaterEnabled, processingThreadsEnabled));
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
@@ -1433,7 +1430,7 @@ public class StreamThreadTest {
}
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final Properties props = configProps(false, stateUpdaterEnabled,
processingThreadsEnabled);
final StreamsConfig config = new StreamsConfig(props);
@@ -1899,7 +1896,6 @@ public class StreamThreadTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
APPLICATION_ID,
- PROCESS_ID.toString(),
mockTime
);
@@ -2655,7 +2651,7 @@ public class StreamThreadTest {
when(taskManager.handleCorruption(corruptedTasks)).thenReturn(true);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -2716,7 +2712,7 @@ public class StreamThreadTest {
doThrow(new
TimeoutException()).when(taskManager).handleCorruption(corruptedTasks);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -2786,7 +2782,7 @@ public class StreamThreadTest {
doNothing().when(taskManager).handleLostAll();
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -2852,7 +2848,7 @@ public class StreamThreadTest {
doNothing().when(consumer).enforceRebalance("Active tasks corrupted");
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -2915,7 +2911,7 @@ public class StreamThreadTest {
when(taskManager.handleCorruption(corruptedTasks)).thenReturn(false);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -3149,7 +3145,7 @@ public class StreamThreadTest {
final TaskManager taskManager = mock(TaskManager.class);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -3208,7 +3204,7 @@ public class StreamThreadTest {
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
final TaskManager taskManager = mock(TaskManager.class);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -3598,7 +3594,6 @@ public class StreamThreadTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
APPLICATION_ID,
- PROCESS_ID.toString(),
mockTime
);
@@ -3657,7 +3652,6 @@ public class StreamThreadTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
APPLICATION_ID,
- PROCESS_ID.toString(),
mockTime
);
@@ -3724,7 +3718,6 @@ public class StreamThreadTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
APPLICATION_ID,
- PROCESS_ID.toString(),
mockTime
);
@@ -3820,7 +3813,7 @@ public class StreamThreadTest {
null,
mock(TaskManager.class),
null,
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime),
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime),
new TopologyMetadata(internalTopologyBuilder, config),
PROCESS_ID,
CLIENT_ID,
@@ -3879,7 +3872,7 @@ public class StreamThreadTest {
null,
mock(TaskManager.class),
null,
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime),
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime),
new TopologyMetadata(internalTopologyBuilder, config),
PROCESS_ID,
CLIENT_ID,
@@ -3944,7 +3937,7 @@ public class StreamThreadTest {
"",
taskManager,
null,
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime),
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime),
topologyMetadata,
PROCESS_ID,
"thread-id",
@@ -3997,7 +3990,7 @@ public class StreamThreadTest {
final LogContext logContext = new LogContext("test");
final Logger log = logContext.logger(StreamThreadTest.class);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
new TopologyMetadata(internalTopologyBuilder, config),
config,
@@ -4056,7 +4049,7 @@ public class StreamThreadTest {
final StreamsConfig config,
final TopologyMetadata
topologyMetadata) {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
return new StreamThread(
mockTime,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index bdb9d028c54..63496800acb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
-import
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ImmutableMetricValue;
import
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
import org.apache.kafka.test.StreamsTestUtils;
@@ -54,7 +53,6 @@ import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.MAX_SUFFIX;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP;
-import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESS_ID_TAG;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP;
@@ -131,13 +129,13 @@ public class StreamsMetricsImplTest {
private final String metricNamePrefix = "metric";
private final String group = "group";
private final Map<String, String> tags = mkMap(mkEntry("tag", "value"));
- private final Map<String, String> clientLevelTags =
mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID), mkEntry(PROCESS_ID_TAG, PROCESS_ID));
+ private final Map<String, String> clientLevelTags =
mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID));
private final MetricName metricName1 =
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1,
clientLevelTags);
private final MetricName metricName2 =
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION2,
clientLevelTags);
private final MockTime time = new MockTime(0);
- private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
private static MetricConfig eqMetricConfig(final MetricConfig
metricConfig) {
final StringBuffer message = new StringBuffer();
@@ -251,7 +249,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor =
streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
@@ -263,7 +261,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor =
streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
@@ -275,7 +273,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.taskLevelSensor(
THREAD_ID1,
@@ -292,7 +290,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.taskLevelSensor(
THREAD_ID1,
@@ -309,7 +307,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.topicLevelSensor(
THREAD_ID1,
@@ -328,7 +326,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.topicLevelSensor(
THREAD_ID1,
@@ -347,7 +345,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final ArgumentCaptor<String> sensorKeys =
setupGetNewSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.storeLevelSensor(
TASK_ID1,
@@ -365,7 +363,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.storeLevelSensor(
TASK_ID1,
@@ -381,7 +379,7 @@ public class StreamsMetricsImplTest {
public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() {
final Metrics metrics = mock(Metrics.class);
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2,
INFO_RECORDING_LEVEL);
@@ -393,7 +391,7 @@ public class StreamsMetricsImplTest {
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() {
final Metrics metrics = mock(Metrics.class);
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID2, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
@@ -405,7 +403,7 @@ public class StreamsMetricsImplTest {
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() {
final Metrics metrics = mock(Metrics.class);
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
@@ -417,7 +415,7 @@ public class StreamsMetricsImplTest {
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds()
throws InterruptedException {
final Metrics metrics = mock(Metrics.class);
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
final Thread otherThread =
@@ -432,7 +430,7 @@ public class StreamsMetricsImplTest {
public void shouldUseSameStoreLevelSensorKeyWithSameSensorNames() {
final Metrics metrics = mock(Metrics.class);
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
@@ -456,7 +454,7 @@ public class StreamsMetricsImplTest {
.thenReturn(metricName);
when(metrics.metric(metricName)).thenReturn(null);
when(metrics.addMetricIfAbsent(eq(metricName),
eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).thenReturn(null);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
streamsMetrics.addStoreLevelMutableMetric(
TASK_ID1,
@@ -489,7 +487,7 @@ public class StreamsMetricsImplTest {
when(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP,
DESCRIPTION1, STORE_LEVEL_TAG_MAP))
.thenReturn(metricName);
when(metrics.metric(metricName)).thenReturn(null);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
streamsMetrics.addStoreLevelMutableMetric(
TASK_ID1,
@@ -539,7 +537,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldRemoveStateStoreLevelSensors() {
final Metrics metrics = mock(Metrics.class);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final MetricName metricName1 =
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP,
DESCRIPTION1, STORE_LEVEL_TAG_MAP);
final MetricName metricName2 =
@@ -562,7 +560,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
THREAD_ID1,
@@ -580,7 +578,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
THREAD_ID1,
@@ -599,7 +597,7 @@ public class StreamsMetricsImplTest {
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorCacheName = "processorNodeName";
setupGetNewSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
THREAD_ID1,
@@ -618,7 +616,7 @@ public class StreamsMetricsImplTest {
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorCacheName = "processorNodeName";
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
THREAD_ID1, TASK_ID1,
@@ -635,7 +633,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor =
streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
@@ -647,7 +645,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final Sensor actualSensor =
streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
@@ -656,31 +654,110 @@ public class StreamsMetricsImplTest {
@Test
public void shouldAddClientLevelImmutableMetric() {
- final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
- final MetricConfig metricConfig = new
MetricConfig().recordLevel(recordingLevel);
final String value = "immutable-value";
- final ImmutableMetricValue immutableValue = new
ImmutableMetricValue<>(value);
- when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP,
DESCRIPTION1, clientLevelTags))
- .thenReturn(metricName1);
- doNothing().when(metrics).addMetric(eq(metricName1),
eqMetricConfig(metricConfig), eq(immutableValue));
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
- streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1,
DESCRIPTION1, recordingLevel, value);
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
+
+ streamsMetrics.addClientLevelImmutableMetric(
+ METRIC_NAME1,
+ DESCRIPTION1,
+ recordingLevel,
+ value
+ );
+
+ final MetricName name = metrics.metricName(
+ METRIC_NAME1,
+ CLIENT_LEVEL_GROUP,
+ mkMap(
+ mkEntry("client-id", CLIENT_ID)
+ )
+ );
+ assertThat(metrics.metric(name).metricName().name(),
equalTo(METRIC_NAME1));
+ assertThat(metrics.metric(name).metricValue(), equalTo(value));
+ }
+
+ @Test
+ public void shouldAddClientLevelImmutableMetricWithAdditionalTags() {
+ final RecordingLevel recordingLevel = RecordingLevel.INFO;
+ final String value = "immutable-value";
+
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
+
+ streamsMetrics.addClientLevelImmutableMetric(
+ METRIC_NAME1,
+ DESCRIPTION1,
+ Collections.singletonMap("additional-tag", "additional-value"),
+ recordingLevel,
+ value
+ );
+
+ final MetricName name = metrics.metricName(
+ METRIC_NAME1,
+ CLIENT_LEVEL_GROUP,
+ mkMap(
+ mkEntry("client-id", CLIENT_ID),
+ mkEntry("additional-tag", "additional-value")
+ )
+ );
+ assertThat(metrics.metric(name).metricName().name(),
equalTo(METRIC_NAME1));
+ assertThat(metrics.metric(name).metricValue(), equalTo(value));
}
@Test
public void shouldAddClientLevelMutableMetric() {
- final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
- final MetricConfig metricConfig = new
MetricConfig().recordLevel(recordingLevel);
- final Gauge<String> valueProvider = (config, now) -> "mutable-value";
- when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP,
DESCRIPTION1, clientLevelTags))
- .thenReturn(metricName1);
- doNothing().when(metrics).addMetric(eq(metricName1),
eqMetricConfig(metricConfig), eq(valueProvider));
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final String value = "mutable-value";
+
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
- streamsMetrics.addClientLevelMutableMetric(METRIC_NAME1, DESCRIPTION1,
recordingLevel, valueProvider);
+ streamsMetrics.addClientLevelMutableMetric(
+ METRIC_NAME1,
+ DESCRIPTION1,
+ recordingLevel,
+ (c, t) -> value
+ );
+
+ final MetricName name = metrics.metricName(
+ METRIC_NAME1,
+ CLIENT_LEVEL_GROUP,
+ mkMap(
+ mkEntry("client-id", CLIENT_ID)
+ )
+ );
+ assertThat(metrics.metric(name).metricName().name(),
equalTo(METRIC_NAME1));
+ assertThat(metrics.metric(name).metricValue(), equalTo(value));
+ }
+
+ @Test
+ public void shouldAddClientLevelMutableMetricWithAdditionalTags() {
+ final RecordingLevel recordingLevel = RecordingLevel.INFO;
+ final String value = "mutable-value";
+
+ final StreamsMetricsImpl streamsMetrics
+ = new StreamsMetricsImpl(metrics, CLIENT_ID, time);
+
+ streamsMetrics.addClientLevelMutableMetric(
+ METRIC_NAME1,
+ DESCRIPTION1,
+ Collections.singletonMap("additional-tag", "additional-value"),
+ recordingLevel,
+ (c, t) -> value
+ );
+
+ final MetricName name = metrics.metricName(
+ METRIC_NAME1,
+ CLIENT_LEVEL_GROUP,
+ mkMap(
+ mkEntry("client-id", CLIENT_ID),
+ mkEntry("additional-tag", "additional-value")
+ )
+ );
+ assertThat(metrics.metric(name).metricName().name(),
equalTo(METRIC_NAME1));
+ assertThat(metrics.metric(name).metricValue(), equalTo(value));
}
@Test
@@ -699,7 +776,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldRemoveClientLevelMetricsAndSensors() {
final Metrics metrics = mock(Metrics.class);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
final ArgumentCaptor<String> sensorKeys =
addSensorsOnAllLevels(metrics, streamsMetrics);
doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(0));
@@ -712,7 +789,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldRemoveThreadLevelSensors() {
final Metrics metrics = mock(Metrics.class);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
addSensorsOnAllLevels(metrics, streamsMetrics);
setupRemoveSensorsTest(metrics, THREAD_ID1);
@@ -721,7 +798,7 @@ public class StreamsMetricsImplTest {
@Test
public void testNullMetrics() {
- assertThrows(NullPointerException.class, () -> new
StreamsMetricsImpl(null, "", PROCESS_ID, time));
+ assertThrows(NullPointerException.class, () -> new
StreamsMetricsImpl(null, "", time));
}
@Test
@@ -754,7 +831,7 @@ public class StreamsMetricsImplTest {
@Test
public void testMultiLevelSensorRemoval() {
final Metrics registry = new Metrics();
- final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry,
THREAD_ID1, PROCESS_ID, time);
+ final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry,
THREAD_ID1, time);
for (final MetricName defaultMetric : registry.metrics().keySet()) {
registry.removeMetric(defaultMetric);
}
@@ -860,7 +937,7 @@ public class StreamsMetricsImplTest {
final MockTime time = new MockTime(1);
final MetricConfig config = new MetricConfig().timeWindow(1,
TimeUnit.MILLISECONDS);
final Metrics metrics = new Metrics(config, time);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, "", PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, "", time);
final String scope = "scope";
final String entity = "entity";
@@ -894,7 +971,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldAddLatencyRateTotalSensor() {
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
shouldAddCustomSensor(
streamsMetrics.addLatencyRateTotalSensor(SCOPE_NAME, ENTITY_NAME,
OPERATION_NAME, RecordingLevel.DEBUG),
streamsMetrics,
@@ -909,7 +986,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldAddRateTotalSensor() {
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, time);
shouldAddCustomSensor(
streamsMetrics.addRateTotalSensor(SCOPE_NAME, ENTITY_NAME,
OPERATION_NAME, RecordingLevel.DEBUG),
streamsMetrics,
@@ -1035,9 +1112,8 @@ public class StreamsMetricsImplTest {
public void shouldGetClientLevelTagMap() {
final Map<String, String> tagMap = streamsMetrics.clientLevelTagMap();
- assertThat(tagMap.size(), equalTo(2));
+ assertThat(tagMap.size(), equalTo(1));
assertThat(tagMap.get(StreamsMetricsImpl.CLIENT_ID_TAG),
equalTo(CLIENT_ID));
- assertThat(tagMap.get(StreamsMetricsImpl.PROCESS_ID_TAG),
equalTo(PROCESS_ID));
}
@Test
@@ -1045,7 +1121,7 @@ public class StreamsMetricsImplTest {
final String taskName = "test-task";
final String storeType = "remote-window";
final String storeName = "window-keeper";
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, THREAD_ID1, time);
final Map<String, String> tagMap =
streamsMetrics.storeLevelTagMap(taskName, storeType, storeName);
@@ -1060,7 +1136,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldGetCacheLevelTagMap() {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ new StreamsMetricsImpl(metrics, THREAD_ID1, time);
final String taskName = "taskName";
final String storeName = "storeName";
@@ -1077,7 +1153,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldGetThreadLevelTagMap() {
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, THREAD_ID1, time);
final Map<String, String> tagMap =
streamsMetrics.threadLevelTagMap(THREAD_ID1);
@@ -1210,7 +1286,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldReturnMetricsVersionCurrent() {
assertThat(
- new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID,
time).version(),
+ new StreamsMetricsImpl(metrics, THREAD_ID1, time).version(),
equalTo(Version.LATEST)
);
}
@@ -1268,58 +1344,61 @@ public class StreamsMetricsImplTest {
@Test
public void shouldAddThreadLevelMutableMetric() {
final int measuredValue = 123;
+ final String name = "foobar";
final StreamsMetricsImpl streamsMetrics
- = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
streamsMetrics.addThreadLevelMutableMetric(
- "foobar",
+ name,
"test metric",
"t1",
(c, t) -> measuredValue
);
- final MetricName name = metrics.metricName(
- "foobar",
+ final MetricName metricName = metrics.metricName(
+ name,
THREAD_LEVEL_GROUP,
mkMap(
mkEntry("thread-id", "t1")
)
);
- assertThat(metrics.metric(name), notNullValue());
- assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
+
+ assertThat(metrics.metric(metricName).metricName().name(),
equalTo(name));
+ assertThat(metrics.metric(metricName).metricValue(),
equalTo(measuredValue));
}
@Test
public void shouldAddThreadLevelMutableMetricWithAdditionalTags() {
final int measuredValue = 123;
+ final String name = "foobar";
final StreamsMetricsImpl streamsMetrics
- = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
streamsMetrics.addThreadLevelMutableMetric(
- "foobar",
+ name,
"test metric",
"t1",
Collections.singletonMap("additional-tag", "additional-value"),
(c, t) -> measuredValue
);
- final MetricName name = metrics.metricName(
- "foobar",
+ final MetricName metricName = metrics.metricName(
+ name,
THREAD_LEVEL_GROUP,
mkMap(
mkEntry("thread-id", "t1"),
mkEntry("additional-tag", "additional-value")
)
);
- assertThat(metrics.metric(name), notNullValue());
- assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
+ assertThat(metrics.metric(metricName).metricName().name(),
equalTo(name));
+ assertThat(metrics.metric(metricName).metricValue(),
equalTo(measuredValue));
}
@Test
public void shouldCleanupThreadLevelMutableMetric() {
final int measuredValue = 123;
final StreamsMetricsImpl streamsMetrics
- = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
streamsMetrics.addThreadLevelMutableMetric(
"foobar",
"test metric",
@@ -1341,7 +1420,7 @@ public class StreamsMetricsImplTest {
public void shouldAddThreadLevelImmutableMetric() {
final int measuredValue = 123;
final StreamsMetricsImpl streamsMetrics
- = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
streamsMetrics.addThreadLevelImmutableMetric(
"foobar",
@@ -1365,7 +1444,7 @@ public class StreamsMetricsImplTest {
public void shouldCleanupThreadLevelImmutableMetric() {
final int measuredValue = 123;
final StreamsMetricsImpl streamsMetrics
- = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, time);
streamsMetrics.addThreadLevelImmutableMetric(
"foobar",
"test metric",
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 0f8ce890b19..fd5bbef428a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1350,7 +1350,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -1386,7 +1386,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -1425,7 +1425,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -1466,7 +1466,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 7a78716530a..ede32caba90 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -572,7 +572,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -612,7 +612,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -654,7 +654,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -698,7 +698,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 154517d3b94..02de04a5247 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -97,7 +97,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.Long(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
streamsConfig,
() -> collector,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 4239e3e5000..0480fbc0721 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -112,7 +112,7 @@ public class GlobalStateStoreProviderTest {
when(mockContext.applicationId()).thenReturn("appId");
when(mockContext.metrics())
.thenReturn(
- new StreamsMetricsImpl(new Metrics(), "threadName",
"processId", new MockTime())
+ new StreamsMetricsImpl(new Metrics(), "threadName", new
MockTime())
);
when(mockContext.taskId()).thenReturn(new TaskId(0, 0));
when(mockContext.appConfigs()).thenReturn(CONFIGS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
index e71704f32af..fa0757ac62f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
@@ -54,7 +54,7 @@ public class KeyValueSegmentTest {
@BeforeEach
public void setUp() {
metricsRecorder.init(
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
new MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "test-client", new
MockTime()),
new TaskId(0, 0)
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 294af3944f2..1a85901ccc4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -123,7 +123,7 @@ public class MeteredKeyValueStoreTest {
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
when(context.applicationId()).thenReturn(APPLICATION_ID);
when(context.metrics()).thenReturn(
- new StreamsMetricsImpl(metrics, "test", "processId", mockTime)
+ new StreamsMetricsImpl(metrics, "test", mockTime)
);
when(context.taskId()).thenReturn(taskId);
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index ee1b686dade..e34c232075e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -126,7 +126,7 @@ public class MeteredSessionStoreTest {
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
when(context.applicationId()).thenReturn(APPLICATION_ID);
when(context.metrics())
- .thenReturn(new StreamsMetricsImpl(metrics, "test",
"processId", mockTime));
+ .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
when(context.taskId()).thenReturn(taskId);
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
when(innerStore.name()).thenReturn(STORE_NAME);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 2e3c470387c..9b5d33db966 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -127,7 +127,7 @@ public class MeteredTimestampedKeyValueStoreTest {
setUpWithoutContext();
when(context.applicationId()).thenReturn(APPLICATION_ID);
when(context.metrics())
- .thenReturn(new StreamsMetricsImpl(metrics, "test", "processId",
mockTime));
+ .thenReturn(new StreamsMetricsImpl(metrics, "test", mockTime));
when(context.taskId()).thenReturn(taskId);
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
when(inner.name()).thenReturn(STORE_NAME);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index a3fe59c6e8b..6a4871bd3a3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -77,7 +77,7 @@ public class MeteredTimestampedWindowStoreTest {
public void setUp() {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test", new MockTime());
context = new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
@@ -105,7 +105,7 @@ public class MeteredTimestampedWindowStoreTest {
public void setUpWithoutContextName() {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test", new MockTime());
context = new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index 8e8e02b2722..d40f6947480 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -110,7 +110,7 @@ public class MeteredVersionedKeyValueStoreTest {
@BeforeEach
public void setUp() {
when(inner.name()).thenReturn(STORE_NAME);
- when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics,
"test", "processId", mockTime));
+ when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics,
"test", mockTime));
when(context.applicationId()).thenReturn(APPLICATION_ID);
when(context.taskId()).thenReturn(TASK_ID);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index ba557104ebd..d0efdb8f4b7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -116,7 +116,7 @@ public class MeteredWindowStoreTest {
@BeforeEach
public void setUp() {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test", new MockTime());
context = new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
Serdes.String(),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 8a02289890e..7dd4caf24d2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -923,7 +923,7 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
final Metrics metrics = new Metrics(new
MetricConfig().recordLevel(RecordingLevel.DEBUG));
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-application", "processId",
time);
+ new StreamsMetricsImpl(metrics, "test-application", time);
context = mock(InternalMockProcessorContext.class);
when(context.metrics()).thenReturn(streamsMetrics);
@@ -956,7 +956,7 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
final Metrics metrics = new Metrics(new
MetricConfig().recordLevel(RecordingLevel.INFO));
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-application", "processId",
time);
+ new StreamsMetricsImpl(metrics, "test-application", time);
context = mock(InternalMockProcessorContext.class);
when(context.metrics()).thenReturn(streamsMetrics);
@@ -988,7 +988,7 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
final Metrics metrics = new Metrics(new
MetricConfig().recordLevel(RecordingLevel.INFO));
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-application", "processId",
time);
+ new StreamsMetricsImpl(metrics, "test-application", time);
final Properties props = StreamsTestUtils.getStreamsConfig();
context = mock(InternalMockProcessorContext.class);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
index 69e7d31e31b..e288c04517e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
@@ -63,7 +63,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
public void setUp() {
final Metrics metrics = new Metrics();
offset = 0;
- streamsMetrics = new StreamsMetricsImpl(metrics, "test-client",
"processId", new MockTime());
+ streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", new
MockTime());
context = new
MockInternalProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new
TaskId(0, 0), TestUtils.tempDirectory());
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
index 633d14c1e63..11f7e6c13cb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
@@ -54,7 +54,7 @@ public class TimestampedSegmentTest {
@BeforeEach
public void setUp() {
metricsRecorder.init(
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
new MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "test-client", new
MockTime()),
new TaskId(0, 0)
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
index 75c01cef3c9..113de5959a4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
@@ -202,7 +202,7 @@ public class RocksDBMetricsRecorderGaugesTest {
private void runAndVerifySumOfProperties(final String propertyName) throws
Exception {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
new MockTime());
+ new StreamsMetricsImpl(new Metrics(), "test-client", new
MockTime());
final RocksDBMetricsRecorder recorder = new
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
recorder.init(streamsMetrics, TASK_ID);
@@ -219,7 +219,7 @@ public class RocksDBMetricsRecorderGaugesTest {
private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String
propertyName) throws Exception {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(new Metrics(), "test-client",
"processId", new MockTime());
+ new StreamsMetricsImpl(new Metrics(), "test-client", new
MockTime());
final RocksDBMetricsRecorder recorder = new
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
recorder.init(streamsMetrics, TASK_ID);
@@ -236,7 +236,7 @@ public class RocksDBMetricsRecorderGaugesTest {
private void runAndVerifyBlockCacheMetricsWithSingleCache(final String
propertyName) throws Exception {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
new MockTime());
+ new StreamsMetricsImpl(new Metrics(), "test-client", new
MockTime());
final RocksDBMetricsRecorder recorder = new
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
recorder.init(streamsMetrics, TASK_ID);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
index a0c068b59ee..0436811db79 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
@@ -194,7 +194,7 @@ public class RocksDBMetricsRecorderTest {
assertThrows(
IllegalStateException.class,
() -> recorder.init(
- new StreamsMetricsImpl(new Metrics(), "test-client",
"processId", new MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "test-client", new
MockTime()),
TASK_ID1
)
);
diff --git
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index ed68c86c490..219cc7a80fc 100644
---
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -90,7 +90,7 @@ public class InternalMockProcessorContext<KOut, VOut>
this(null,
null,
null,
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
null,
null,
@@ -107,7 +107,6 @@ public class InternalMockProcessorContext<KOut, VOut>
new StreamsMetricsImpl(
new Metrics(),
"mock",
- "processId",
new MockTime()
),
config,
@@ -140,7 +139,6 @@ public class InternalMockProcessorContext<KOut, VOut>
new StreamsMetricsImpl(
new Metrics(),
"mock",
- "processId",
new MockTime()
),
config,
@@ -158,7 +156,7 @@ public class InternalMockProcessorContext<KOut, VOut>
stateDir,
keySerde,
valueSerde,
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
config,
null,
null,
@@ -178,7 +176,7 @@ public class InternalMockProcessorContext<KOut, VOut>
null,
serdes.keySerde(),
serdes.valueSerde(),
- new StreamsMetricsImpl(metrics, "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(metrics, "mock", new MockTime()),
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
() -> collector,
null,
@@ -195,7 +193,7 @@ public class InternalMockProcessorContext<KOut, VOut>
stateDir,
keySerde,
valueSerde,
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
() -> collector,
cache,
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 81c90d043ce..a1cd78d6fc0 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -384,7 +384,6 @@ public class TopologyTestDriver implements Closeable {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
"test-client",
- "processId",
mockWallClockTime
);
TaskMetrics.droppedRecordsSensor(threadId, TASK_ID.toString(),
streamsMetrics);
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index fc7d27a3bb7..a9d4e30bcfc 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -237,7 +237,6 @@ public class MockProcessorContext implements
ProcessorContext, RecordCollector.S
this.metrics = new StreamsMetricsImpl(
new Metrics(metricConfig),
threadId,
- "processId",
Time.SYSTEM
);
TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
index 52a2308dafe..7d366cddfb8 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -255,7 +255,6 @@ public class MockProcessorContext<KForward, VForward>
implements ProcessorContex
metrics = new StreamsMetricsImpl(
new Metrics(metricConfig),
threadId,
- "processId",
Time.SYSTEM
);
TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index ebb38dd773a..72d125304f1 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -290,7 +290,6 @@ public class MockProcessorContextTest {
when(mockInternalProcessorContext.metrics()).thenReturn(new
StreamsMetricsImpl(
new Metrics(new MetricConfig()),
Thread.currentThread().getName(),
- "processId",
Time.SYSTEM
));
when(mockInternalProcessorContext.taskId()).thenReturn(new TaskId(1,
1));