This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new 31ab73cf1eb MINOR: avoid unnecessary UnsupportedOperationException 
(#15102)
31ab73cf1eb is described below

commit 31ab73cf1eba2ca5ac041ddf9e011364ccba1787
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Tue Jan 2 07:34:42 2024 -0800

    MINOR: avoid unnecessary UnsupportedOperationException (#15102)
    
    We did not complete KIP-714 with regard to collecting producer client
    instance IDs in Kafka Streams if EOSv1 is enabled. Instead of throwing
    an UnsupportedOperationException, we should return an empty map.
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  3 ++
 .../streams/processor/internals/StreamThread.java  |  4 ++-
 .../processor/internals/StreamThreadTest.java      | 39 ++++++++++++++++------
 3 files changed, 34 insertions(+), 12 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2fdb1e3a8f0..12d794a2376 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1877,6 +1877,9 @@ public class KafkaStreams implements AutoCloseable {
 
     /**
      * Returns the internal clients' assigned {@code client instance ids}.
+     * <p>
+     * Note, if {@link StreamsConfig#PROCESSING_GUARANTEE_CONFIG} is set to 
{@link StreamsConfig#EXACTLY_ONCE},
+     * the producer client instance ids are not returned yet. This gap will be 
closed in the next release.
      *
      * @return The internal clients' assigned instance ids used for metrics 
collection.
      *
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 55d09e8926c..8fc51d78009 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1604,7 +1604,9 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         }
 
         if 
(processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) {
-            throw new UnsupportedOperationException("not yet implemented");
+            log.error("EOS v1 enabled: Producer client instance IDs are not 
collected." +
+                " Enable producer logging to retrieve the IDs from the 
producer logs.");
+            producerInstanceIdFuture.complete(Collections.emptyMap());
         } else {
             if (threadProducerInstanceIdFuture.isDone()) {
                 if (threadProducerInstanceIdFuture.isCompletedExceptionally()) 
{
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index df2dacd3c65..0cb70732bcb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -3340,24 +3340,37 @@ public class StreamThreadTest {
 
     @Test
     public void shouldGetProducerInstanceId() throws Exception {
-        getProducerInstanceId(false);
+        getProducerInstanceId(false, false);
     }
 
     @Test
-    public void shouldProducerInstanceIdAndInternalTimeout() throws Exception {
-        getProducerInstanceId(true);
+    public void shouldGetProducerInstanceIdWithInternalTimeout() throws 
Exception {
+        getProducerInstanceId(true, false);
     }
 
-    private void getProducerInstanceId(final boolean injectTimeException) 
throws Exception {
+    @Test
+    public void shouldNotGetProducerInstanceIdWithEosV1() throws Exception {
+        getProducerInstanceId(false, true);
+    }
+
+    @SuppressWarnings("deprecation")
+    private void getProducerInstanceId(final boolean injectTimeException,
+                                       final boolean enableEos) throws 
Exception {
         final Uuid producerInstanceId = Uuid.randomUuid();
         final MockProducer<byte[], byte[]> producer = new MockProducer<>();
-        producer.setClientInstanceId(producerInstanceId);
-        if (injectTimeException) {
-            producer.injectTimeoutException(1);
+        if (!enableEos) {
+            producer.setClientInstanceId(producerInstanceId);
+            if (injectTimeException) {
+                producer.injectTimeoutException(1);
+            }
         }
         clientSupplier.prepareProducer(producer);
 
-        thread = createStreamThread("clientId");
+        final Properties properties = configProps(enableEos);
+        if (enableEos) {
+            properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE);
+        }
+        thread = createStreamThread("clientId", new StreamsConfig(properties));
         thread.setState(State.STARTING);
 
         final KafkaFuture<Map<String, KafkaFuture<Uuid>>> 
producerInstanceIdFutures =
@@ -3366,9 +3379,13 @@ public class StreamThreadTest {
         thread.maybeGetClientInstanceIds(); // triggers internal timeout; 
should not crash
         thread.maybeGetClientInstanceIds();
 
-        final KafkaFuture<Uuid> producerFuture = 
producerInstanceIdFutures.get().get("clientId-StreamThread-1-producer");
-        final Uuid producerUuid = producerFuture.get();
-        assertThat(producerUuid, equalTo(producerInstanceId));
+        if (enableEos) {
+            assertThat(producerInstanceIdFutures.get(), equalTo(emptyMap()));
+        } else {
+            final KafkaFuture<Uuid> producerFuture = 
producerInstanceIdFutures.get().get("clientId-StreamThread-1-producer");
+            final Uuid producerUuid = producerFuture.get();
+            assertThat(producerUuid, equalTo(producerInstanceId));
+        }
     }
 
     @Test

Reply via email to