[
https://issues.apache.org/jira/browse/KAFKA-8380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Narendra Kumar resolved KAFKA-8380.
-----------------------------------
Resolution: Not A Problem
> We can not create a topic, immediately write to it and then read.
> -----------------------------------------------------------------
>
> Key: KAFKA-8380
> URL: https://issues.apache.org/jira/browse/KAFKA-8380
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.2.0
> Reporter: Darya Merkureva
> Priority: Blocker
>
> We are trying to create a topic, immediately write to it and read.
> For some reason, we read nothing in spite of the fact that we are waiting for
> the completion of KafkaFuture.
> {code:java}
> public class main {
> private static final String TOPIC_NAME = "topic";
> private static final String KEY_NAME = "key";
> public static void main(String[] args) {
> final Properties prodProps = new Properties();
> prodProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
> prodProps.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
> prodProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 50000);
> prodProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> StringSerializer.class.getName());
> prodProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> StringSerializer.class.getName());
> final Producer<String, String> prod = new
> KafkaProducer<>(prodProps);
> final Properties admProps = new Properties();
> admProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
> final AdminClient adm = KafkaAdminClient.create(admProps);
> final Properties consProps = new Properties();
> consProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
> consProps.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
> consProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
> consProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
> "1000");
> consProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
> "30000");
> consProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> "org.apache.kafka.common.serialization.StringDeserializer");
> consProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> "org.apache.kafka.common.serialization.StringDeserializer");
> final Consumer<String,String> cons = new
> KafkaConsumer<>(consProps);
>
> try {
> final NewTopic newTopic = new NewTopic(TOPIC_NAME, 1,
> (short)1);
> val createTopicsResult =
> adm.createTopics(Collections.singleton(newTopic));
> createTopicsResult.values().get(TOPIC_NAME).get();
> } catch (InterruptedException | ExecutionException e) {
> if (!(e.getCause() instanceof TopicExistsException)) {
> throw new RuntimeException(e.getMessage(), e);
> }
> }
>
> final ProducerRecord<String, String> producerRecord =
> new ProducerRecord<>(TOPIC_NAME, KEY_NAME,
> "data");
> prod.send(producerRecord);
> prod.send(producerRecord);
> prod.send(producerRecord);
> prod.send(producerRecord);
> cons.subscribe(Arrays.asList(TOPIC_NAME));
> val records = cons.poll(Duration.ofSeconds(10));
> for(var record: records){
> System.out.println(record.value());
> }
> }
> }
> {code}
>
>
>
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)