Michal Turek created KAFKA-3450:
-----------------------------------

             Summary: Producer blocks on send to topic that doesn't exist if 
auto create is disabled
                 Key: KAFKA-3450
                 URL: https://issues.apache.org/jira/browse/KAFKA-3450
             Project: Kafka
          Issue Type: Bug
          Components: producer 
    Affects Versions: 0.9.0.1
            Reporter: Michal Turek
            Assignee: Jun Rao
            Priority: Minor


{{producer.send()}} is blocked for {{max.block.ms}} (default 60 seconds) if the 
destination topic doesn't exist and if their automatic creation is disabled. 
Warning from NetworkClient containing UNKNOWN_TOPIC_OR_PARTITION is logged 
every 100 ms in a loop until the 60 seconds timeout expires, but the operation 
is not recoverable.

Preconditions

- Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false
- Kafka 0.9.0.1 clients.


Example minimalist code

https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java

{noformat}
/**
 * Test of sending to a topic that does not exist while automatic creation of 
topics is disabled in Kafka (auto.create.topics.enable=false).
 */
public class NoSuchTopicTest {
    private static final Logger LOGGER = 
LoggerFactory.getLogger(NoSuchTopicTest.class);

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
        properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, 
NoSuchTopicTest.class.getSimpleName());
        properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); // 
Default is 60 seconds

        try (Producer<String, String> producer = new 
KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) {
            LOGGER.info("Sending message");
            producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", "key", 
"value"), (metadata, exception) -> {
                if (exception != null) {
                    LOGGER.error("Send failed: {}", exception.toString());
                } else {
                    LOGGER.info("Send successful: {}-{}/{}", metadata.topic(), 
metadata.partition(), metadata.offset());
                }
            });

            LOGGER.info("Sending message");
            producer.send(new ProducerRecord<>("ThisTopicDoesNotExistToo", 
"key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    LOGGER.error("Send failed: {}", exception.toString());
                } else {
                    LOGGER.info("Send successful: {}-{}/{}", metadata.topic(), 
metadata.partition(), metadata.offset());
                }
            });
        }
    }
}
{noformat}

Related output

{noformat}
2016-03-23 12:44:37.725 INFO  c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: 
Sending message (NoSuchTopicTest.java:26)
2016-03-23 12:44:37.830 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 0 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:37.928 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 1 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.028 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 2 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.130 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 3 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.231 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 4 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.332 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 5 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.433 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 6 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.534 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 7 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.635 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 8 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.736 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 9 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.772 ERROR c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: Send 
failed: org.apache.kafka.common.errors.TimeoutException: Failed to update 
metadata after 35 ms. (NoSuchTopicTest.java:29)
2016-03-23 12:44:38.773 INFO  c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: 
Sending message (NoSuchTopicTest.java:35)
2016-03-23 12:44:38.837 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 10 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.938 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 11 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:39.039 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 12 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:39.140 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 13 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:39.242 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 14 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:39.345 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 15 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:39.447 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 16 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:39.549 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 17 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:39.651 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 18 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:39.752 WARN  o.a.kafka.clients.NetworkClient     
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 19 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:39.774 ERROR c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: Send 
failed: org.apache.kafka.common.errors.TimeoutException: Failed to update 
metadata after 21 ms. (NoSuchTopicTest.java:38)
2016-03-23 12:44:39.774 INFO  o.a.k.c.producer.KafkaProducer      [main]: 
Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 
(KafkaProducer.java:613)
{noformat}

Known workaround

- Configure {{max.block.ms = 0}} in producer to prevent blocking and return 
from send() immediately. But be careful, I'm not sure if is it safe and can't 
cause something even worse ;-)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to