This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fd9de50de12 KAFKA-18041: Update key for storing global consumer
instance id for consistency (#17869)
fd9de50de12 is described below
commit fd9de50de120ec400220e6b960d11f302cd88486
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Nov 20 16:14:03 2024 -0500
KAFKA-18041: Update key for storing global consumer instance id for
consistency (#17869)
This PR updates the key for storing the KIP-714 client instance id for the
global consumer to follow a more consistent pattern of the other embedded Kafka
Streams consumer clients.
Reviewers: Matthias Sax <[email protected]>
---
docs/streams/upgrade-guide.html | 6 ++++++
.../streams/integration/KafkaStreamsTelemetryIntegrationTest.java | 2 +-
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java | 2 +-
3 files changed, 8 insertions(+), 2 deletions(-)
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 8d199263adf..28da54b04b6 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -135,6 +135,12 @@
<h3><a id="streams_api_changes_400"
href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3>
+ <p>
+ In this release the <code>ClientInstanceIds</code> instance stores the
global consumer<code>Uuid</code> for the
+ <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientidentificationandtheclientinstanceid">KIP-714</a>
+ id with a key of global stream-thread name appended with
<code>"-global-consumer"</code> where before it was only the global
stream-thread name.
+ </p>
+
<p>
In this release two configs
<code>default.deserialization.exception.handler</code> and
<code>default.production.exception.handler</code> are deprecated, as they don't
have any overwrites, which is described in
<a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1056%3A+Remove+%60default.%60+prefix+for+exception+handler+StreamsConfig">KIP-1056</a>
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
index bc6e09b6598..18dbd2fa6d8 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -160,7 +160,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
final Uuid mainConsumerInstanceId =
clientInstanceIds.consumerInstanceIds().entrySet().stream()
.filter(entry ->
!entry.getKey().endsWith("-restore-consumer")
- && !entry.getKey().endsWith("GlobalStreamThread"))
+ &&
!entry.getKey().endsWith("GlobalStreamThread-global-consumer"))
.map(Map.Entry::getValue)
.findFirst().orElseThrow();
assertNotNull(adminInstanceId);
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 991b073e2fd..79e6af29be7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1939,7 +1939,7 @@ public class KafkaStreams implements AutoCloseable {
// could be `null` if telemetry is disabled on the client itself
if (instanceId != null) {
clientInstanceIds.addConsumerInstanceId(
- globalStreamThread.getName(),
+ globalStreamThread.getName() + "-global-consumer",
instanceId
);
} else {