This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 38e86a6b155e1d0dea2d04eca4f8a4c9684ec30c
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Jan 22 15:25:09 2026 -0500

    MINOR: fix producer doc and add test for partitionsFor timeout behaviour 
(#21347)
    
    Follow-up from previous PR where it was wrongly added that send would
    timeout if topic/partitions does not exist (the future returned will
    timeout out)
    
    Also adding integration test to validate the behaviour on partitionsFor
    (docs were accurate for that one)
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../org/apache/kafka/clients/producer/KafkaProducer.java    |  4 ++--
 .../integration/kafka/api/PlaintextProducerSendTest.scala   | 13 +++++++++++++
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 7f6edfb37ea..70fa5ade966 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -933,12 +933,12 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
      * expensive callbacks it is recommended to use your own {@link 
java.util.concurrent.Executor} in the callback body
      * to parallelize processing.
      *
-     * @param record   The record to send
+     * @param record   The record to send. If the topic or the partition 
specified in it cannot be found
+     *                 in metadata within {@code max.block.ms}, the returned 
future will time out when retrieved.
      * @param callback A user-supplied callback to execute when the record has 
been acknowledged by the server (null
      *                 indicates no callback)
      * @throws IllegalStateException  if a transactional.id has been 
configured and no transaction has been started, or
      *                                when send is invoked after producer has 
been closed.
-     * @throws TimeoutException       if the topic or the partition specified 
in the record cannot be found in metadata within {@code max.block.ms}
      * @throws InterruptException     If the thread is interrupted while 
blocked
      * @throws SerializationException If the key or value are not valid 
objects given the configured serializers
      * @throws KafkaException         If a Kafka related error occurs that 
does not belong to the public API exceptions.
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index dc8b9423304..e52b2092094 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -170,6 +170,19 @@ class PlaintextProducerSendTest extends 
BaseProducerSendTest {
     assertEquals("Partition 10 of topic topic with partition count 4 is not 
present in metadata after 500 ms.", exception.getCause.getMessage)
   }
 
+  /**
+   * Test error message received when partitionsFor fails waiting on metadata 
for a topic that does not exist.
+   * No need to run this for both rebalance protocols.
+   */
+  @ParameterizedTest(name = "groupProtocol={0}.autoCreateTopicsEnabled={1}")
+  @MethodSource(Array("protocolAndAutoCreateTopicProviders"))
+  def testPartitionsForTimeoutErrorWhenTopicDoesNotExist(groupProtocol: 
String, autoCreateTopicsEnabled: String): Unit = {
+    val producer = createProducer(maxBlockMs = 500)
+
+    val exception = assertThrows(classOf[TimeoutException], () => 
producer.partitionsFor("unexisting-topic"))
+    assertEquals("Topic unexisting-topic not present in metadata after 500 
ms.", exception.getMessage)
+  }
+
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
   @MethodSource(Array("timestampConfigProvider"))
   def testSendWithInvalidBeforeAndAfterTimestamp(groupProtocol: String, 
messageTimeStampConfig: String, recordTimestamp: Long): Unit = {

Reply via email to