This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b065c60f496 KAFKA-19734 Adding application tag to clientstate metric
(#20766)
b065c60f496 is described below
commit b065c60f49629aaccaab0b3bf38c415c35685ef3
Author: Genseric Ghiro <[email protected]>
AuthorDate: Mon Oct 27 15:58:47 2025 -0400
KAFKA-19734 Adding application tag to clientstate metric (#20766)
## Summary
As a follow-on to the improvements introduced in
[KIP-1091](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1091%3A+Improved+Kafka+Streams+operator+metrics),
this change adds the `application-id` as a tag to the `client-state`
metric. The tag lets Kafka Streams developers and operators correlate
metrics that carry a `thread-id` (which embeds the `application-id`)
across separately deployed Kafka Streams instances that are members of
the same logical application.
Reviewers: Bill Bejeck <[email protected]>, Matthias
Sax <[email protected]>
---
.../org/apache/kafka/streams/KafkaStreams.java | 1 +
.../internals/metrics/StreamsMetricsImpl.java | 5 +
...KStreamSessionWindowAggregateProcessorTest.java | 2 +-
.../processor/internals/ActiveTaskCreatorTest.java | 2 +-
.../internals/DefaultStateUpdaterTest.java | 2 +-
.../internals/GlobalStreamThreadTest.java | 6 +-
.../processor/internals/MockStreamsMetrics.java | 2 +-
.../processor/internals/ProcessorNodeTest.java | 6 +-
.../processor/internals/RecordQueueTest.java | 2 +-
.../processor/internals/SourceNodeTest.java | 2 +-
.../processor/internals/StandbyTaskTest.java | 2 +-
.../processor/internals/StateDirectoryTest.java | 2 +-
.../processor/internals/StreamTaskTest.java | 8 +-
.../processor/internals/StreamThreadTest.java | 43 ++++---
.../internals/metrics/StreamsMetricsImplTest.java | 129 ++++++++++++++-------
...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 8 +-
.../AbstractRocksDBSegmentedBytesStoreTest.java | 8 +-
.../ChangeLoggingKeyValueBytesStoreTest.java | 2 +-
.../internals/GlobalStateStoreProviderTest.java | 2 +-
.../state/internals/KeyValueSegmentTest.java | 2 +-
.../state/internals/MeteredKeyValueStoreTest.java | 2 +-
.../state/internals/MeteredSessionStoreTest.java | 2 +-
.../MeteredTimestampedKeyValueStoreTest.java | 2 +-
.../MeteredTimestampedWindowStoreTest.java | 4 +-
.../MeteredVersionedKeyValueStoreTest.java | 2 +-
.../state/internals/MeteredWindowStoreTest.java | 2 +-
.../streams/state/internals/RocksDBStoreTest.java | 6 +-
.../RocksDBTimeOrderedKeyValueBufferTest.java | 2 +-
.../state/internals/TimestampedSegmentTest.java | 2 +-
.../metrics/RocksDBMetricsRecorderGaugesTest.java | 6 +-
.../metrics/RocksDBMetricsRecorderTest.java | 2 +-
.../kafka/test/InternalMockProcessorContext.java | 10 +-
.../apache/kafka/streams/TopologyTestDriver.java | 1 +
.../streams/processor/MockProcessorContext.java | 1 +
.../processor/api/MockProcessorContext.java | 1 +
.../kafka/streams/MockProcessorContextTest.java | 1 +
36 files changed, 172 insertions(+), 110 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 69012f0c313..40b6fad5a3a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -983,6 +983,7 @@ public class KafkaStreams implements AutoCloseable {
metrics,
clientId,
processId.toString(),
+ applicationId,
time
);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index a0999a36c60..c56f079b064 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -90,6 +90,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
private final Map<Sensor, Sensor> parentSensors;
private final String clientId;
private final String processId;
+ private final String applicationId;
private final Version version;
private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
@@ -118,6 +119,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public static final String CLIENT_ID_TAG = "client-id";
public static final String PROCESS_ID_TAG = "process-id";
+ public static final String APPLICATION_ID_TAG = "application-id";
public static final String THREAD_ID_TAG = "thread-id";
public static final String TASK_ID_TAG = "task-id";
public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
@@ -168,11 +170,13 @@ public class StreamsMetricsImpl implements StreamsMetrics
{
public StreamsMetricsImpl(final Metrics metrics,
final String clientId,
final String processId,
+ final String applicationId,
final Time time) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
this.metrics = metrics;
this.clientId = clientId;
this.processId = processId;
+ this.applicationId = applicationId;
version = Version.LATEST;
rocksDBMetricsRecordingTrigger = new
RocksDBMetricsRecordingTrigger(time);
@@ -284,6 +288,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put(CLIENT_ID_TAG, clientId);
tagMap.put(PROCESS_ID_TAG, processId);
+ tagMap.put(APPLICATION_ID_TAG, applicationId);
return tagMap;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index dd3960c17ec..c2aaf7f7373 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -80,7 +80,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
private final MockTime time = new MockTime();
private final Metrics metrics = new Metrics();
- private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, "test", "processId", time);
+ private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, "test", "processId", "applicationId", time);
private final String threadId = Thread.currentThread().getName();
private final Initializer<Long> initializer = () -> 0L;
private final Aggregator<String, String, Long> aggregator = (aggKey,
value, aggregate) -> aggregate + 1;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 362c32592ca..2d8375939d6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -72,7 +72,7 @@ public class ActiveTaskCreatorTest {
private ChangelogReader changeLogReader;
private final MockClientSupplier mockClientSupplier = new
MockClientSupplier();
- private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(new Metrics(), "clientId", "processId", new MockTime());
+ private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(new Metrics(), "clientId", "processId", "applicationId", new
MockTime());
private final Map<String, Object> properties = mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index b6d41966257..cd10f2a7703 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -106,7 +106,7 @@ class DefaultStateUpdaterTest {
// need an auto-tick timer to work for draining with timeout
private final Time time = new MockTime(1L);
- private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new
Metrics(time), "", "", time);
+ private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new
Metrics(time), "", "", "", time);
private final StreamsConfig config = new
StreamsConfig(configProps(COMMIT_INTERVAL));
private final ChangelogReader changelogReader =
mock(ChangelogReader.class);
private final TopologyMetadata topologyMetadata =
unnamedTopology().build();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index ff428828e05..6ec219a8cd9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -132,7 +132,7 @@ public class GlobalStreamThreadTest {
mockConsumer,
new StateDirectory(config, time, true, false),
0,
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
time),
+ new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
"applicationId", time),
time,
"clientId",
stateRestoreListener,
@@ -169,7 +169,7 @@ public class GlobalStreamThreadTest {
mockConsumer,
new StateDirectory(config, time, true, false),
0,
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
time),
+ new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
"applicationId", time),
time,
"clientId",
stateRestoreListener,
@@ -418,7 +418,7 @@ public class GlobalStreamThreadTest {
consumer,
new StateDirectory(config, time, true, false),
0,
- new StreamsMetricsImpl(new Metrics(), "test-client",
"processId", time),
+ new StreamsMetricsImpl(new Metrics(), "test-client",
"processId", "applicationId", time),
time,
"clientId",
stateRestoreListener,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
index a2e6820f901..54238d8bd6b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
@@ -23,6 +23,6 @@ import
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
public class MockStreamsMetrics extends StreamsMetricsImpl {
public MockStreamsMetrics(final Metrics metrics) {
- super(metrics, "test", "processId", new MockTime());
+ super(metrics, "test", "processId", "applicationId", new MockTime());
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 6b7c06580c0..ccd47e60d9f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -279,7 +279,7 @@ public class ProcessorNodeTest {
public void testMetricsWithBuiltInMetricsVersionLatest() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-client", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test-client", "processId",
"applicationId", new MockTime());
final InternalMockProcessorContext<Object, Object> context = new
InternalMockProcessorContext<>(streamsMetrics);
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>(NAME, new NoOpProcessor(),
Collections.emptySet());
@@ -363,7 +363,7 @@ public class ProcessorNodeTest {
public void testTopologyLevelClassCastExceptionDirect() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-client", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test-client", "processId",
"applicationId", new MockTime());
final InternalMockProcessorContext<Object, Object> context = new
InternalMockProcessorContext<>(streamsMetrics);
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>("pname", new ClassCastProcessor(),
Collections.emptySet());
@@ -441,7 +441,7 @@ public class ProcessorNodeTest {
final InternalProcessorContext<Object, Object>
internalProcessorContext = mock(InternalProcessorContext.class,
withSettings().strictness(Strictness.LENIENT));
when(internalProcessorContext.taskId()).thenReturn(TASK_ID);
- when(internalProcessorContext.metrics()).thenReturn(new
StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()));
+ when(internalProcessorContext.metrics()).thenReturn(new
StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId",
new MockTime()));
when(internalProcessorContext.topic()).thenReturn(TOPIC);
when(internalProcessorContext.partition()).thenReturn(PARTITION);
when(internalProcessorContext.offset()).thenReturn(OFFSET);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 1dd19fb9cf7..9cd4e5fbc63 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -72,7 +72,7 @@ public class RecordQueueTest {
private final Metrics metrics = new Metrics();
private final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime());
+ new StreamsMetricsImpl(metrics, "mock", "processId", "applicationId",
new MockTime());
final InternalMockProcessorContext<Integer, Integer> context = new
InternalMockProcessorContext<>(
StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
index a509d14c974..d10b2800dc9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -97,7 +97,7 @@ public class SourceNodeTest {
public void shouldExposeProcessMetrics() {
final Metrics metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-client", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test-client", "processId",
"applicationId", new MockTime());
final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(streamsMetrics);
final SourceNode<String, String> node =
new SourceNode<>(context.currentNode().name(), new
TheDeserializer(), new TheDeserializer());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 6be27187f0f..df7f83d05fe 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -113,7 +113,7 @@ public class StandbyTaskTest {
private final MockTime time = new MockTime();
private final Metrics metrics = new Metrics(new
MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
- private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, threadName, "processId", time);
+ private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, threadName, "processId", "applicationId", time);
private File baseDir;
private StreamsConfig config;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 5447b6f3976..a303e827a38 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -987,7 +987,7 @@ public class StateDirectoryTest {
Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology);
Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig());
- directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new
Metrics(), "test", "processId", time), new LogContext("test"));
+ directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new
Metrics(), "test", "processId", "applicationId", time), new LogContext("test"));
return store;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 385719530b9..71fd00c77ce 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -2604,7 +2604,7 @@ public class StreamTaskTest {
streamsMetrics,
null
);
- final StreamsMetricsImpl metrics = new
StreamsMetricsImpl(this.metrics, "test", "processId", time);
+ final StreamsMetricsImpl metrics = new
StreamsMetricsImpl(this.metrics, "test", "processId", "applicationId", time);
// The processor topology is missing the topics
final ProcessorTopology topology = withSources(emptyList(), mkMap());
@@ -3238,7 +3238,7 @@ public class StreamTaskTest {
topology,
consumer,
new TopologyConfig(null, config, new
Properties()).getTaskConfig(),
- new StreamsMetricsImpl(metrics, "test", "processId", time),
+ new StreamsMetricsImpl(metrics, "test", "processId",
"applicationId", time),
stateDirectory,
cache,
time,
@@ -3275,7 +3275,7 @@ public class StreamTaskTest {
topology,
consumer,
new TopologyConfig(null, config, new
Properties()).getTaskConfig(),
- new StreamsMetricsImpl(metrics, "test", "processId", time),
+ new StreamsMetricsImpl(metrics, "test", "processId",
"applicationId", time),
stateDirectory,
cache,
time,
@@ -3311,7 +3311,7 @@ public class StreamTaskTest {
topology,
consumer,
new TopologyConfig(null, config, new
Properties()).getTaskConfig(),
- new StreamsMetricsImpl(metrics, "test", "processId", time),
+ new StreamsMetricsImpl(metrics, "test", "processId",
"applicationId", time),
stateDirectory,
cache,
time,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 66b90ffcc03..99453a8d2a0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -325,6 +325,7 @@ public class StreamThreadTest {
metrics,
APPLICATION_ID,
PROCESS_ID.toString(),
+ APPLICATION_ID,
time
);
@@ -766,6 +767,7 @@ public class StreamThreadTest {
metrics,
APPLICATION_ID,
PROCESS_ID.toString(),
+ APPLICATION_ID,
mockTime
);
@@ -831,6 +833,7 @@ public class StreamThreadTest {
metrics,
APPLICATION_ID,
PROCESS_ID.toString(),
+ APPLICATION_ID,
mockTime
);
@@ -1196,7 +1199,7 @@ public class StreamThreadTest {
final StreamsConfig config = new StreamsConfig(configProps(false,
stateUpdaterEnabled, processingThreadsEnabled));
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
APPLICATION_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
stateDirectory = new StateDirectory(config, mockTime, true, false);
@@ -1473,7 +1476,7 @@ public class StreamThreadTest {
}
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
APPLICATION_ID, mockTime);
final Properties props = configProps(false, stateUpdaterEnabled,
processingThreadsEnabled);
final StreamsConfig config = new StreamsConfig(props);
@@ -1945,6 +1948,7 @@ public class StreamThreadTest {
metrics,
APPLICATION_ID,
PROCESS_ID.toString(),
+ APPLICATION_ID,
mockTime
);
@@ -2705,7 +2709,7 @@ public class StreamThreadTest {
when(taskManager.handleCorruption(corruptedTasks)).thenReturn(true);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
APPLICATION_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -2766,7 +2770,7 @@ public class StreamThreadTest {
doThrow(new
TimeoutException()).when(taskManager).handleCorruption(corruptedTasks);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
APPLICATION_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -2835,7 +2839,7 @@ public class StreamThreadTest {
doNothing().when(taskManager).handleLostAll();
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
APPLICATION_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -2901,7 +2905,7 @@ public class StreamThreadTest {
doNothing().when(consumer).enforceRebalance("Active tasks corrupted");
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
APPLICATION_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -2964,7 +2968,7 @@ public class StreamThreadTest {
when(taskManager.handleCorruption(corruptedTasks)).thenReturn(false);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
APPLICATION_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -3199,7 +3203,7 @@ public class StreamThreadTest {
final TaskManager taskManager = mock(TaskManager.class);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
APPLICATION_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -3258,7 +3262,7 @@ public class StreamThreadTest {
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
final TaskManager taskManager = mock(TaskManager.class);
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
APPLICATION_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
thread = new StreamThread(
@@ -3660,6 +3664,7 @@ public class StreamThreadTest {
metrics,
APPLICATION_ID,
PROCESS_ID.toString(),
+ APPLICATION_ID,
mockTime
);
@@ -3720,6 +3725,7 @@ public class StreamThreadTest {
metrics,
APPLICATION_ID,
PROCESS_ID.toString(),
+ APPLICATION_ID,
mockTime
);
@@ -3787,6 +3793,7 @@ public class StreamThreadTest {
metrics,
APPLICATION_ID,
PROCESS_ID.toString(),
+ APPLICATION_ID,
mockTime
);
@@ -3882,7 +3889,7 @@ public class StreamThreadTest {
null,
mock(TaskManager.class),
null,
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime),
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
APPLICATION_ID, mockTime),
new TopologyMetadata(internalTopologyBuilder, config),
PROCESS_ID,
CLIENT_ID,
@@ -3942,7 +3949,7 @@ public class StreamThreadTest {
null,
mock(TaskManager.class),
null,
- new StreamsMetricsImpl(metrics, CLIENT_ID,
PROCESS_ID.toString(), mockTime),
+ new StreamsMetricsImpl(metrics, CLIENT_ID,
PROCESS_ID.toString(), APPLICATION_ID, mockTime),
new TopologyMetadata(internalTopologyBuilder, config),
PROCESS_ID,
CLIENT_ID,
@@ -4011,7 +4018,7 @@ public class StreamThreadTest {
null,
mock(TaskManager.class),
null,
- new StreamsMetricsImpl(metrics, CLIENT_ID,
PROCESS_ID.toString(), mockTime),
+ new StreamsMetricsImpl(metrics, CLIENT_ID,
PROCESS_ID.toString(), APPLICATION_ID, mockTime),
new TopologyMetadata(internalTopologyBuilder, config),
PROCESS_ID,
CLIENT_ID,
@@ -4071,7 +4078,7 @@ public class StreamThreadTest {
null,
mock(TaskManager.class),
null,
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime),
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
APPLICATION_ID, mockTime),
new TopologyMetadata(internalTopologyBuilder, config),
PROCESS_ID,
CLIENT_ID,
@@ -4131,7 +4138,7 @@ public class StreamThreadTest {
null,
mock(TaskManager.class),
null,
- new StreamsMetricsImpl(metrics, CLIENT_ID,
PROCESS_ID.toString(), mockTime),
+ new StreamsMetricsImpl(metrics, CLIENT_ID,
PROCESS_ID.toString(), APPLICATION_ID, mockTime),
new TopologyMetadata(internalTopologyBuilder, config),
PROCESS_ID,
CLIENT_ID,
@@ -4200,7 +4207,7 @@ public class StreamThreadTest {
null,
mock(TaskManager.class),
null,
- new StreamsMetricsImpl(metrics, CLIENT_ID,
PROCESS_ID.toString(), mockTime),
+ new StreamsMetricsImpl(metrics, CLIENT_ID,
PROCESS_ID.toString(), APPLICATION_ID, mockTime),
new TopologyMetadata(internalTopologyBuilder, config),
PROCESS_ID,
CLIENT_ID,
@@ -4296,7 +4303,7 @@ public class StreamThreadTest {
"",
taskManager,
null,
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime),
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
APPLICATION_ID, mockTime),
topologyMetadata,
PROCESS_ID,
"thread-id",
@@ -4348,7 +4355,7 @@ public class StreamThreadTest {
private Collection<Task> createStandbyTask(final StreamsConfig config) {
final LogContext logContext = new LogContext("test");
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
APPLICATION_ID, mockTime);
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
new TopologyMetadata(internalTopologyBuilder, config),
config,
@@ -4407,7 +4414,7 @@ public class StreamThreadTest {
final StreamsConfig config,
final TopologyMetadata
topologyMetadata) {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
mockTime);
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(),
APPLICATION_ID, mockTime);
return new StreamThread(
mockTime,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index bdb9d028c54..7070bb4df18 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -48,6 +48,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.APPLICATION_ID_TAG;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.AVG_SUFFIX;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_ID_TAG;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP;
@@ -95,6 +96,7 @@ public class StreamsMetricsImplTest {
private static final String INTERNAL_PREFIX = "internal";
private static final String CLIENT_ID = "test-client";
private static final String PROCESS_ID = "test-process";
+ private static final String APPLICATION_ID = "test-app";
private static final String THREAD_ID1 = "test-thread-1";
private static final String TASK_ID1 = "test-task-1";
private static final String TASK_ID2 = "test-task-2";
@@ -131,13 +133,14 @@ public class StreamsMetricsImplTest {
private final String metricNamePrefix = "metric";
private final String group = "group";
private final Map<String, String> tags = mkMap(mkEntry("tag", "value"));
- private final Map<String, String> clientLevelTags =
mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID), mkEntry(PROCESS_ID_TAG, PROCESS_ID));
+ private final Map<String, String> clientLevelTags =
mkMap(mkEntry(CLIENT_ID_TAG, CLIENT_ID),
+ mkEntry(PROCESS_ID_TAG, PROCESS_ID), mkEntry(APPLICATION_ID_TAG,
APPLICATION_ID));
private final MetricName metricName1 =
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1,
clientLevelTags);
private final MetricName metricName2 =
new MetricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION2,
clientLevelTags);
private final MockTime time = new MockTime(0);
- private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID, time);
private static MetricConfig eqMetricConfig(final MetricConfig
metricConfig) {
final StringBuffer message = new StringBuffer();
@@ -251,7 +254,8 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final Sensor actualSensor =
streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
@@ -263,7 +267,8 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final Sensor actualSensor =
streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
@@ -275,7 +280,8 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final Sensor actualSensor = streamsMetrics.taskLevelSensor(
THREAD_ID1,
@@ -292,7 +298,8 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final Sensor actualSensor = streamsMetrics.taskLevelSensor(
THREAD_ID1,
@@ -309,7 +316,8 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final Sensor actualSensor = streamsMetrics.topicLevelSensor(
THREAD_ID1,
@@ -328,7 +336,8 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final Sensor actualSensor = streamsMetrics.topicLevelSensor(
THREAD_ID1,
@@ -347,7 +356,8 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final ArgumentCaptor<String> sensorKeys =
setupGetNewSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final Sensor actualSensor = streamsMetrics.storeLevelSensor(
TASK_ID1,
@@ -365,7 +375,8 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final Sensor actualSensor = streamsMetrics.storeLevelSensor(
TASK_ID1,
@@ -381,7 +392,8 @@ public class StreamsMetricsImplTest {
public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() {
final Metrics metrics = mock(Metrics.class);
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2,
INFO_RECORDING_LEVEL);
@@ -393,7 +405,8 @@ public class StreamsMetricsImplTest {
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() {
final Metrics metrics = mock(Metrics.class);
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID2, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
@@ -405,7 +418,8 @@ public class StreamsMetricsImplTest {
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() {
final Metrics metrics = mock(Metrics.class);
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
@@ -417,7 +431,8 @@ public class StreamsMetricsImplTest {
public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds()
throws InterruptedException {
final Metrics metrics = mock(Metrics.class);
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
final Thread otherThread =
@@ -432,7 +447,8 @@ public class StreamsMetricsImplTest {
public void shouldUseSameStoreLevelSensorKeyWithSameSensorNames() {
final Metrics metrics = mock(Metrics.class);
final ArgumentCaptor<String> sensorKeys = setUpSensorKeyTests(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1,
INFO_RECORDING_LEVEL);
@@ -456,7 +472,8 @@ public class StreamsMetricsImplTest {
.thenReturn(metricName);
when(metrics.metric(metricName)).thenReturn(null);
when(metrics.addMetricIfAbsent(eq(metricName),
eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).thenReturn(null);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
streamsMetrics.addStoreLevelMutableMetric(
TASK_ID1,
@@ -489,7 +506,8 @@ public class StreamsMetricsImplTest {
when(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP,
DESCRIPTION1, STORE_LEVEL_TAG_MAP))
.thenReturn(metricName);
when(metrics.metric(metricName)).thenReturn(null);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
streamsMetrics.addStoreLevelMutableMetric(
TASK_ID1,
@@ -539,7 +557,8 @@ public class StreamsMetricsImplTest {
@Test
public void shouldRemoveStateStoreLevelSensors() {
final Metrics metrics = mock(Metrics.class);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final MetricName metricName1 =
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP,
DESCRIPTION1, STORE_LEVEL_TAG_MAP);
final MetricName metricName2 =
@@ -562,7 +581,8 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
THREAD_ID1,
@@ -580,7 +600,8 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
THREAD_ID1,
@@ -599,7 +620,8 @@ public class StreamsMetricsImplTest {
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorCacheName = "processorNodeName";
setupGetNewSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
THREAD_ID1,
@@ -618,7 +640,8 @@ public class StreamsMetricsImplTest {
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorCacheName = "processorNodeName";
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
THREAD_ID1, TASK_ID1,
@@ -635,7 +658,8 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final Sensor actualSensor =
streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
@@ -647,7 +671,8 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final Sensor actualSensor =
streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
@@ -664,7 +689,8 @@ public class StreamsMetricsImplTest {
when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP,
DESCRIPTION1, clientLevelTags))
.thenReturn(metricName1);
doNothing().when(metrics).addMetric(eq(metricName1),
eqMetricConfig(metricConfig), eq(immutableValue));
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1,
DESCRIPTION1, recordingLevel, value);
}
@@ -678,7 +704,8 @@ public class StreamsMetricsImplTest {
when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP,
DESCRIPTION1, clientLevelTags))
.thenReturn(metricName1);
doNothing().when(metrics).addMetric(eq(metricName1),
eqMetricConfig(metricConfig), eq(valueProvider));
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
streamsMetrics.addClientLevelMutableMetric(METRIC_NAME1, DESCRIPTION1,
recordingLevel, valueProvider);
}
@@ -699,7 +726,8 @@ public class StreamsMetricsImplTest {
@Test
public void shouldRemoveClientLevelMetricsAndSensors() {
final Metrics metrics = mock(Metrics.class);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
final ArgumentCaptor<String> sensorKeys =
addSensorsOnAllLevels(metrics, streamsMetrics);
doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(0));
@@ -712,7 +740,8 @@ public class StreamsMetricsImplTest {
@Test
public void shouldRemoveThreadLevelSensors() {
final Metrics metrics = mock(Metrics.class);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
addSensorsOnAllLevels(metrics, streamsMetrics);
setupRemoveSensorsTest(metrics, THREAD_ID1);
@@ -721,7 +750,8 @@ public class StreamsMetricsImplTest {
@Test
public void testNullMetrics() {
- assertThrows(NullPointerException.class, () -> new
StreamsMetricsImpl(null, "", PROCESS_ID, time));
+ assertThrows(NullPointerException.class, () -> new
StreamsMetricsImpl(null, "", PROCESS_ID,
+ APPLICATION_ID, time));
}
@Test
@@ -754,7 +784,8 @@ public class StreamsMetricsImplTest {
@Test
public void testMultiLevelSensorRemoval() {
final Metrics registry = new Metrics();
- final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry,
THREAD_ID1, PROCESS_ID, time);
+ final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry,
THREAD_ID1, PROCESS_ID, APPLICATION_ID,
+ time);
for (final MetricName defaultMetric : registry.metrics().keySet()) {
registry.removeMetric(defaultMetric);
}
@@ -860,7 +891,8 @@ public class StreamsMetricsImplTest {
final MockTime time = new MockTime(1);
final MetricConfig config = new MetricConfig().timeWindow(1,
TimeUnit.MILLISECONDS);
final Metrics metrics = new Metrics(config, time);
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, "", PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, "", PROCESS_ID, APPLICATION_ID,
+ time);
final String scope = "scope";
final String entity = "entity";
@@ -894,7 +926,8 @@ public class StreamsMetricsImplTest {
@Test
public void shouldAddLatencyRateTotalSensor() {
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
shouldAddCustomSensor(
streamsMetrics.addLatencyRateTotalSensor(SCOPE_NAME, ENTITY_NAME,
OPERATION_NAME, RecordingLevel.DEBUG),
streamsMetrics,
@@ -909,7 +942,8 @@ public class StreamsMetricsImplTest {
@Test
public void shouldAddRateTotalSensor() {
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, APPLICATION_ID,
+ time);
shouldAddCustomSensor(
streamsMetrics.addRateTotalSensor(SCOPE_NAME, ENTITY_NAME,
OPERATION_NAME, RecordingLevel.DEBUG),
streamsMetrics,
@@ -1035,9 +1069,10 @@ public class StreamsMetricsImplTest {
public void shouldGetClientLevelTagMap() {
final Map<String, String> tagMap = streamsMetrics.clientLevelTagMap();
- assertThat(tagMap.size(), equalTo(2));
+ assertThat(tagMap.size(), equalTo(3));
assertThat(tagMap.get(StreamsMetricsImpl.CLIENT_ID_TAG),
equalTo(CLIENT_ID));
assertThat(tagMap.get(StreamsMetricsImpl.PROCESS_ID_TAG),
equalTo(PROCESS_ID));
+ assertThat(tagMap.get(StreamsMetricsImpl.APPLICATION_ID_TAG),
equalTo(APPLICATION_ID));
}
@Test
@@ -1045,7 +1080,8 @@ public class StreamsMetricsImplTest {
final String taskName = "test-task";
final String storeType = "remote-window";
final String storeName = "window-keeper";
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID,
+ time);
final Map<String, String> tagMap =
streamsMetrics.storeLevelTagMap(taskName, storeType, storeName);
@@ -1060,7 +1096,8 @@ public class StreamsMetricsImplTest {
@Test
public void shouldGetCacheLevelTagMap() {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID,
APPLICATION_ID,
+ time);
final String taskName = "taskName";
final String storeName = "storeName";
@@ -1077,7 +1114,8 @@ public class StreamsMetricsImplTest {
@Test
public void shouldGetThreadLevelTagMap() {
- final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, APPLICATION_ID,
+ time);
final Map<String, String> tagMap =
streamsMetrics.threadLevelTagMap(THREAD_ID1);
@@ -1210,7 +1248,7 @@ public class StreamsMetricsImplTest {
@Test
public void shouldReturnMetricsVersionCurrent() {
assertThat(
- new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID,
time).version(),
+ new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID,
APPLICATION_ID, time).version(),
equalTo(Version.LATEST)
);
}
@@ -1269,7 +1307,8 @@ public class StreamsMetricsImplTest {
public void shouldAddThreadLevelMutableMetric() {
final int measuredValue = 123;
final StreamsMetricsImpl streamsMetrics
- = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID,
APPLICATION_ID,
+ time);
streamsMetrics.addThreadLevelMutableMetric(
"foobar",
@@ -1293,7 +1332,8 @@ public class StreamsMetricsImplTest {
public void shouldAddThreadLevelMutableMetricWithAdditionalTags() {
final int measuredValue = 123;
final StreamsMetricsImpl streamsMetrics
- = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID,
APPLICATION_ID,
+ time);
streamsMetrics.addThreadLevelMutableMetric(
"foobar",
@@ -1319,7 +1359,8 @@ public class StreamsMetricsImplTest {
public void shouldCleanupThreadLevelMutableMetric() {
final int measuredValue = 123;
final StreamsMetricsImpl streamsMetrics
- = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID,
APPLICATION_ID,
+ time);
streamsMetrics.addThreadLevelMutableMetric(
"foobar",
"test metric",
@@ -1341,7 +1382,8 @@ public class StreamsMetricsImplTest {
public void shouldAddThreadLevelImmutableMetric() {
final int measuredValue = 123;
final StreamsMetricsImpl streamsMetrics
- = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID,
APPLICATION_ID,
+ time);
streamsMetrics.addThreadLevelImmutableMetric(
"foobar",
@@ -1365,7 +1407,8 @@ public class StreamsMetricsImplTest {
public void shouldCleanupThreadLevelImmutableMetric() {
final int measuredValue = 123;
final StreamsMetricsImpl streamsMetrics
- = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
+ = new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID,
APPLICATION_ID,
+ time);
streamsMetrics.addThreadLevelImmutableMetric(
"foobar",
"test metric",
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 0f8ce890b19..902b362e153 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1350,7 +1350,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", "processId",
"applicationId", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -1386,7 +1386,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", "processId",
"applicationId", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -1425,7 +1425,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", "processId",
"applicationId", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -1466,7 +1466,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", "processId",
"applicationId", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 7a78716530a..77a5d9e8710 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -572,7 +572,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", "processId",
"applicationId", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -612,7 +612,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", "processId",
"applicationId", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -654,7 +654,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", "processId",
"applicationId", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
@@ -698,7 +698,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
dir,
Serdes.String(),
Serdes.String(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", "processId",
"applicationId", new MockTime()),
new StreamsConfig(props),
MockRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 154517d3b94..e09a8ebe57b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -97,7 +97,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.Long(),
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new
MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", "processId",
"applicationId", new MockTime()),
streamsConfig,
() -> collector,
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 4239e3e5000..573cc02a438 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -112,7 +112,7 @@ public class GlobalStateStoreProviderTest {
when(mockContext.applicationId()).thenReturn("appId");
when(mockContext.metrics())
.thenReturn(
- new StreamsMetricsImpl(new Metrics(), "threadName",
"processId", new MockTime())
+ new StreamsMetricsImpl(new Metrics(), "threadName",
"processId", "applicationId", new MockTime())
);
when(mockContext.taskId()).thenReturn(new TaskId(0, 0));
when(mockContext.appConfigs()).thenReturn(CONFIGS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
index e71704f32af..27aa52b48c6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
@@ -54,7 +54,7 @@ public class KeyValueSegmentTest {
@BeforeEach
public void setUp() {
metricsRecorder.init(
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
new MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "test-client", "processId",
"applicationId", new MockTime()),
new TaskId(0, 0)
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 1ba655a75ce..0ecea105bad 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -122,7 +122,7 @@ public class MeteredKeyValueStoreTest {
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
when(context.applicationId()).thenReturn(APPLICATION_ID);
when(context.metrics()).thenReturn(
- new StreamsMetricsImpl(metrics, "test", "processId", mockTime)
+ new StreamsMetricsImpl(metrics, "test", "processId",
"applicationId", mockTime)
);
when(context.taskId()).thenReturn(taskId);
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index f8b08a532d1..4546e6b7c1f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -126,7 +126,7 @@ public class MeteredSessionStoreTest {
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
when(context.applicationId()).thenReturn(APPLICATION_ID);
when(context.metrics())
- .thenReturn(new StreamsMetricsImpl(metrics, "test",
"processId", mockTime));
+ .thenReturn(new StreamsMetricsImpl(metrics, "test",
"processId", "applicationId", mockTime));
when(context.taskId()).thenReturn(taskId);
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
when(innerStore.name()).thenReturn(STORE_NAME);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 2e3c470387c..0b6556ac78b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -127,7 +127,7 @@ public class MeteredTimestampedKeyValueStoreTest {
setUpWithoutContext();
when(context.applicationId()).thenReturn(APPLICATION_ID);
when(context.metrics())
- .thenReturn(new StreamsMetricsImpl(metrics, "test", "processId",
mockTime));
+ .thenReturn(new StreamsMetricsImpl(metrics, "test", "processId",
"applicationId", mockTime));
when(context.taskId()).thenReturn(taskId);
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
when(inner.name()).thenReturn(STORE_NAME);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index a3fe59c6e8b..a3ed1453454 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -77,7 +77,7 @@ public class MeteredTimestampedWindowStoreTest {
public void setUp() {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test", "processId",
"applicationId", new MockTime());
context = new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
@@ -105,7 +105,7 @@ public class MeteredTimestampedWindowStoreTest {
public void setUpWithoutContextName() {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test", "processId", new
MockTime());
+ new StreamsMetricsImpl(metrics, "test", "processId",
"applicationId", new MockTime());
context = new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index 5c4509bc7a3..1602f1a115b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -110,7 +110,7 @@ public class MeteredVersionedKeyValueStoreTest {
@BeforeEach
public void setUp() {
when(inner.name()).thenReturn(STORE_NAME);
- when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics,
"test", "processId", mockTime));
+ when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics,
"test", "processId", "applicationId", mockTime));
when(context.applicationId()).thenReturn(APPLICATION_ID);
when(context.taskId()).thenReturn(TASK_ID);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 2726ce26aa7..40e4e52eafa 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -116,7 +116,7 @@ public class MeteredWindowStoreTest {
@BeforeEach
public void setUp() {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test", "processId", new MockTime());
+ new StreamsMetricsImpl(metrics, "test", "processId", "applicationId", new MockTime());
context = new InternalMockProcessorContext<>(
TestUtils.tempDirectory(),
Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 70224c8013c..f56ac75a0c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -923,7 +923,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG));
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-application", "processId", time);
+ new StreamsMetricsImpl(metrics, "test-application", "processId", "applicationId", time);
context = mock(InternalMockProcessorContext.class);
when(context.metrics()).thenReturn(streamsMetrics);
@@ -956,7 +956,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-application", "processId", time);
+ new StreamsMetricsImpl(metrics, "test-application", "processId", "applicationId", time);
context = mock(InternalMockProcessorContext.class);
when(context.metrics()).thenReturn(streamsMetrics);
@@ -988,7 +988,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(metrics, "test-application", "processId", time);
+ new StreamsMetricsImpl(metrics, "test-application", "processId", "applicationId", time);
final Properties props = StreamsTestUtils.getStreamsConfig();
context = mock(InternalMockProcessorContext.class);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
index 69e7d31e31b..5f5495af425 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
@@ -63,7 +63,7 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
public void setUp() {
final Metrics metrics = new Metrics();
offset = 0;
- streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", "processId", new MockTime());
+ streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", "processId", "applicationId", new MockTime());
context = new MockInternalProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
index 633d14c1e63..0acd2a330da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
@@ -54,7 +54,7 @@ public class TimestampedSegmentTest {
@BeforeEach
public void setUp() {
metricsRecorder.init(
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime()),
new TaskId(0, 0)
);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
index 75c01cef3c9..62a37ff8d0a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderGaugesTest.java
@@ -202,7 +202,7 @@ public class RocksDBMetricsRecorderGaugesTest {
private void runAndVerifySumOfProperties(final String propertyName) throws Exception {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime());
+ new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime());
final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
recorder.init(streamsMetrics, TASK_ID);
@@ -219,7 +219,7 @@ public class RocksDBMetricsRecorderGaugesTest {
private void runAndVerifyBlockCacheMetricsWithMultipleCaches(final String propertyName) throws Exception {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime());
+ new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime());
final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
recorder.init(streamsMetrics, TASK_ID);
@@ -236,7 +236,7 @@ public class RocksDBMetricsRecorderGaugesTest {
private void runAndVerifyBlockCacheMetricsWithSingleCache(final String propertyName) throws Exception {
final StreamsMetricsImpl streamsMetrics =
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime());
+ new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime());
final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
recorder.init(streamsMetrics, TASK_ID);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
index a0c068b59ee..b0ed10c45fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
@@ -194,7 +194,7 @@ public class RocksDBMetricsRecorderTest {
assertThrows(
IllegalStateException.class,
() -> recorder.init(
- new StreamsMetricsImpl(new Metrics(), "test-client", "processId", new MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "test-client", "processId", "applicationId", new MockTime()),
TASK_ID1
)
);
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index ed68c86c490..1361df9a09e 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -90,7 +90,7 @@ public class InternalMockProcessorContext<KOut, VOut>
this(null,
null,
null,
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
null,
null,
@@ -108,6 +108,7 @@ public class InternalMockProcessorContext<KOut, VOut>
new Metrics(),
"mock",
"processId",
+ "applicationId",
new MockTime()
),
config,
@@ -141,6 +142,7 @@ public class InternalMockProcessorContext<KOut, VOut>
new Metrics(),
"mock",
"processId",
+ "applicationId",
new MockTime()
),
config,
@@ -158,7 +160,7 @@ public class InternalMockProcessorContext<KOut, VOut>
stateDir,
keySerde,
valueSerde,
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
config,
null,
null,
@@ -178,7 +180,7 @@ public class InternalMockProcessorContext<KOut, VOut>
null,
serdes.keySerde(),
serdes.valueSerde(),
- new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime()),
+ new StreamsMetricsImpl(metrics, "mock", "processId", "applicationId", new MockTime()),
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
() -> collector,
null,
@@ -195,7 +197,7 @@ public class InternalMockProcessorContext<KOut, VOut>
stateDir,
keySerde,
valueSerde,
- new StreamsMetricsImpl(new Metrics(), "mock", "processId", new MockTime()),
+ new StreamsMetricsImpl(new Metrics(), "mock", "processId", "applicationId", new MockTime()),
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
() -> collector,
cache,
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 92976875b93..9b812b389fa 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -385,6 +385,7 @@ public class TopologyTestDriver implements Closeable {
metrics,
"test-client",
"processId",
+ "applicationId",
mockWallClockTime
);
TaskMetrics.droppedRecordsSensor(threadId, TASK_ID.toString(), streamsMetrics);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 0ffea9b4916..08cf542305c 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -238,6 +238,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
new Metrics(metricConfig),
threadId,
"processId",
+ "applicationId",
Time.SYSTEM
);
TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
index 3ffa85a4503..25f728cd567 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
@@ -256,6 +256,7 @@ public class MockProcessorContext<KForward, VForward> implements ProcessorContex
new Metrics(metricConfig),
threadId,
"processId",
+ "applicationId",
Time.SYSTEM
);
TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), metrics);
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index ebb38dd773a..c760e5dd1d6 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -291,6 +291,7 @@ public class MockProcessorContextTest {
new Metrics(new MetricConfig()),
Thread.currentThread().getName(),
"processId",
+ "applicationId",
Time.SYSTEM
));
when(mockInternalProcessorContext.taskId()).thenReturn(new TaskId(1, 1));