This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push: new 31ab73cf1eb MINOR: avoid unnecessary UnsupportedOperationException (#15102) 31ab73cf1eb is described below commit 31ab73cf1eba2ca5ac041ddf9e011364ccba1787 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Tue Jan 2 07:34:42 2024 -0800 MINOR: avoid unnecessary UnsupportedOperationException (#15102) We did no complete KIP-714 with regard to collecting producer clients instance IDs in Kafka Streams if EOSv1 is enabled. Instead of throwing an UnsupportedOperationException, we should return an empty map. Reviewers: Lucas Brutschy <lbruts...@confluent.io> --- .../org/apache/kafka/streams/KafkaStreams.java | 3 ++ .../streams/processor/internals/StreamThread.java | 4 ++- .../processor/internals/StreamThreadTest.java | 39 ++++++++++++++++------ 3 files changed, 34 insertions(+), 12 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 2fdb1e3a8f0..12d794a2376 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1877,6 +1877,9 @@ public class KafkaStreams implements AutoCloseable { /** * Returns the internal clients' assigned {@code client instance ids}. + * <p> + * Note, if {@link StreamsConfig#PROCESSING_GUARANTEE_CONFIG} is set to {@link StreamsConfig#EXACTLY_ONCE}, + * the producer client instance ids are not returned yet. This gap will be closed in the next release. * * @return The internal clients' assigned instance ids used for metrics collection. * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 55d09e8926c..8fc51d78009 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1604,7 +1604,9 @@ public class StreamThread extends Thread implements ProcessingThread { } if (processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) { - throw new UnsupportedOperationException("not yet implemented"); + log.error("EOS v1 enabled: Producer client instance IDs are not collected." + + " Enable producer logging to retrieve the IDs from the producer logs."); + producerInstanceIdFuture.complete(Collections.emptyMap()); } else { if (threadProducerInstanceIdFuture.isDone()) { if (threadProducerInstanceIdFuture.isCompletedExceptionally()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index df2dacd3c65..0cb70732bcb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -3340,24 +3340,37 @@ public class StreamThreadTest { @Test public void shouldGetProducerInstanceId() throws Exception { - getProducerInstanceId(false); + getProducerInstanceId(false, false); } @Test - public void shouldProducerInstanceIdAndInternalTimeout() throws Exception { - getProducerInstanceId(true); + public void shouldGetProducerInstanceIdWithInternalTimeout() throws Exception { + getProducerInstanceId(true, false); } - private void getProducerInstanceId(final boolean injectTimeException) throws Exception { + @Test + public void shouldNotGetProducerInstanceIdWithEosV1() throws Exception { + getProducerInstanceId(false, true); + } + + @SuppressWarnings("deprecation") + private void getProducerInstanceId(final boolean injectTimeException, + final boolean enableEos) throws Exception { final Uuid producerInstanceId = Uuid.randomUuid(); final MockProducer<byte[], byte[]> producer = new MockProducer<>(); - producer.setClientInstanceId(producerInstanceId); - if (injectTimeException) { - producer.injectTimeoutException(1); + if (!enableEos) { + producer.setClientInstanceId(producerInstanceId); + if (injectTimeException) { + producer.injectTimeoutException(1); + } } clientSupplier.prepareProducer(producer); - thread = createStreamThread("clientId"); + final Properties properties = configProps(enableEos); + if (enableEos) { + properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + } + thread = createStreamThread("clientId", new StreamsConfig(properties)); thread.setState(State.STARTING); final KafkaFuture<Map<String, KafkaFuture<Uuid>>> producerInstanceIdFutures = @@ -3366,9 +3379,13 @@ public class StreamThreadTest { thread.maybeGetClientInstanceIds(); // triggers internal timeout; should not crash thread.maybeGetClientInstanceIds(); - final KafkaFuture<Uuid> producerFuture = producerInstanceIdFutures.get().get("clientId-StreamThread-1-producer"); - final Uuid producerUuid = producerFuture.get(); - assertThat(producerUuid, equalTo(producerInstanceId)); + if (enableEos) { + assertThat(producerInstanceIdFutures.get(), equalTo(emptyMap())); + } else { + final KafkaFuture<Uuid> producerFuture = producerInstanceIdFutures.get().get("clientId-StreamThread-1-producer"); + final Uuid producerUuid = producerFuture.get(); + assertThat(producerUuid, equalTo(producerInstanceId)); + } } @Test