This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4527e54647a KAFKA-14753: Improve kafka producer example (#13354)
4527e54647a is described below

commit 4527e54647a0b8457f7f2b5d804eb65dc4d9d817
Author: Philip Nee <p...@confluent.io>
AuthorDate: Tue Mar 7 16:25:49 2023 -0800

    KAFKA-14753: Improve kafka producer example (#13354)
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../src/main/java/kafka/examples/Producer.java     | 65 +++++++++++++++-------
 1 file changed, 44 insertions(+), 21 deletions(-)

diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java
index e649a7862c9..e85fa16060e 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -27,7 +27,13 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
+/**
+ * Demo producer that demonstrate two modes of KafkaProducer.
+ * If the user uses the Async mode: The messages will be printed to stdout upon successful completion
+ * If the user uses the sync mode (isAsync = false): Each send loop will block until completion.
+ */
 public class Producer extends Thread {
     private final KafkaProducer<Integer, String> producer;
     private final String topic;
@@ -54,8 +60,8 @@ public class Producer extends Thread {
             props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
         }
         props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-
         producer = new KafkaProducer<>(props);
+
         this.topic = topic;
         this.isAsync = isAsync;
         this.numRecords = numRecords;
@@ -70,28 +76,45 @@ public class Producer extends Thread {
     public void run() {
         int messageKey = 0;
         int recordsSent = 0;
-        while (recordsSent < numRecords) {
-            String messageStr = "Message_" + messageKey;
-            long startTime = System.currentTimeMillis();
-            if (isAsync) { // Send asynchronously
-                producer.send(new ProducerRecord<>(topic,
-                    messageKey,
-                    messageStr), new DemoCallBack(startTime, messageKey, messageStr));
-            } else { // Send synchronously
-                try {
-                    producer.send(new ProducerRecord<>(topic,
-                        messageKey,
-                        messageStr)).get();
-                    System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")");
-                } catch (InterruptedException | ExecutionException e) {
-                    e.printStackTrace();
-                }
+        try {
+            while (recordsSent < numRecords) {
+                final long currentTimeMs = System.currentTimeMillis();
+                produceOnce(messageKey, recordsSent, currentTimeMs);
+                messageKey += 2;
+                recordsSent += 1;
             }
-            messageKey += 2;
-            recordsSent += 1;
+        } catch (Exception e) {
+            System.out.println("Producer encountered exception:" + e);
+        } finally {
+            System.out.println("Producer sent " + numRecords + " records successfully");
+            this.producer.close();
+            latch.countDown();
         }
-        System.out.println("Producer sent " + numRecords + " records successfully");
-        latch.countDown();
+    }
+
+    private void produceOnce(final int messageKey, final int recordsSent, final long currentTimeMs) throws ExecutionException, InterruptedException {
+        String messageStr = "Message_" + messageKey;
+
+        if (isAsync) { // Send asynchronously
+            sendAsync(messageKey, messageStr, currentTimeMs);
+            return;
+        }
+        Future<RecordMetadata> future = send(messageKey, messageStr);
+        future.get();
+        System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")");
+    }
+
+    private void sendAsync(final int messageKey, final String messageStr, final long currentTimeMs) {
+        this.producer.send(new ProducerRecord<>(topic,
+                        messageKey,
+                        messageStr),
+                new DemoCallBack(currentTimeMs, messageKey, messageStr));
+    }
+
+    private Future<RecordMetadata> send(final int messageKey, final String messageStr) {
+        return producer.send(new ProducerRecord<>(topic,
+                messageKey,
+                messageStr));
     }
 }
 

Reply via email to