This is an automated email from the ASF dual-hosted git repository. clolov pushed a commit to branch 4.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 38e86a6b155e1d0dea2d04eca4f8a4c9684ec30c Author: Lianet Magrans <[email protected]> AuthorDate: Thu Jan 22 15:25:09 2026 -0500 MINOR: fix producer doc and add test for partitionsFor timeout behaviour (#21347) Follow-up from previous PR where it was wrongly added that send would time out if the topic/partition does not exist (the returned future will time out instead). Also adding an integration test to validate the behaviour of partitionsFor (the docs were accurate for that one). Reviewers: Matthias J. Sax <[email protected]> --- .../org/apache/kafka/clients/producer/KafkaProducer.java | 4 ++-- .../integration/kafka/api/PlaintextProducerSendTest.scala | 13 +++++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 7f6edfb37ea..70fa5ade966 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -933,12 +933,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body * to parallelize processing. * - * @param record The record to send + * @param record The record to send. If the topic or the partition specified in it cannot be found + * in metadata within {@code max.block.ms}, the returned future will time out when retrieved. * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null * indicates no callback) * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or * when send is invoked after producer has been closed. 
- * @throws TimeoutException if the topic or the partition specified in the record cannot be found in metadata within {@code max.block.ms} * @throws InterruptException If the thread is interrupted while blocked * @throws SerializationException If the key or value are not valid objects given the configured serializers * @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions. diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala index dc8b9423304..e52b2092094 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -170,6 +170,19 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { assertEquals("Partition 10 of topic topic with partition count 4 is not present in metadata after 500 ms.", exception.getCause.getMessage) } + /** + * Test error message received when partitionsFor fails waiting on metadata for a topic that does not exist. + * No need to run this for both rebalance protocols. + */ + @ParameterizedTest(name = "groupProtocol={0}.autoCreateTopicsEnabled={1}") + @MethodSource(Array("protocolAndAutoCreateTopicProviders")) + def testPartitionsForTimeoutErrorWhenTopicDoesNotExist(groupProtocol: String, autoCreateTopicsEnabled: String): Unit = { + val producer = createProducer(maxBlockMs = 500) + + val exception = assertThrows(classOf[TimeoutException], () => producer.partitionsFor("unexisting-topic")) + assertEquals("Topic unexisting-topic not present in metadata after 500 ms.", exception.getMessage) + } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) @MethodSource(Array("timestampConfigProvider")) def testSendWithInvalidBeforeAndAfterTimestamp(groupProtocol: String, messageTimeStampConfig: String, recordTimestamp: Long): Unit = {
