fvaleri commented on code in PR #13517:
URL: https://github.com/apache/kafka/pull/13517#discussion_r1191435518
##########
examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java:
##########
@@ -16,29 +16,59 @@
*/
package kafka.examples;
-import org.apache.kafka.common.errors.TimeoutException;
-
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+/**
+ * This example can be decomposed into the following stages:
+ *
+ * 1. Clean any topics left from previous runs.
+ * 2. Create a producer thread to send a set of records to topic1.
+ * 3. Create a consumer thread to fetch all previously sent records from topic1.
+ *
+ * If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`.
+ * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
+ * record all the log output together.
+ */
public class KafkaConsumerProducerDemo {
- public static void main(String[] args) throws InterruptedException {
- boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
- CountDownLatch latch = new CountDownLatch(2);
- Producer producerThread = new Producer(
- "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch);
- producerThread.start();
-
- Consumer consumerThread = new Consumer(
- "consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, "DemoConsumer", Optional.empty(), false, 10000, latch);
- consumerThread.start();
-
- if (!latch.await(5, TimeUnit.MINUTES)) {
- throw new TimeoutException("Timeout after 5 minutes waiting for demo producer and consumer to finish");
- }
+ public static final String BOOTSTRAP_SERVERS = "localhost:9092";
+ public static final String TOPIC_NAME = "my-topic";
+ public static final String GROUP_NAME = "my-group";
+
+ public static void main(String[] args) {
+ try {
+ if (args.length == 0) {
+ Utils.printHelp("This example takes 2 parameters (i.e. 10000
sync):%n" +
+ "- records: total number of records to send (required)%n" +
+ "- mode: pass 'sync' to send records synchronously
(optional)");
+ return;
+ }
- consumerThread.shutdown();
- System.out.println("All finished!");
+ int numRecords = Integer.parseInt(args[0]);
+ boolean isAsync = args.length == 1 || !args[1].trim().equalsIgnoreCase("sync");
+
+ // stage 1: clean any topics left from previous runs
+ Utils.recreateTopics(BOOTSTRAP_SERVERS, -1, TOPIC_NAME);
+ CountDownLatch latch = new CountDownLatch(2);
+
+ // stage 2: produce records to topic1
+ Producer producerThread = new Producer(
+ "producer", BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, false, numRecords, -1, latch);
+ producerThread.start();
+
+ // stage 3: consume records from topic1
+ Consumer consumerThread = new Consumer(
+ "consumer", BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, Optional.empty(), false, numRecords, latch);
+ consumerThread.start();
+
+ if (!latch.await(5, TimeUnit.MINUTES)) {
+ Utils.printErr("Timeout after 5 minutes waiting for
termination");
+ //producerThread.shutdown();
+ consumerThread.shutdown();
Review Comment:
Rebase issue, fixed.
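
For context on the shutdown discussion above: the demo's termination hinges on the shared CountDownLatch(2), where each worker thread counts down once when it finishes and the main thread waits with a bounded timeout. A minimal, self-contained sketch of that coordination pattern (the class name and thread bodies here are placeholders, not the PR's Producer/Consumer classes):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchCoordinationSketch {
    public static void main(String[] args) throws InterruptedException {
        // Both worker threads share one latch and count down once when they finish.
        CountDownLatch latch = new CountDownLatch(2);

        Thread producerThread = new Thread(() -> {
            try {
                // ... send records ...
            } finally {
                latch.countDown(); // always signal, even on failure
            }
        }, "producer");

        Thread consumerThread = new Thread(() -> {
            try {
                // ... fetch records ...
            } finally {
                latch.countDown();
            }
        }, "consumer");

        producerThread.start();
        consumerThread.start();

        // Bounded wait: report a timeout instead of hanging forever if a thread is stuck.
        if (!latch.await(5, TimeUnit.MINUTES)) {
            System.err.println("Timeout after 5 minutes waiting for termination");
        }
    }
}
```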
##########
examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java:
##########
@@ -16,182 +16,90 @@
*/
package kafka.examples;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.Arrays;
-import java.util.List;
import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
- * This exactly once demo driver takes 3 arguments:
- * - partition: number of partitions for input/output topic
- * - instances: number of instances
- * - records: number of records
- * An example argument list would be `6 3 50000`.
- *
- * If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`.
- * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
- * output to file` to record all the log output together.
- *
- * The driver could be decomposed as following stages:
- *
- * 1. Cleanup any topic whose name conflicts with input and output topic, so that we have a clean-start.
- *
- * 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into
- * the input topic. The driver will block for the record generation to finish, so the producer
- * must be in synchronous sending mode.
- *
- * 3. Set up transactional instances in separate threads which does a consume-process-produce loop,
- * tailing data from input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will
- * drain all the records from either given partitions or auto assigned partitions by actively
- * comparing log end offset with committed offset. Each record will be processed exactly once
- * as dividing the key by 2, and extend the value message. The driver will block for all the record
- * processing to finish. The transformed record shall be written to the output topic, with
- * transactional guarantee.
+ * This example can be decomposed into the following stages:
 *
- * 4. Set up a read committed consumer in a separate thread to verify we have all records within
- * the output topic, while the message ordering on partition level is maintained.
- * The driver will block for the consumption of all committed records.
+ * 1. Clean any topics left from previous runs.
+ * 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic.
+ * The demo will block for the record generation to finish, so the producer is synchronous.
+ * 3. Set up the transactional instances in separate threads, each one executing a read-process-write loop
+ * (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will drain all records from either given
+ * partitions or auto assigned partitions by actively comparing log end offset with committed offset.
+ * Each record will be processed exactly once, dividing the key by 2 and extending the value record.
+ * The demo will block until all records are processed and written to the output topic.
Review Comment:
Yep.
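
As background for the loop described in stage 3, here is a hedged sketch of a Kafka transactional read-process-write cycle: consume with auto-commit disabled, produce to the output topic inside a transaction, and commit the consumed offsets atomically via sendOffsetsToTransaction (the KIP-447 overload taking ConsumerGroupMetadata, available since broker 2.5). This is not the PR's ExactlyOnceMessageProcessor; topic names, group id, and transactional.id below are illustrative:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ReadProcessWriteSketch {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "eos-group");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // offsets commit with the transaction
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "processor-0"); // must be unique per instance
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("input-topic"));
            producer.initTransactions();

            while (true) { // a real processor would track progress and stop when the input is drained
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                if (records.isEmpty()) {
                    continue;
                }
                try {
                    producer.beginTransaction();
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> record : records) {
                        // "process" step: here the value is simply forwarded
                        producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                    }
                    // Commit consumed offsets atomically with the produced records.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (KafkaException e) {
                    // On a recoverable error, abort and let poll() re-deliver; fatal errors
                    // such as ProducerFencedException are not handled in this sketch.
                    producer.abortTransaction();
                }
            }
        }
    }
}
```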
##########
examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java:
##########
@@ -16,182 +16,90 @@
*/
package kafka.examples;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.Arrays;
-import java.util.List;
import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
- * This exactly once demo driver takes 3 arguments:
- * - partition: number of partitions for input/output topic
- * - instances: number of instances
- * - records: number of records
- * An example argument list would be `6 3 50000`.
- *
- * If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`.
- * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
- * output to file` to record all the log output together.
- *
- * The driver could be decomposed as following stages:
- *
- * 1. Cleanup any topic whose name conflicts with input and output topic, so that we have a clean-start.
- *
- * 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into
- * the input topic. The driver will block for the record generation to finish, so the producer
- * must be in synchronous sending mode.
- *
- * 3. Set up transactional instances in separate threads which does a consume-process-produce loop,
- * tailing data from input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will
- * drain all the records from either given partitions or auto assigned partitions by actively
- * comparing log end offset with committed offset. Each record will be processed exactly once
- * as dividing the key by 2, and extend the value message. The driver will block for all the record
- * processing to finish. The transformed record shall be written to the output topic, with
- * transactional guarantee.
+ * This example can be decomposed into the following stages:
 *
- * 4. Set up a read committed consumer in a separate thread to verify we have all records within
- * the output topic, while the message ordering on partition level is maintained.
- * The driver will block for the consumption of all committed records.
+ * 1. Clean any topics left from previous runs.
+ * 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic.
+ * The demo will block for the record generation to finish, so the producer is synchronous.
+ * 3. Set up the transactional instances in separate threads, each one executing a read-process-write loop
+ * (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will drain all records from either given
+ * partitions or auto assigned partitions by actively comparing log end offset with committed offset.
+ * Each record will be processed exactly once, dividing the key by 2 and extending the value record.
+ * The demo will block until all records are processed and written to the output topic.
+ * 4. Create a read_committed consumer thread to verify we have all records in the output topic,
+ * and record ordering at the partition level is maintained.
+ * The demo will block for the consumption of all committed records.
 *
- * From this demo, you could see that all the records from pre-population are processed exactly once,
- * with strong partition level ordering guarantee.
Review Comment:
Yep. Added in line 35.
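
Stage 4's verification relies on isolation.level=read_committed, which makes the consumer skip records from aborted transactions. A small stand-alone sketch of such a verifier, not the PR's Consumer class; the topic, group name, and expected count are placeholders:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ReadCommittedVerifierSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "check-group");
        // Only return records from committed transactions; aborted data is filtered out.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

        long expected = 10_000; // the record count passed to the demo
        long seen = 0;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("output-topic"));
            // Block until every committed record has been consumed.
            while (seen < expected) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                seen += records.count();
            }
        }
        System.out.printf("Consumed %d committed records%n", seen);
    }
}
```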
##########
examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java:
##########
@@ -16,182 +16,90 @@
*/
package kafka.examples;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.Arrays;
-import java.util.List;
import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
- * This exactly once demo driver takes 3 arguments:
- * - partition: number of partitions for input/output topic
- * - instances: number of instances
- * - records: number of records
- * An example argument list would be `6 3 50000`.
- *
- * If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`.
- * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
- * output to file` to record all the log output together.
- *
- * The driver could be decomposed as following stages:
- *
- * 1. Cleanup any topic whose name conflicts with input and output topic, so that we have a clean-start.
- *
- * 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into
- * the input topic. The driver will block for the record generation to finish, so the producer
- * must be in synchronous sending mode.
- *
- * 3. Set up transactional instances in separate threads which does a consume-process-produce loop,
- * tailing data from input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will
- * drain all the records from either given partitions or auto assigned partitions by actively
- * comparing log end offset with committed offset. Each record will be processed exactly once
- * as dividing the key by 2, and extend the value message. The driver will block for all the record
- * processing to finish. The transformed record shall be written to the output topic, with
- * transactional guarantee.
+ * This example can be decomposed into the following stages:
 *
- * 4. Set up a read committed consumer in a separate thread to verify we have all records within
- * the output topic, while the message ordering on partition level is maintained.
- * The driver will block for the consumption of all committed records.
+ * 1. Clean any topics left from previous runs.
+ * 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic.
+ * The demo will block for the record generation to finish, so the producer is synchronous.
+ * 3. Set up the transactional instances in separate threads, each one executing a read-process-write loop
+ * (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will drain all records from either given
+ * partitions or auto assigned partitions by actively comparing log end offset with committed offset.
+ * Each record will be processed exactly once, dividing the key by 2 and extending the value record.
+ * The demo will block until all records are processed and written to the output topic.
+ * 4. Create a read_committed consumer thread to verify we have all records in the output topic,
+ * and record ordering at the partition level is maintained.
+ * The demo will block for the consumption of all committed records.
 *
- * From this demo, you could see that all the records from pre-population are processed exactly once,
- * with strong partition level ordering guarantee.
- *
- * Note: please start the kafka broker and zookeeper in local first. The broker version must be >= 2.5
- * in order to run, otherwise the app could throw
+ * Broker version must be >= 2.5.0 in order to run, otherwise the example will throw
 * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+ *
+ * If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`.
+ * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
+ * record all the log output together.
*/
public class KafkaExactlyOnceDemo {
-
+ public static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String INPUT_TOPIC = "input-topic";
private static final String OUTPUT_TOPIC = "output-topic";
+ public static final String GROUP_NAME = "check-group";
- public static void main(String[] args) throws InterruptedException, ExecutionException {
- if (args.length != 3) {
- throw new IllegalArgumentException("Should accept 3 parameters: " +
- "[number of partitions], [number of instances], [number of records]");
- }
-
- int numPartitions = Integer.parseInt(args[0]);
- int numInstances = Integer.parseInt(args[1]);
- int numRecords = Integer.parseInt(args[2]);
-
- /* Stage 1: topic cleanup and recreation */
- recreateTopics(numPartitions);
-
- CountDownLatch prePopulateLatch = new CountDownLatch(1);
-
- /* Stage 2: pre-populate records */
- Producer producerThread = new Producer(
- "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch);
- producerThread.start();
-
- if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
- throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population");
- }
-
- CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
-
- /* Stage 3: transactionally process all messages */
- for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
- ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(
- INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, transactionalCopyLatch);
- messageProcessor.start();
- }
-
- if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
- throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
- }
-
- CountDownLatch consumeLatch = new CountDownLatch(1);
-
- /* Stage 4: consume all processed messages to verify exactly once */
- Consumer consumerThread = new Consumer(
- "consumer", "DemoConsumer", OUTPUT_TOPIC, "Verify-consumer",
Optional.empty(), true, numRecords, consumeLatch);
- consumerThread.start();
-
- if (!consumeLatch.await(5, TimeUnit.MINUTES)) {
- throw new TimeoutException("Timeout after 5 minutes waiting for output data consumption");
- }
-
- consumerThread.shutdown();
- System.out.println("All finished!");
- }
-
- private static void recreateTopics(final int numPartitions)
- throws ExecutionException, InterruptedException {
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-
- Admin adminClient = Admin.create(props);
-
- List<String> topicsToDelete = Arrays.asList(INPUT_TOPIC, OUTPUT_TOPIC);
-
- deleteTopic(adminClient, topicsToDelete);
-
- // Check topic existence in a retry loop
- while (true) {
- System.out.println("Making sure the topics are deleted
successfully: " + topicsToDelete);
-
- Set<String> listedTopics = adminClient.listTopics().names().get();
- System.out.println("Current list of topics: " + listedTopics);
-
- boolean hasTopicInfo = false;
- for (String listedTopic : listedTopics) {
- if (topicsToDelete.contains(listedTopic)) {
- hasTopicInfo = true;
- break;
- }
- }
- if (!hasTopicInfo) {
- break;
+ public static void main(String[] args) {
+ try {
+ if (args.length != 3) {
+ Utils.printHelp("This example takes 3 parameters (i.e. 6 3
10000):%n" +
+ "- partition: number of partitions for input and output
topics (required)%n" +
+ "- instances: number of application instances
(required)%n" +
+ "- records: total number of records (required)");
+ return;
}
- Thread.sleep(1000);
- }
-
- // Create topics in a retry loop
- while (true) {
- final short replicationFactor = 1;
- final List<NewTopic> newTopics = Arrays.asList(
- new NewTopic(INPUT_TOPIC, numPartitions, replicationFactor),
- new NewTopic(OUTPUT_TOPIC, numPartitions, replicationFactor));
- try {
- adminClient.createTopics(newTopics).all().get();
- System.out.println("Created new topics: " + newTopics);
- break;
- } catch (ExecutionException e) {
- if (!(e.getCause() instanceof TopicExistsException)) {
- throw e;
- }
- System.out.println("Metadata of the old topics are not cleared
yet...");
- deleteTopic(adminClient, topicsToDelete);
+ int numPartitions = Integer.parseInt(args[0]);
+ int numInstances = Integer.parseInt(args[1]);
+ int numRecords = Integer.parseInt(args[2]);
+
+ // stage 1: clean any topics left from previous runs
+ Utils.recreateTopics(BOOTSTRAP_SERVERS, numPartitions, INPUT_TOPIC, OUTPUT_TOPIC);
+
+ // stage 2: send demo records to the input-topic
+ CountDownLatch producerLatch = new CountDownLatch(1);
+ Producer producerThread = new Producer(
+ "producer", BOOTSTRAP_SERVERS, INPUT_TOPIC, false, null, true,
numRecords, -1, producerLatch);
+ producerThread.start();
+ if (!producerLatch.await(2, TimeUnit.MINUTES)) {
+ Utils.printErr("Timeout after 2 minutes waiting for data
load");
+ producerThread.shutdown();
+ return;
+ }
- Thread.sleep(1000);
+ // stage 3: read from input-topic, process once and write to the output-topic
+ CountDownLatch processorsLatch = new CountDownLatch(numInstances);
+ for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
+ ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(
+ INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, processorsLatch);
+ messageProcessor.start();
+ }
+ if (!processorsLatch.await(2, TimeUnit.MINUTES)) {
+ Utils.printErr("Timeout after 2 minutes waiting for record
copy");
+ //processors.forEach(ExactlyOnceMessageProcessor::shutdown);
Review Comment:
There was no shutdown before the new processor merge. Added now.
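
The deleted inline recreateTopics logic moved into the shared Utils.recreateTopics helper. As a rough sketch of what such a helper does, shown here under stated assumptions rather than as the PR's actual Utils code: delete the topics, tolerate "unknown topic" errors, then create in a retry loop because deletion completes asynchronously and stale metadata can surface as TopicExistsException:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class RecreateTopicsSketch {
    // Hypothetical signature; the PR's helper also accepts -1 for broker-default partitions.
    public static void recreateTopics(String bootstrapServers, int numPartitions, String... topicNames)
            throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (Admin admin = Admin.create(props)) {
            List<String> names = Arrays.asList(topicNames);
            // Delete leftovers from previous runs; a missing topic is not an error here.
            try {
                admin.deleteTopics(names).all().get();
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                    throw e;
                }
            }
            // Deletion is asynchronous, so creation can race with stale metadata:
            // retry until the brokers stop reporting TopicExistsException.
            List<NewTopic> newTopics = names.stream()
                .map(name -> new NewTopic(name, numPartitions, (short) 1))
                .collect(Collectors.toList());
            while (true) {
                try {
                    admin.createTopics(newTopics).all().get();
                    return;
                } catch (ExecutionException e) {
                    if (!(e.getCause() instanceof TopicExistsException)) {
                        throw e;
                    }
                    Thread.sleep(1000);
                }
            }
        }
    }
}
```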
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]