This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit a4b2a086f8639dece77a95b47bbb79840a455150 Author: Boyang Chen <boy...@confluent.io> AuthorDate: Wed Feb 5 16:51:08 2020 -0800 KAFKA-9447: Add new customized EOS model example (#8031) With the improvement of 447, we are now offering developers a better experience on writing their customized EOS apps with group subscription, instead of manual assignments. With the demo, user should be able to get started more quickly on writing their own EOS app, and understand the processing logic much better. Reviewers: Guozhang Wang <wangg...@gmail.com> --- .../kafka/clients/admin/DescribeTopicsResult.java | 23 +-- .../kafka/clients/admin/ListTopicsResult.java | 14 +- examples/README | 8 +- examples/bin/exactly-once-demo.sh | 23 +++ .../src/main/java/kafka/examples/Consumer.java | 32 +++- .../examples/ExactlyOnceMessageProcessor.java | 209 +++++++++++++++++++++ .../kafka/examples/KafkaConsumerProducerDemo.java | 14 +- .../java/kafka/examples/KafkaExactlyOnceDemo.java | 185 ++++++++++++++++++ .../src/main/java/kafka/examples/Producer.java | 42 ++++- 9 files changed, 508 insertions(+), 42 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java index 34698b9..7753984 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java @@ -51,21 +51,18 @@ public class DescribeTopicsResult { */ public KafkaFuture<Map<String, TopicDescription>> all() { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])). 
- thenApply(new KafkaFuture.BaseFunction<Void, Map<String, TopicDescription>>() { - @Override - public Map<String, TopicDescription> apply(Void v) { - Map<String, TopicDescription> descriptions = new HashMap<>(futures.size()); - for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) { - try { - descriptions.put(entry.getKey(), entry.getValue().get()); - } catch (InterruptedException | ExecutionException e) { - // This should be unreachable, because allOf ensured that all the futures - // completed successfully. - throw new RuntimeException(e); - } + thenApply(v -> { + Map<String, TopicDescription> descriptions = new HashMap<>(futures.size()); + for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) { + try { + descriptions.put(entry.getKey(), entry.getValue().get()); + } catch (InterruptedException | ExecutionException e) { + // This should be unreachable, because allOf ensured that all the futures + // completed successfully. + throw new RuntimeException(e); } - return descriptions; } + return descriptions; }); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java index 4e7e1a2..2154073 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java @@ -48,23 +48,13 @@ public class ListTopicsResult { * Return a future which yields a collection of TopicListing objects. 
*/ public KafkaFuture<Collection<TopicListing>> listings() { - return future.thenApply(new KafkaFuture.BaseFunction<Map<String, TopicListing>, Collection<TopicListing>>() { - @Override - public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) { - return namesToDescriptions.values(); - } - }); + return future.thenApply(namesToDescriptions -> namesToDescriptions.values()); } /** * Return a future which yields a collection of topic names. */ public KafkaFuture<Set<String>> names() { - return future.thenApply(new KafkaFuture.BaseFunction<Map<String, TopicListing>, Set<String>>() { - @Override - public Set<String> apply(Map<String, TopicListing> namesToListings) { - return namesToListings.keySet(); - } - }); + return future.thenApply(namesToListings -> namesToListings.keySet()); } } diff --git a/examples/README b/examples/README index f6e3410..2efe71a 100644 --- a/examples/README +++ b/examples/README @@ -6,4 +6,10 @@ To run the demo: 2. For simple consumer demo, `run bin/java-simple-consumer-demo.sh` 3. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync` 4. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh` - + 5. For standalone mode exactly once demo run, `run bin/exactly-once-demo.sh standaloneMode 6 3 50000`, + this means we are starting 3 EOS instances with 6 topic partitions and 50000 pre-populated records + 6. For group mode exactly once demo run, `run bin/exactly-once-demo.sh groupMode 6 3 50000`, + this means the same as the standalone demo, except consumers are using subscription mode. + 7. Some notes for exactly once demo: + 7.1. The Kafka server has to be on broker version 2.5 or higher to be able to run group mode. + 7.2. 
You could also use IntelliJ to run the example directly by configuring parameters as "Program arguments" diff --git a/examples/bin/exactly-once-demo.sh b/examples/bin/exactly-once-demo.sh new file mode 100755 index 0000000..e9faa42 --- /dev/null +++ b/examples/bin/exactly-once-demo.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +base_dir=$(dirname $0)/../..
+ +if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then + export KAFKA_HEAP_OPTS="-Xmx512M" +fi + +exec $base_dir/bin/kafka-run-class.sh kafka.examples.KafkaExactlyOnceDemo $@ diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index 26d6e23..19cb67c 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -25,24 +25,45 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; +import java.util.concurrent.CountDownLatch; public class Consumer extends ShutdownableThread { private final KafkaConsumer<Integer, String> consumer; private final String topic; + private final String groupId; + private final int numMessageToConsume; + private int messageRemaining; + private final CountDownLatch latch; - public Consumer(String topic) { + public Consumer(final String topic, + final String groupId, + final boolean readCommitted, + final int numMessageToConsume, + final CountDownLatch latch) { super("KafkaConsumerExample", false); + this.groupId = groupId; Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + if (readCommitted) { + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + } + 
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumer = new KafkaConsumer<>(props); this.topic = topic; + this.numMessageToConsume = numMessageToConsume; + this.messageRemaining = numMessageToConsume; + this.latch = latch; + } + + KafkaConsumer<Integer, String> get() { + return consumer; } @Override @@ -50,7 +71,12 @@ public class Consumer extends ShutdownableThread { consumer.subscribe(Collections.singletonList(this.topic)); ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<Integer, String> record : records) { - System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); + System.out.println(groupId + " received message : from partition " + record.partition() + ", (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); + } + messageRemaining -= records.count(); + if (messageRemaining <= 0) { + System.out.println(groupId + " finished reading " + numMessageToConsume + " messages"); + latch.countDown(); } } diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java new file mode 100644 index 0000000..53685f3 --- /dev/null +++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.examples; + +import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.ProducerFencedException; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A demo class for how to write a customized EOS app. It takes a consume-process-produce loop. + * Important configurations and APIs are commented. 
+ */ +public class ExactlyOnceMessageProcessor extends Thread { + + private static final boolean READ_COMMITTED = true; + + private final String mode; + private final String inputTopic; + private final String outputTopic; + private final String consumerGroupId; + private final int numPartitions; + private final int numInstances; + private final int instanceIdx; + private final String transactionalId; + + private final KafkaProducer<Integer, String> producer; + private final KafkaConsumer<Integer, String> consumer; + + private final CountDownLatch latch; + + public ExactlyOnceMessageProcessor(final String mode, + final String inputTopic, + final String outputTopic, + final int numPartitions, + final int numInstances, + final int instanceIdx, + final CountDownLatch latch) { + this.mode = mode; + this.inputTopic = inputTopic; + this.outputTopic = outputTopic; + this.consumerGroupId = "Eos-consumer"; + this.numPartitions = numPartitions; + this.numInstances = numInstances; + this.instanceIdx = instanceIdx; + this.transactionalId = "Processor-" + instanceIdx; + // A unique transactional.id must be provided in order to properly use EOS. + producer = new Producer(outputTopic, true, transactionalId, true, -1, null).get(); + // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data. + consumer = new Consumer(inputTopic, consumerGroupId, READ_COMMITTED, -1, null).get(); + this.latch = latch; + } + + @Override + public void run() { + // Init transactions call should always happen first in order to clear zombie transactions from previous generation. + producer.initTransactions(); + + final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); + + // Under group mode, topic based subscription is sufficient as EOS apps are safe to cooperate transactionally after 2.5. + // Under standalone mode, user needs to manually assign the topic partitions and make sure the assignment is unique + // across the consumer group instances. 
+ if (this.mode.equals("groupMode")) { + consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); + } + + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + printWithTxnId("Received partition assignment after rebalancing: " + partitions); + messageRemaining.set(messagesRemaining(consumer)); + } + }); + } else { + // Do a range assignment of topic partitions. + List<TopicPartition> topicPartitions = new ArrayList<>(); + int rangeSize = numPartitions / numInstances; + int startPartition = rangeSize * instanceIdx; + int endPartition = Math.min(numPartitions - 1, startPartition + rangeSize - 1); + for (int partition = startPartition; partition <= endPartition; partition++) { + topicPartitions.add(new TopicPartition(inputTopic, partition)); + } + + consumer.assign(topicPartitions); + printWithTxnId("Manually assign partitions: " + topicPartitions); + } + + int messageProcessed = 0; + boolean abortPreviousTransaction = false; + while (messageRemaining.get() > 0) { + ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200)); + if (records.count() > 0) { + try { + // Abort previous transaction if instructed. + if (abortPreviousTransaction) { + producer.abortTransaction(); + // The consumer fetch position also needs to be reset. + resetToLastCommittedPositions(consumer); + abortPreviousTransaction = false; + } + // Begin a new transaction session. + producer.beginTransaction(); + for (ConsumerRecord<Integer, String> record : records) { + // Process the record and send to downstream. 
+ ProducerRecord<Integer, String> customizedRecord = transform(record); + producer.send(customizedRecord); + } + Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>(); + for (TopicPartition topicPartition : consumer.assignment()) { + positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null)); + } + // Checkpoint the progress by sending offsets to group coordinator broker. + // Under group mode, we must apply consumer group metadata for proper fencing. + if (this.mode.equals("groupMode")) { + producer.sendOffsetsToTransaction(positions, consumer.groupMetadata()); + } else { + producer.sendOffsetsToTransaction(positions, consumerGroupId); + } + + // Finish the transaction. All sent records should be visible for consumption now. + producer.commitTransaction(); + messageProcessed += records.count(); + } catch (CommitFailedException e) { + // In case of a retriable exception, suggest aborting the ongoing transaction for correctness. + abortPreviousTransaction = true; + } catch (ProducerFencedException | FencedInstanceIdException e) { + throw new KafkaException("Encountered fatal error during processing: " + e.getMessage()); + } + } + messageRemaining.set(messagesRemaining(consumer)); + printWithTxnId("Message remaining: " + messageRemaining); + } + + printWithTxnId("Finished processing " + messageProcessed + " records"); + latch.countDown(); + } + + private void printWithTxnId(final String message) { + System.out.println(transactionalId + ": " + message); + } + + private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) { + printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")"); + return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value()); + } + + private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) { + final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new 
ArrayList<>(consumer.assignment())); + // If we couldn't detect any end offset, that means we are still not able to fetch offsets. + if (fullEndOffsets.isEmpty()) { + return Long.MAX_VALUE; + } + + return consumer.assignment().stream().mapToLong(partition -> { + long currentPosition = consumer.position(partition); + printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets); + if (fullEndOffsets.containsKey(partition)) { + return fullEndOffsets.get(partition) - currentPosition; + } + return 0; + }).sum(); + } + + private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) { + final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment()); + consumer.assignment().forEach(tp -> { + OffsetAndMetadata offsetAndMetadata = committed.get(tp); + if (offsetAndMetadata != null) + consumer.seek(tp, offsetAndMetadata.offset()); + else + consumer.seekToBeginning(Collections.singleton(tp)); + }); + } +} diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java index f42ed6f..561732b 100644 --- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java @@ -16,14 +16,20 @@ */ package kafka.examples; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + public class KafkaConsumerProducerDemo { - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync"); - Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync); + CountDownLatch latch = new CountDownLatch(2); + Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync, null, false, 10000, latch); producerThread.start(); - Consumer consumerThread = new 
Consumer(KafkaProperties.TOPIC); + Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, "DemoConsumer", false, 10000, latch); consumerThread.start(); - + latch.await(5, TimeUnit.MINUTES); + consumerThread.shutdown(); + System.out.println("All finished!"); } } diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java new file mode 100644 index 0000000..d418eba --- /dev/null +++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.examples; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** + * This exactly once demo driver takes 4 arguments: + * - mode: whether to run as standalone app, or a group + * - partition: number of partitions for input/output topic + * - instances: number of instances + * - records: number of records + * An example argument list would be `groupMode 6 3 50000` + * + * The driver can be decomposed into the following stages: + * + * 1. Clean up any topic whose name conflicts with the input and output topics, so that we have a clean start. + * + * 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into + * the input topic. The driver will block for the record generation to finish, so the producer + * must be in synchronous sending mode. + * + * 3. Set up transactional instances in separate threads which run a consume-process-produce loop, + * tailing data from the input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will + * drain all the records from either given partitions or auto assigned partitions by actively + * comparing the log end offset with the consumer's current position. Each record will be processed exactly once + * by dividing the key by 2 and extending the value message. The driver will block for all the record + * processing to finish. The transformed record shall be written to the output topic, with + * transactional guarantee. + * + * 4. 
Set up a read committed consumer in a separate thread to verify we have all records within + * the output topic, while the message ordering on partition level is maintained. + * The driver will block for the consumption of all committed records. + * + * From this demo, you could see that all the records from pre-population are processed exactly once, + * in either standalone mode or group mode, with strong partition level ordering guarantee. + * + * Note: please start the kafka broker and zookeeper in local first. The broker version must be >= 2.5 + * in order to run group mode, otherwise the app could throw + * {@link org.apache.kafka.common.errors.UnsupportedVersionException}. + */ +public class KafkaExactlyOnceDemo { + + private static final String INPUT_TOPIC = "input-topic"; + private static final String OUTPUT_TOPIC = "output-topic"; + + public static void main(String[] args) throws InterruptedException, ExecutionException { + if (args.length != 4) { + throw new IllegalArgumentException("Should accept 4 parameters: [mode], " + + "[number of partitions], [number of instances], [number of records]"); + } + + String mode = args[0]; + int numPartitions = Integer.valueOf(args[1]); + int numInstances = Integer.valueOf(args[2]); + int numRecords = Integer.valueOf(args[3]); + + /* Stage 1: topic cleanup and recreation */ + recreateTopics(numPartitions); + + CountDownLatch prePopulateLatch = new CountDownLatch(1); + + /* Stage 2: pre-populate records */ + Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, prePopulateLatch); + producerThread.start(); + + prePopulateLatch.await(5, TimeUnit.MINUTES); + + CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances); + + /* Stage 3: transactionally process all messages */ + for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) { + ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(mode, + INPUT_TOPIC, OUTPUT_TOPIC, numPartitions, + 
numInstances, instanceIdx, transactionalCopyLatch); + messageProcessor.start(); + } + + transactionalCopyLatch.await(5, TimeUnit.MINUTES); + + CountDownLatch consumeLatch = new CountDownLatch(1); + + /* Stage 4: consume all processed messages to verify exactly once */ + Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", true, numRecords, consumeLatch); + consumerThread.start(); + + consumeLatch.await(5, TimeUnit.MINUTES); + consumerThread.shutdown(); + System.out.println("All finished!"); + } + + private static void recreateTopics(final int numPartitions) + throws ExecutionException, InterruptedException { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); + + Admin adminClient = Admin.create(props); + + List<String> topicsToDelete = Arrays.asList(INPUT_TOPIC, OUTPUT_TOPIC); + + deleteTopic(adminClient, topicsToDelete); + + // Check topic existence in a retry loop + while (true) { + System.out.println("Making sure the topics are deleted successfully: " + topicsToDelete); + + Set<String> listedTopics = adminClient.listTopics().names().get(); + System.out.println("Current list of topics: " + listedTopics); + + boolean hasTopicInfo = false; + for (String listedTopic : listedTopics) { + if (topicsToDelete.contains(listedTopic)) { + hasTopicInfo = true; + break; + } + } + if (!hasTopicInfo) { + break; + } + Thread.sleep(1000); + } + + // Create topics in a retry loop + while (true) { + final short replicationFactor = 1; + final List<NewTopic> newTopics = Arrays.asList( + new NewTopic(INPUT_TOPIC, numPartitions, replicationFactor), + new NewTopic(OUTPUT_TOPIC, numPartitions, replicationFactor)); + try { + adminClient.createTopics(newTopics).all().get(); + System.out.println("Created new topics: " + newTopics); + break; + } catch (ExecutionException e) { + if (!(e.getCause() instanceof TopicExistsException)) { + throw e; + } + 
System.out.println("Metadata of the old topics are not cleared yet..."); + + deleteTopic(adminClient, topicsToDelete); + + Thread.sleep(1000); + } + } + } + + private static void deleteTopic(final Admin adminClient, final List<String> topicsToDelete) + throws InterruptedException, ExecutionException { + try { + adminClient.deleteTopics(topicsToDelete).all().get(); + } catch (ExecutionException e) { + if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) { + throw e; + } + System.out.println("Encountered exception during topic deletion: " + e.getCause()); + } + System.out.println("Deleted old topics: " + topicsToDelete); + } +} diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index b6998c5..3805dd3 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -25,45 +25,69 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; public class Producer extends Thread { private final KafkaProducer<Integer, String> producer; private final String topic; private final Boolean isAsync; + private int numRecords; + private final CountDownLatch latch; - public Producer(String topic, Boolean isAsync) { + public Producer(final String topic, + final Boolean isAsync, + final String transactionalId, + final boolean enableIdempotency, + final int numRecords, + final CountDownLatch latch) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); 
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + if (transactionalId != null) { + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); + } + props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); + producer = new KafkaProducer<>(props); this.topic = topic; this.isAsync = isAsync; + this.numRecords = numRecords; + this.latch = latch; + } + + KafkaProducer<Integer, String> get() { + return producer; } + @Override public void run() { - int messageNo = 1; - while (true) { - String messageStr = "Message_" + messageNo; + int messageKey = 0; + int recordsSent = 0; + while (recordsSent < numRecords) { + String messageStr = "Message_" + messageKey; long startTime = System.currentTimeMillis(); if (isAsync) { // Send asynchronously producer.send(new ProducerRecord<>(topic, - messageNo, - messageStr), new DemoCallBack(startTime, messageNo, messageStr)); + messageKey, + messageStr), new DemoCallBack(startTime, messageKey, messageStr)); } else { // Send synchronously try { producer.send(new ProducerRecord<>(topic, - messageNo, + messageKey, messageStr)).get(); - System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); + System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } - ++messageNo; + messageKey += 2; + recordsSent += 1; } + System.out.println("Producer sent " + numRecords + " records successfully"); + latch.countDown(); } }