This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fd9de50de12 KAFKA-18041: Update key for storing global consumer 
instance id for consistency (#17869)
fd9de50de12 is described below

commit fd9de50de120ec400220e6b960d11f302cd88486
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Nov 20 16:14:03 2024 -0500

    KAFKA-18041: Update key for storing global consumer instance id for 
consistency (#17869)
    
    This PR updates the key for storing the KIP-714 client instance id for the 
global consumer so that it follows the same pattern as the other embedded Kafka 
Streams consumer clients.
    
    Reviewers: Matthias Sax <[email protected]>
---
 docs/streams/upgrade-guide.html                                     | 6 ++++++
 .../streams/integration/KafkaStreamsTelemetryIntegrationTest.java   | 2 +-
 streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java    | 2 +-
 3 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 8d199263adf..28da54b04b6 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -135,6 +135,12 @@
 
     <h3><a id="streams_api_changes_400" 
href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3>
 
+    <p>
+        In this release the <code>ClientInstanceIds</code> instance stores the 
global consumer <code>Uuid</code> for the
+        <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientidentificationandtheclientinstanceid">KIP-714</a>
+        id with a key of global stream-thread name appended with 
<code>"-global-consumer"</code> where before it was only the global 
stream-thread name.
+    </p>
+
     <p>
         In this release two configs 
<code>default.deserialization.exception.handler</code> and 
<code>default.production.exception.handler</code> are deprecated, as they don't 
have any overwrites, which is described in
         <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1056%3A+Remove+%60default.%60+prefix+for+exception+handler+StreamsConfig";>KIP-1056</a>
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
index bc6e09b6598..18dbd2fa6d8 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -160,7 +160,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
             
             final Uuid mainConsumerInstanceId = 
clientInstanceIds.consumerInstanceIds().entrySet().stream()
                     .filter(entry -> 
!entry.getKey().endsWith("-restore-consumer")
-                            && !entry.getKey().endsWith("GlobalStreamThread"))
+                            && 
!entry.getKey().endsWith("GlobalStreamThread-global-consumer"))
                     .map(Map.Entry::getValue)
                     .findFirst().orElseThrow();
             assertNotNull(adminInstanceId);
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 991b073e2fd..79e6af29be7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1939,7 +1939,7 @@ public class KafkaStreams implements AutoCloseable {
             // could be `null` if telemetry is disabled on the client itself
             if (instanceId != null) {
                 clientInstanceIds.addConsumerInstanceId(
-                    globalStreamThread.getName(),
+                    globalStreamThread.getName() + "-global-consumer",
                     instanceId
                 );
             } else {

Reply via email to