This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4527e54647a KAFKA-14753: Improve kafka producer example (#13354)
4527e54647a is described below
commit 4527e54647a0b8457f7f2b5d804eb65dc4d9d817
Author: Philip Nee <[email protected]>
AuthorDate: Tue Mar 7 16:25:49 2023 -0800
KAFKA-14753: Improve kafka producer example (#13354)
Reviewers: Guozhang Wang <[email protected]>
---
.../src/main/java/kafka/examples/Producer.java | 65 +++++++++++++++-------
1 file changed, 44 insertions(+), 21 deletions(-)
diff --git a/examples/src/main/java/kafka/examples/Producer.java
b/examples/src/main/java/kafka/examples/Producer.java
index e649a7862c9..e85fa16060e 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -27,7 +27,13 @@ import
org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+/**
+ * Demo producer that demonstrates two modes of KafkaProducer.
+ * If the user uses the async mode (isAsync = true): The messages will be printed to stdout
upon successful completion.
+ * If the user uses the sync mode (isAsync = false): Each send loop will block
until completion.
+ */
public class Producer extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
@@ -54,8 +60,8 @@ public class Producer extends Thread {
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
}
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-
producer = new KafkaProducer<>(props);
+
this.topic = topic;
this.isAsync = isAsync;
this.numRecords = numRecords;
@@ -70,28 +76,45 @@ public class Producer extends Thread {
public void run() {
int messageKey = 0;
int recordsSent = 0;
- while (recordsSent < numRecords) {
- String messageStr = "Message_" + messageKey;
- long startTime = System.currentTimeMillis();
- if (isAsync) { // Send asynchronously
- producer.send(new ProducerRecord<>(topic,
- messageKey,
- messageStr), new DemoCallBack(startTime, messageKey,
messageStr));
- } else { // Send synchronously
- try {
- producer.send(new ProducerRecord<>(topic,
- messageKey,
- messageStr)).get();
- System.out.println("Sent message: (" + messageKey + ", " +
messageStr + ")");
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
+ try {
+ while (recordsSent < numRecords) {
+ final long currentTimeMs = System.currentTimeMillis();
+ produceOnce(messageKey, recordsSent, currentTimeMs);
+ messageKey += 2;
+ recordsSent += 1;
}
- messageKey += 2;
- recordsSent += 1;
+ } catch (Exception e) {
+ System.out.println("Producer encountered exception:" + e);
+ } finally {
+ System.out.println("Producer sent " + numRecords + " records
successfully");
+ this.producer.close();
+ latch.countDown();
}
- System.out.println("Producer sent " + numRecords + " records
successfully");
- latch.countDown();
+ }
+
+ private void produceOnce(final int messageKey, final int recordsSent,
final long currentTimeMs) throws ExecutionException, InterruptedException {
+ String messageStr = "Message_" + messageKey;
+
+ if (isAsync) { // Send asynchronously
+ sendAsync(messageKey, messageStr, currentTimeMs);
+ return;
+ }
+ Future<RecordMetadata> future = send(messageKey, messageStr);
+ future.get();
+ System.out.println("Sent message: (" + messageKey + ", " + messageStr
+ ")");
+ }
+
+ private void sendAsync(final int messageKey, final String messageStr,
final long currentTimeMs) {
+ this.producer.send(new ProducerRecord<>(topic,
+ messageKey,
+ messageStr),
+ new DemoCallBack(currentTimeMs, messageKey, messageStr));
+ }
+
+ private Future<RecordMetadata> send(final int messageKey, final String
messageStr) {
+ return producer.send(new ProducerRecord<>(topic,
+ messageKey,
+ messageStr));
}
}