This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 908dfa30d77 MINOR: Update clientInstanceIds from EOS_V2 refactor
(#17664)
908dfa30d77 is described below
commit 908dfa30d77b3368d2a228d0f1bc3803189e7ec1
Author: Bill Bejeck <[email protected]>
AuthorDate: Sat Nov 2 10:37:46 2024 -0400
MINOR: Update clientInstanceIds from EOS_V2 refactor (#17664)
Updates KafkaStreams.clientInstanceIds method to correctly populate the
client-id -> clientInstanceId map that was altered in a previous refactoring.
Adds a test that confirms ClientInstanceIds correctly stores consumer
and producer instance ids.
Reviewers: Matthias Sax <[email protected]>
---
.../org/apache/kafka/streams/KafkaStreams.java | 17 ++++++++++---
.../org/apache/kafka/streams/KafkaStreamsTest.java | 29 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 4 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 5cfb464a2a5..cb6cbbfb1f4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -92,6 +92,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -1929,10 +1930,18 @@ public class KafkaStreams implements AutoCloseable {
// could be `null` if telemetry is disabled on the consumer itself
if (instanceId != null) {
- clientInstanceIds.addConsumerInstanceId(
- clientFuture.getKey(),
- instanceId
- );
+ final String clientFutureKey = clientFuture.getKey();
+ if
(clientFutureKey.toLowerCase(Locale.getDefault()).endsWith("-producer")) {
+ clientInstanceIds.addProducerInstanceId(
+ clientFutureKey,
+ instanceId
+ );
+ } else {
+ clientInstanceIds.addConsumerInstanceId(
+ clientFutureKey,
+ instanceId
+ );
+ }
} else {
log.debug(String.format("Telemetry is disabled for %s.",
clientFuture.getKey()));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index efe2f26e295..ab35530abd1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -21,6 +21,7 @@ import
org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
@@ -1668,6 +1669,34 @@ public class KafkaStreamsTest {
}
}
+ @Test
+ public void shouldReturnProducerAndConsumerInstanceIds() {
+ prepareStreams();
+ prepareStreamThread(streamThreadOne, 1);
+ props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+ final Uuid mainConsumerInstanceId = Uuid.randomUuid();
+ final Uuid producerInstanceId = Uuid.randomUuid();
+ final KafkaFutureImpl<Uuid> consumerFuture = new KafkaFutureImpl<>();
+ final KafkaFutureImpl<Uuid> producerFuture = new KafkaFutureImpl<>();
+ consumerFuture.complete(mainConsumerInstanceId);
+ producerFuture.complete(producerInstanceId);
+ final Uuid adminInstanceId = Uuid.randomUuid();
+ adminClient.setClientInstanceId(adminInstanceId);
+
+ final Map<String, KafkaFuture<Uuid>> expectedClientIds =
Map.of("main-consumer", consumerFuture, "some-thread-producer", producerFuture);
+
when(streamThreadOne.clientInstanceIds(any())).thenReturn(expectedClientIds);
+
+ try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ streams.start();
+ final ClientInstanceIds clientInstanceIds =
streams.clientInstanceIds(Duration.ZERO);
+ assertThat(clientInstanceIds.consumerInstanceIds().size(),
equalTo(1));
+
assertThat(clientInstanceIds.consumerInstanceIds().get("main-consumer"),
equalTo(mainConsumerInstanceId));
+ assertThat(clientInstanceIds.producerInstanceIds().size(),
equalTo(1));
+
assertThat(clientInstanceIds.producerInstanceIds().get("some-thread-producer"),
equalTo(producerInstanceId));
+ assertThat(clientInstanceIds.adminInstanceId(),
equalTo(adminInstanceId));
+ }
+ }
+
@Test
public void
shouldThrowTimeoutExceptionWhenAnyClientFutureDoesNotComplete() {
prepareStreams();