This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6d640752304 KAFKA-14708: Use Java thread instead of kafka library for 
example purpose (#13238)
6d640752304 is described below

commit 6d6407523044bf8f122ad12a5344071d525422db
Author: Philip Nee <[email protected]>
AuthorDate: Wed Feb 15 19:28:32 2023 -0800

    KAFKA-14708: Use Java thread instead of kafka library for example purpose 
(#13238)
    
    Remove the "kafka.examples.Consumer" dependency on ShutdownableThread. The 
"examples" module should depend only on public APIs, not on server 
common/internal components.
    
    Reviewers: Luke Chen <[email protected]>
---
 .../src/main/java/kafka/examples/Consumer.java     | 31 +++++++++++-----------
 1 file changed, 16 insertions(+), 15 deletions(-)

diff --git a/examples/src/main/java/kafka/examples/Consumer.java 
b/examples/src/main/java/kafka/examples/Consumer.java
index d7488327ea6..dc07f6da3cb 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -16,7 +16,6 @@
  */
 package kafka.examples;
 
-import kafka.utils.ShutdownableThread;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -28,7 +27,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 
-public class Consumer extends ShutdownableThread {
+public class Consumer extends Thread {
     private final KafkaConsumer<Integer, String> consumer;
     private final String topic;
     private final String groupId;
@@ -42,7 +41,7 @@ public class Consumer extends ShutdownableThread {
                     final boolean readCommitted,
                     final int numMessageToConsume,
                     final CountDownLatch latch) {
-        super("KafkaConsumerExample", false);
+        super("KafkaConsumerExample");
         this.groupId = groupId;
         Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
@@ -70,6 +69,18 @@ public class Consumer extends ShutdownableThread {
     }
 
     @Override
+    public void run() {
+        try {
+            do {
+                doWork();
+            } while (messageRemaining > 0);
+            System.out.println(groupId + " finished reading " + 
numMessageToConsume + " messages");
+        } catch (Exception e) {
+            System.out.println("Unexpected termination, exception thrown:" + 
e);
+        } finally {
+            shutdown();
+        }
+    }
     public void doWork() {
         consumer.subscribe(Collections.singletonList(this.topic));
         ConsumerRecords<Integer, String> records = 
consumer.poll(Duration.ofSeconds(1));
@@ -77,19 +88,9 @@ public class Consumer extends ShutdownableThread {
             System.out.println(groupId + " received message : from partition " 
+ record.partition() + ", (" + record.key() + ", " + record.value() + ") at 
offset " + record.offset());
         }
         messageRemaining -= records.count();
-        if (messageRemaining <= 0) {
-            System.out.println(groupId + " finished reading " + 
numMessageToConsume + " messages");
-            latch.countDown();
-        }
     }
 
-    @Override
-    public String name() {
-        return null;
-    }
-
-    @Override
-    public boolean isInterruptible() {
-        return false;
+    public void shutdown() {
+        latch.countDown();
     }
 }

Reply via email to