This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 908dfa30d77 MINOR: Update clientInstanceIds from EOS_V2 refactor 
(#17664)
908dfa30d77 is described below

commit 908dfa30d77b3368d2a228d0f1bc3803189e7ec1
Author: Bill Bejeck <[email protected]>
AuthorDate: Sat Nov 2 10:37:46 2024 -0400

    MINOR: Update clientInstanceIds from EOS_V2 refactor (#17664)
    
    Updates the KafkaStreams.clientInstanceIds method to correctly populate the
    client-id -> clientInstanceId map that was altered in a previous refactoring.
    
    Added a test that confirms ClientInstanceIds correctly stores consumer
    and producer instance IDs.
    
    Reviewers: Matthias Sax <[email protected]>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 17 ++++++++++---
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 29 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 5cfb464a2a5..cb6cbbfb1f4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -92,6 +92,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -1929,10 +1930,18 @@ public class KafkaStreams implements AutoCloseable {
 
             // could be `null` if telemetry is disabled on the consumer itself
             if (instanceId != null) {
-                clientInstanceIds.addConsumerInstanceId(
-                    clientFuture.getKey(),
-                    instanceId
-                );
+                final String clientFutureKey = clientFuture.getKey();
+                if 
(clientFutureKey.toLowerCase(Locale.getDefault()).endsWith("-producer")) {
+                    clientInstanceIds.addProducerInstanceId(
+                            clientFutureKey,
+                            instanceId
+                    );
+                } else {
+                    clientInstanceIds.addConsumerInstanceId(
+                            clientFutureKey,
+                            instanceId
+                    );
+                }
             } else {
                 log.debug(String.format("Telemetry is disabled for %s.", 
clientFuture.getKey()));
             }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index efe2f26e295..ab35530abd1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -21,6 +21,7 @@ import 
org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -1668,6 +1669,34 @@ public class KafkaStreamsTest {
         }
     }
 
+    @Test
+    public void shouldReturnProducerAndConsumerInstanceIds() {
+        prepareStreams();
+        prepareStreamThread(streamThreadOne, 1);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        final Uuid mainConsumerInstanceId = Uuid.randomUuid();
+        final Uuid producerInstanceId = Uuid.randomUuid();
+        final KafkaFutureImpl<Uuid> consumerFuture = new KafkaFutureImpl<>();
+        final KafkaFutureImpl<Uuid> producerFuture = new KafkaFutureImpl<>();
+        consumerFuture.complete(mainConsumerInstanceId);
+        producerFuture.complete(producerInstanceId);
+        final Uuid adminInstanceId = Uuid.randomUuid();
+        adminClient.setClientInstanceId(adminInstanceId);
+        
+        final Map<String, KafkaFuture<Uuid>> expectedClientIds = 
Map.of("main-consumer", consumerFuture, "some-thread-producer", producerFuture);
+        
when(streamThreadOne.clientInstanceIds(any())).thenReturn(expectedClientIds);
+
+        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            final ClientInstanceIds clientInstanceIds = 
streams.clientInstanceIds(Duration.ZERO);
+            assertThat(clientInstanceIds.consumerInstanceIds().size(), 
equalTo(1));
+            
assertThat(clientInstanceIds.consumerInstanceIds().get("main-consumer"), 
equalTo(mainConsumerInstanceId));
+            assertThat(clientInstanceIds.producerInstanceIds().size(),  
equalTo(1));
+            
assertThat(clientInstanceIds.producerInstanceIds().get("some-thread-producer"), 
equalTo(producerInstanceId));
+            assertThat(clientInstanceIds.adminInstanceId(), 
equalTo(adminInstanceId));
+        }
+    }
+
     @Test
     public void 
shouldThrowTimeoutExceptionWhenAnyClientFutureDoesNotComplete() {
         prepareStreams();

Reply via email to