fvaleri commented on code in PR #13517:
URL: https://github.com/apache/kafka/pull/13517#discussion_r1191435518


##########
examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java:
##########
@@ -16,29 +16,59 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.common.errors.TimeoutException;
-
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * This example can be decomposed into the following stages:
+ *
+ * 1. Clean any topics left from previous runs.
+ * 2. Create a producer thread to send a set of records to topic1.
+ * 3. Create a consumer thread to fetch all previously sent records from topic1.
+ *
+ * If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`.
+ * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
+ * record all the log output together.
+ */
 public class KafkaConsumerProducerDemo {
-    public static void main(String[] args) throws InterruptedException {
-        boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
-        CountDownLatch latch = new CountDownLatch(2);
-        Producer producerThread = new Producer(
-            "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch);
-        producerThread.start();
-
-        Consumer consumerThread = new Consumer(
-            "consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, "DemoConsumer", Optional.empty(), false, 10000, latch);
-        consumerThread.start();
-
-        if (!latch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for demo producer and consumer to finish");
-        }
+    public static final String BOOTSTRAP_SERVERS = "localhost:9092";
+    public static final String TOPIC_NAME = "my-topic";
+    public static final String GROUP_NAME = "my-group";
+
+    public static void main(String[] args) {
+        try {
+            if (args.length == 0) {
+                Utils.printHelp("This example takes 2 parameters (i.e. 10000 sync):%n" +
+                    "- records: total number of records to send (required)%n" +
+                    "- mode: pass 'sync' to send records synchronously (optional)");
+                return;
+            }
 
-        consumerThread.shutdown();
-        System.out.println("All finished!");
+            int numRecords = Integer.parseInt(args[0]);
+            boolean isAsync = args.length == 1 || !args[1].trim().equalsIgnoreCase("sync");
+
+            // stage 1: clean any topics left from previous runs
+            Utils.recreateTopics(BOOTSTRAP_SERVERS, -1, TOPIC_NAME);
+            CountDownLatch latch = new CountDownLatch(2);
+
+            // stage 2: produce records to topic1
+            Producer producerThread = new Producer(
+                "producer", BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, false, numRecords, -1, latch);
+            producerThread.start();
+
+            // stage 3: consume records from topic1
+            Consumer consumerThread = new Consumer(
+                "consumer", BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, Optional.empty(), false, numRecords, latch);
+            consumerThread.start();
+
+            if (!latch.await(5, TimeUnit.MINUTES)) {
+                Utils.printErr("Timeout after 5 minutes waiting for termination");
+                //producerThread.shutdown();
+                consumerThread.shutdown();

Review Comment:
   Rebase issue, fixed.
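   
   For reference, the fixed timeout branch presumably stops both threads before returning; a minimal sketch of that shape (assuming `Producer` and `Consumer` expose the `shutdown()` methods used elsewhere in this PR, not the exact merged code):
   ```java
   if (!latch.await(5, TimeUnit.MINUTES)) {
       Utils.printErr("Timeout after 5 minutes waiting for termination");
       // stop both demo threads so the JVM can exit cleanly
       producerThread.shutdown();
       consumerThread.shutdown();
   }
   ```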



##########
examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java:
##########
@@ -16,182 +16,90 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.Arrays;
-import java.util.List;
 import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This exactly once demo driver takes 3 arguments:
- *   - partition: number of partitions for input/output topic
- *   - instances: number of instances
- *   - records: number of records
- * An example argument list would be `6 3 50000`.
- *
- * If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`.
- * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
- * output to file` to record all the log output together.
- *
- * The driver could be decomposed as following stages:
- *
- * 1. Cleanup any topic whose name conflicts with input and output topic, so that we have a clean-start.
- *
- * 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into
- *    the input topic. The driver will block for the record generation to finish, so the producer
- *    must be in synchronous sending mode.
- *
- * 3. Set up transactional instances in separate threads which does a consume-process-produce loop,
- *    tailing data from input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will
- *    drain all the records from either given partitions or auto assigned partitions by actively
- *    comparing log end offset with committed offset. Each record will be processed exactly once
- *    as dividing the key by 2, and extend the value message. The driver will block for all the record
- *    processing to finish. The transformed record shall be written to the output topic, with
- *    transactional guarantee.
+ * This example can be decomposed into the following stages:
  *
- * 4. Set up a read committed consumer in a separate thread to verify we have all records within
- *    the output topic, while the message ordering on partition level is maintained.
- *    The driver will block for the consumption of all committed records.
+ * 1. Clean any topics left from previous runs.
+ * 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic.
+ *    The demo will block for the record generation to finish, so the producer is synchronous.
+ * 3. Set up the transactional instances in separate threads, each one executing a read-process-write loop
+ *    (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will drain all records from either given
+ *    partitions or auto assigned partitions by actively comparing log end offset with committed offset.
+ *    Each record will be processed exactly once, dividing the key by 2 and extending the value record.
+ *    The demo will block until all records are processed and written to the output topic.

Review Comment:
   Yep.
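   
   As a reader aid, the per-record transform described in stage 3 (halve the key, extend the value) amounts to roughly the following; a hypothetical sketch assuming Integer keys and String values, not the actual ExactlyOnceMessageProcessor code:
   ```java
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.clients.producer.ProducerRecord;

   static ProducerRecord<Integer, String> process(ConsumerRecord<Integer, String> record, String outputTopic) {
       int newKey = record.key() / 2;                    // "dividing the key by 2"
       String newValue = record.value() + "-processed";  // "extending the value record" (suffix is assumed)
       return new ProducerRecord<>(outputTopic, newKey, newValue);
   }
   ```
   Inside the read-process-write loop, the returned record is sent within the same transaction that commits the consumed offsets, which is what gives the exactly-once guarantee.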



##########
examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java:
##########
@@ -16,182 +16,90 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.Arrays;
-import java.util.List;
 import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This exactly once demo driver takes 3 arguments:
- *   - partition: number of partitions for input/output topic
- *   - instances: number of instances
- *   - records: number of records
- * An example argument list would be `6 3 50000`.
- *
- * If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`.
- * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
- * output to file` to record all the log output together.
- *
- * The driver could be decomposed as following stages:
- *
- * 1. Cleanup any topic whose name conflicts with input and output topic, so that we have a clean-start.
- *
- * 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into
- *    the input topic. The driver will block for the record generation to finish, so the producer
- *    must be in synchronous sending mode.
- *
- * 3. Set up transactional instances in separate threads which does a consume-process-produce loop,
- *    tailing data from input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will
- *    drain all the records from either given partitions or auto assigned partitions by actively
- *    comparing log end offset with committed offset. Each record will be processed exactly once
- *    as dividing the key by 2, and extend the value message. The driver will block for all the record
- *    processing to finish. The transformed record shall be written to the output topic, with
- *    transactional guarantee.
+ * This example can be decomposed into the following stages:
  *
- * 4. Set up a read committed consumer in a separate thread to verify we have all records within
- *    the output topic, while the message ordering on partition level is maintained.
- *    The driver will block for the consumption of all committed records.
+ * 1. Clean any topics left from previous runs.
+ * 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic.
+ *    The demo will block for the record generation to finish, so the producer is synchronous.
+ * 3. Set up the transactional instances in separate threads, each one executing a read-process-write loop
+ *    (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will drain all records from either given
+ *    partitions or auto assigned partitions by actively comparing log end offset with committed offset.
+ *    Each record will be processed exactly once, dividing the key by 2 and extending the value record.
+ *    The demo will block until all records are processed and written to the output topic.
+ * 4. Create a read_committed consumer thread to verify we have all records in the output topic,
+ *    and record ordering at the partition level is maintained.
+ *    The demo will block for the consumption of all committed records.
  *
- * From this demo, you could see that all the records from pre-population are processed exactly once,
- * with strong partition level ordering guarantee.

Review Comment:
   Yep. Added in line 35.
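   
   For context, the read_committed verification consumer in stage 4 boils down to one extra consumer config; a minimal sketch using the demo's bootstrap address and group name (deserializer types are assumed):
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.serialization.IntegerDeserializer;
   import org.apache.kafka.common.serialization.StringDeserializer;

   Properties props = new Properties();
   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   props.put(ConsumerConfig.GROUP_ID_CONFIG, "check-group");
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
   // skip records from aborted transactions, so only committed output is verified
   props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
   KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
   ```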



##########
examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java:
##########
@@ -16,182 +16,90 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-
-import java.util.Arrays;
-import java.util.List;
 import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This exactly once demo driver takes 3 arguments:
- *   - partition: number of partitions for input/output topic
- *   - instances: number of instances
- *   - records: number of records
- * An example argument list would be `6 3 50000`.
- *
- * If you are using IntelliJ IDEA, the above arguments should be put in the configuration's `Program Arguments`.
- * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console
- * output to file` to record all the log output together.
- *
- * The driver could be decomposed as following stages:
- *
- * 1. Cleanup any topic whose name conflicts with input and output topic, so that we have a clean-start.
- *
- * 2. Set up a producer in a separate thread to pre-populate a set of records with even number keys into
- *    the input topic. The driver will block for the record generation to finish, so the producer
- *    must be in synchronous sending mode.
- *
- * 3. Set up transactional instances in separate threads which does a consume-process-produce loop,
- *    tailing data from input topic (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will
- *    drain all the records from either given partitions or auto assigned partitions by actively
- *    comparing log end offset with committed offset. Each record will be processed exactly once
- *    as dividing the key by 2, and extend the value message. The driver will block for all the record
- *    processing to finish. The transformed record shall be written to the output topic, with
- *    transactional guarantee.
+ * This example can be decomposed into the following stages:
  *
- * 4. Set up a read committed consumer in a separate thread to verify we have all records within
- *    the output topic, while the message ordering on partition level is maintained.
- *    The driver will block for the consumption of all committed records.
+ * 1. Clean any topics left from previous runs.
+ * 2. Set up a producer thread to pre-populate a set of records with even number keys into the input topic.
+ *    The demo will block for the record generation to finish, so the producer is synchronous.
+ * 3. Set up the transactional instances in separate threads, each one executing a read-process-write loop
+ *    (See {@link ExactlyOnceMessageProcessor}). Each EOS instance will drain all records from either given
+ *    partitions or auto assigned partitions by actively comparing log end offset with committed offset.
+ *    Each record will be processed exactly once, dividing the key by 2 and extending the value record.
+ *    The demo will block until all records are processed and written to the output topic.
+ * 4. Create a read_committed consumer thread to verify we have all records in the output topic,
+ *    and record ordering at the partition level is maintained.
+ *    The demo will block for the consumption of all committed records.
  *
- * From this demo, you could see that all the records from pre-population are processed exactly once,
- * with strong partition level ordering guarantee.
- *
- * Note: please start the kafka broker and zookeeper in local first. The broker version must be >= 2.5
- * in order to run, otherwise the app could throw
+ * Broker version must be >= 2.5.0 in order to run, otherwise the example will throw
  * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+ *
+ * If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`.
+ * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to
+ * record all the log output together.
  */
 public class KafkaExactlyOnceDemo {
-
+    public static final String BOOTSTRAP_SERVERS = "localhost:9092";
     private static final String INPUT_TOPIC = "input-topic";
     private static final String OUTPUT_TOPIC = "output-topic";
+    public static final String GROUP_NAME = "check-group";
 
-    public static void main(String[] args) throws InterruptedException, ExecutionException {
-        if (args.length != 3) {
-            throw new IllegalArgumentException("Should accept 3 parameters: " +
-                "[number of partitions], [number of instances], [number of records]");
-        }
-
-        int numPartitions = Integer.parseInt(args[0]);
-        int numInstances = Integer.parseInt(args[1]);
-        int numRecords = Integer.parseInt(args[2]);
-
-        /* Stage 1: topic cleanup and recreation */
-        recreateTopics(numPartitions);
-
-        CountDownLatch prePopulateLatch = new CountDownLatch(1);
-
-        /* Stage 2: pre-populate records */
-        Producer producerThread = new Producer(
-            "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch);
-        producerThread.start();
-
-        if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population");
-        }
-
-        CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
-
-        /* Stage 3: transactionally process all messages */
-        for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
-            ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(
-                INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, transactionalCopyLatch);
-            messageProcessor.start();
-        }
-
-        if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
-        }
-
-        CountDownLatch consumeLatch = new CountDownLatch(1);
-
-        /* Stage 4: consume all processed messages to verify exactly once */
-        Consumer consumerThread = new Consumer(
-            "consumer", "DemoConsumer", OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch);
-        consumerThread.start();
-
-        if (!consumeLatch.await(5, TimeUnit.MINUTES)) {
-            throw new TimeoutException("Timeout after 5 minutes waiting for output data consumption");
-        }
-
-        consumerThread.shutdown();
-        System.out.println("All finished!");
-    }
-
-    private static void recreateTopics(final int numPartitions)
-        throws ExecutionException, InterruptedException {
-        Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-            KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-
-        Admin adminClient = Admin.create(props);
-
-        List<String> topicsToDelete = Arrays.asList(INPUT_TOPIC, OUTPUT_TOPIC);
-
-        deleteTopic(adminClient, topicsToDelete);
-
-        // Check topic existence in a retry loop
-        while (true) {
-            System.out.println("Making sure the topics are deleted 
successfully: " + topicsToDelete);
-
-            Set<String> listedTopics = adminClient.listTopics().names().get();
-            System.out.println("Current list of topics: " + listedTopics);
-
-            boolean hasTopicInfo = false;
-            for (String listedTopic : listedTopics) {
-                if (topicsToDelete.contains(listedTopic)) {
-                    hasTopicInfo = true;
-                    break;
-                }
-            }
-            if (!hasTopicInfo) {
-                break;
+    public static void main(String[] args) {
+        try {
+            if (args.length != 3) {
+                Utils.printHelp("This example takes 3 parameters (i.e. 6 3 10000):%n" +
+                    "- partition: number of partitions for input and output topics (required)%n" +
+                    "- instances: number of application instances (required)%n" +
+                    "- records: total number of records (required)");
+                return;
             }
-            Thread.sleep(1000);
-        }
-
-        // Create topics in a retry loop
-        while (true) {
-            final short replicationFactor = 1;
-            final List<NewTopic> newTopics = Arrays.asList(
-                new NewTopic(INPUT_TOPIC, numPartitions, replicationFactor),
-                new NewTopic(OUTPUT_TOPIC, numPartitions, replicationFactor));
-            try {
-                adminClient.createTopics(newTopics).all().get();
-                System.out.println("Created new topics: " + newTopics);
-                break;
-            } catch (ExecutionException e) {
-                if (!(e.getCause() instanceof TopicExistsException)) {
-                    throw e;
-                }
-                System.out.println("Metadata of the old topics are not cleared 
yet...");
 
-                deleteTopic(adminClient, topicsToDelete);
+            int numPartitions = Integer.parseInt(args[0]);
+            int numInstances = Integer.parseInt(args[1]);
+            int numRecords = Integer.parseInt(args[2]);
+
+            // stage 1: clean any topics left from previous runs
+            Utils.recreateTopics(BOOTSTRAP_SERVERS, numPartitions, INPUT_TOPIC, OUTPUT_TOPIC);
+
+            // stage 2: send demo records to the input-topic
+            CountDownLatch producerLatch = new CountDownLatch(1);
+            Producer producerThread = new Producer(
+                "producer", BOOTSTRAP_SERVERS, INPUT_TOPIC, false, null, true, numRecords, -1, producerLatch);
+            producerThread.start();
+            if (!producerLatch.await(2, TimeUnit.MINUTES)) {
+                Utils.printErr("Timeout after 2 minutes waiting for data load");
+                producerThread.shutdown();
+                return;
+            }
 
-                Thread.sleep(1000);
+            // stage 3: read from input-topic, process once and write to the output-topic
+            CountDownLatch processorsLatch = new CountDownLatch(numInstances);
+            for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
+                ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(
+                    INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, processorsLatch);
+                messageProcessor.start();
+            }
+            if (!processorsLatch.await(2, TimeUnit.MINUTES)) {
+                Utils.printErr("Timeout after 2 minutes waiting for record copy");
+                //processors.forEach(ExactlyOnceMessageProcessor::shutdown);

Review Comment:
   There was no shutdown before the new processor merge. Added now.
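   
   The added shutdown presumably mirrors the commented-out line above: keep a handle on every EOS instance so the timeout branch can stop them all. A minimal sketch reusing the variables from this hunk (assuming ExactlyOnceMessageProcessor now exposes shutdown(), not the exact merged code):
   ```java
   import java.util.ArrayList;
   import java.util.List;

   List<ExactlyOnceMessageProcessor> processors = new ArrayList<>();
   for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
       ExactlyOnceMessageProcessor processor = new ExactlyOnceMessageProcessor(
           INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, processorsLatch);
       processors.add(processor);
       processor.start();
   }
   if (!processorsLatch.await(2, TimeUnit.MINUTES)) {
       Utils.printErr("Timeout after 2 minutes waiting for record copy");
       // stop every processor thread instead of leaving them running
       processors.forEach(ExactlyOnceMessageProcessor::shutdown);
       return;
   }
   ```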



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
