[ https://issues.apache.org/jira/browse/KAFKA-3350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Grant Henke resolved KAFKA-3350.
--------------------------------
    Resolution: Duplicate

> The first published message in an autocreated topic results in an error
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-3350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3350
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 0.9.0.1
>         Environment: OpenJDK 8, CentOS 7
>            Reporter: Jasper Siepkes
>            Assignee: Jun Rao
>
> When using "auto.create.topics.enable=true" and publishing the first message
> in a topic which is auto created results in the following error and the
> message not being published:
> {noformat}
> 13:42:58,867 WARN [kafka-producer-network-thread | producer-1][NetworkClient] Error while fetching metadata with correlation id 0 : {non.existing.topic.3b314801-6eb7-4cd2-a372-eaafcdd0db67=LEADER_NOT_AVAILABLE}
> 13:42:58,963 WARN [kafka-producer-network-thread | producer-1][NetworkClient] Error while fetching metadata with correlation id 1 : {non.existing.topic.3b314801-6eb7-4cd2-a372-eaafcdd0db67=LEADER_NOT_AVAILABLE}
> {noformat}
> Using the default zookeeper and Kafka config and running the following
> application reproduces the above behavior:
> {code:title=KafkaTopicAutoCreateProducerTest.java|borderStyle=solid}
> public static void main(String[] args) {
>     PropertyConfigurator.configure("log4j.properties");
>     Properties producerProperties = new Properties();
>     producerProperties.put("bootstrap.servers", "localhost:9092");
>     producerProperties.put("acks", "all");
>     producerProperties.put("retries", "0");
>     producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     producerProperties.put("block.on.buffer.full", "true");
>     KafkaProducer<String, String> producer = null;
>     try {
>         producer = new KafkaProducer(producerProperties);
>         String randomTopic = UUID.randomUUID().toString();
>         ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
>                 "non.existing.topic." + randomTopic,
>                 "foo test message 1");
>         log.debug("About to publish message 1.");
>         producer.send(producerRecord);
>         producer.flush();
>         Thread.sleep(5000);
>         producerRecord = new ProducerRecord<>(
>                 "non.existing.topic." + randomTopic,
>                 "foo test message 2");
>         log.debug("About to publish message 2.");
>         producer.send(producerRecord);
>         producer.flush();
>     } catch (Exception e) {
>         log.warn("An exception occurred while running producer test.", e);
>     } finally {
>         if (producer != null) {
>             try {
>                 producer.close();
>             } catch (Exception e) {
>                 log.info("An exception occurred while closing the producer.", e);
>             }
>         }
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)