[ 
https://issues.apache.org/jira/browse/KAFKA-3450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16018675#comment-16018675
 ] 

Raymond Conn commented on KAFKA-3450:
-------------------------------------

Also, it looks like if {{max.block.ms}} is greater than 5 minutes, the caller 
will be forced to wait the full max block time even if the topic does get 
created in the meantime. After 5 minutes the topic is removed from the topics 
map inside the [Metadata | 
https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L218]
 class.
It is never added back, since it looks like the only [add | 
https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L528]
 is outside the [loop | 
https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L561]
 that the producer is "blocking" on. Basically, after 5 minutes all metadata 
requests are for no topics, as the log below shows.


{code}
2017-05-20 20:20:05 DEBUG NetworkClient - Sending metadata request (type=MetadataRequest, topics=topic-test2) to node 2
2017-05-20 20:20:05 TRACE NetworkClient - Sending {topics=[topic-test2]} to node 2.
2017-05-20 20:20:12 DEBUG Metadata - Removing unused topic topic-test2 from the metadata list, expiryMs 1495324449201 now 1495326005464
2017-05-20 20:20:22 DEBUG NetworkClient - Sending metadata request (type=MetadataRequest, topics=) to node 5
2017-05-20 20:20:22 TRACE NetworkClient - Sending {topics=[]} to node 5.
2017-05-20 20:20:35 DEBUG NetworkClient - Sending metadata request (type=MetadataRequest, topics=) to node 2
2017-05-20 20:20:35 TRACE NetworkClient - Sending {topics=[]} to node 2.
2017-05-20 20:20:48 DEBUG NetworkClient - Sending metadata request (type=MetadataRequest, topics=) to node 1
2017-05-20 20:20:48 TRACE NetworkClient - Sending {topics=[]} to node 1.
{code}
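
To make the sequence above easier to follow, here is a minimal simulation of what I think is happening. It does not use the Kafka client at all: {{FakeMetadata}}, {{waitForTopic}}, the 10 second step and the {{reAddInsideLoop}} flag are all hypothetical names and values made up for illustration, and the expiry handling is a simplification of the real Metadata class. Only the 5 minute expiry and the "add once, outside the wait loop" shape come from the code linked above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class TopicExpirySketch {
    // 5 minute topic expiry, as described in the comment above.
    public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000L;
    // Hypothetical simulated interval between metadata requests.
    public static final long STEP_MS = 10_000L;

    /** Hypothetical stand-in for the client's topic map: topic -> expiry timestamp. */
    public static final class FakeMetadata {
        private final Map<String, Long> topics = new HashMap<>();

        public void add(String topic, long nowMs) {
            topics.put(topic, nowMs + TOPIC_EXPIRY_MS);
        }

        /** Drops expired topics, then returns the topics the next request would ask for. */
        public Set<String> topicsForNextRequest(long nowMs) {
            topics.values().removeIf(expiryMs -> expiryMs <= nowMs);
            return new TreeSet<>(topics.keySet());
        }
    }

    /**
     * Simulates the blocking wait on a fake clock that starts at 0.
     * Returns the simulated time at which the wait ended; a return value equal
     * to maxBlockMs means the caller waited the full max block time.
     */
    public static long waitForTopic(FakeMetadata metadata, String topic, long maxBlockMs,
                                    long topicCreatedAtMs, boolean reAddInsideLoop) {
        long nowMs = 0;
        metadata.add(topic, nowMs); // the only add, outside the loop
        while (nowMs < maxBlockMs) {
            if (reAddInsideLoop) {
                metadata.add(topic, nowMs); // hypothetical variant: refresh the topic on every pass
            }
            Set<String> requested = metadata.topicsForNextRequest(nowMs);
            boolean topicExists = nowMs >= topicCreatedAtMs;
            // The wait can only end early if the request actually asked for the topic.
            if (topicExists && requested.contains(topic)) {
                return nowMs;
            }
            nowMs += STEP_MS;
        }
        return maxBlockMs;
    }

    public static void main(String[] args) {
        long maxBlockMs = 10 * 60 * 1000L;      // max.block.ms = 10 minutes
        long topicCreatedAtMs = 6 * 60 * 1000L; // topic gets created after 6 minutes
        System.out.println("add outside loop only: waited "
                + waitForTopic(new FakeMetadata(), "topic-test2", maxBlockMs, topicCreatedAtMs, false) + " ms");
        System.out.println("re-add inside loop:    waited "
                + waitForTopic(new FakeMetadata(), "topic-test2", maxBlockMs, topicCreatedAtMs, true) + " ms");
    }
}
```

In this sketch, a topic created at the 6 minute mark is never noticed when the add happens only once before the loop, so the simulated caller waits the whole 10 minutes. Re-adding the topic inside the loop is just one possible way to avoid that in the simulation; I am not claiming that is how the producer should be fixed.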


> Producer blocks on send to topic that doesn't exist if auto create is disabled
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-3450
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3450
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.9.0.1
>            Reporter: Michal Turek
>            Priority: Critical
>
> {{producer.send()}} blocks for {{max.block.ms}} (default 60 seconds) if 
> the destination topic doesn't exist and automatic topic creation is 
> disabled. A warning from NetworkClient containing UNKNOWN_TOPIC_OR_PARTITION 
> is logged every 100 ms in a loop until the 60-second timeout expires, but the 
> operation is not recoverable.
> Preconditions
> - Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false
> - Kafka 0.9.0.1 clients.
> Example minimalist code
> https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java
> {noformat}
> /**
>  * Test of sending to a topic that does not exist while automatic creation of
>  * topics is disabled in Kafka (auto.create.topics.enable=false).
>  */
> public class NoSuchTopicTest {
>     private static final Logger LOGGER = LoggerFactory.getLogger(NoSuchTopicTest.class);
>     public static void main(String[] args) {
>         Properties properties = new Properties();
>         properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, NoSuchTopicTest.class.getSimpleName());
>         properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); // Default is 60 seconds
>         try (Producer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) {
>             LOGGER.info("Sending message");
>             producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", "key", "value"), (metadata, exception) -> {
>                 if (exception != null) {
>                     LOGGER.error("Send failed: {}", exception.toString());
>                 } else {
>                     LOGGER.info("Send successful: {}-{}/{}", metadata.topic(), metadata.partition(), metadata.offset());
>                 }
>             });
>             LOGGER.info("Sending message");
>             producer.send(new ProducerRecord<>("ThisTopicDoesNotExistToo", "key", "value"), (metadata, exception) -> {
>                 if (exception != null) {
>                     LOGGER.error("Send failed: {}", exception.toString());
>                 } else {
>                     LOGGER.info("Send successful: {}-{}/{}", metadata.topic(), metadata.partition(), metadata.offset());
>                 }
>             });
>         }
>     }
> }
> {noformat}
> Related output
> {noformat}
> 2016-03-23 12:44:37.725 INFO  c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: 
> Sending message (NoSuchTopicTest.java:26)
> 2016-03-23 12:44:37.830 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 0 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:37.928 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 1 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.028 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 2 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.130 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 3 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.231 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 4 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.332 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 5 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.433 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 6 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.534 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 7 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.635 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 8 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.736 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 9 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.772 ERROR c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: 
> Send failed: org.apache.kafka.common.errors.TimeoutException: Failed to 
> update metadata after 35 ms. (NoSuchTopicTest.java:29)
> 2016-03-23 12:44:38.773 INFO  c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: 
> Sending message (NoSuchTopicTest.java:35)
> 2016-03-23 12:44:38.837 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 10 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.938 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 11 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.039 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 12 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.140 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 13 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.242 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 14 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.345 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 15 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.447 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 16 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.549 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 17 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.651 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 18 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.752 WARN  o.a.kafka.clients.NetworkClient     
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 19 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, 
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.774 ERROR c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: 
> Send failed: org.apache.kafka.common.errors.TimeoutException: Failed to 
> update metadata after 21 ms. (NoSuchTopicTest.java:38)
> 2016-03-23 12:44:39.774 INFO  o.a.k.c.producer.KafkaProducer      [main]: 
> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 
> (KafkaProducer.java:613)
> {noformat}
> Known workaround
> - Configure {{max.block.ms = 0}} in the producer to prevent blocking and 
> return from send() immediately. But be careful, I'm not sure whether it is 
> safe or whether it could cause something even worse ;-)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
