This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6d640752304 KAFKA-14708: Use Java thread instead of kafka library for
example purpose (#13238)
6d640752304 is described below
commit 6d6407523044bf8f122ad12a5344071d525422db
Author: Philip Nee <[email protected]>
AuthorDate: Wed Feb 15 19:28:32 2023 -0800
KAFKA-14708: Use Java thread instead of kafka library for example purpose
(#13238)
Remove "kafka.examples.Consumer" dependency on ShutdownableThread.
The "examples" module should depend only on public APIs, not on
server common/internal components.
Reviewers: Luke Chen <[email protected]>
---
.../src/main/java/kafka/examples/Consumer.java | 31 +++++++++++-----------
1 file changed, 16 insertions(+), 15 deletions(-)
diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index d7488327ea6..dc07f6da3cb 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -16,7 +16,6 @@
*/
package kafka.examples;
-import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -28,7 +27,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
-public class Consumer extends ShutdownableThread {
+public class Consumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final String groupId;
@@ -42,7 +41,7 @@ public class Consumer extends ShutdownableThread {
final boolean readCommitted,
final int numMessageToConsume,
final CountDownLatch latch) {
- super("KafkaConsumerExample", false);
+ super("KafkaConsumerExample");
this.groupId = groupId;
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
@@ -70,6 +69,18 @@ public class Consumer extends ShutdownableThread {
}
@Override
+ public void run() {
+ try {
+ do {
+ doWork();
+ } while (messageRemaining > 0);
+ System.out.println(groupId + " finished reading " +
numMessageToConsume + " messages");
+ } catch (Exception e) {
+ System.out.println("Unexpected termination, exception thrown:" +
e);
+ } finally {
+ shutdown();
+ }
+ }
public void doWork() {
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Integer, String> records =
consumer.poll(Duration.ofSeconds(1));
@@ -77,19 +88,9 @@ public class Consumer extends ShutdownableThread {
System.out.println(groupId + " received message : from partition "
+ record.partition() + ", (" + record.key() + ", " + record.value() + ") at
offset " + record.offset());
}
messageRemaining -= records.count();
- if (messageRemaining <= 0) {
- System.out.println(groupId + " finished reading " +
numMessageToConsume + " messages");
- latch.countDown();
- }
}
- @Override
- public String name() {
- return null;
- }
-
- @Override
- public boolean isInterruptible() {
- return false;
+ public void shutdown() {
+ latch.countDown();
}
}