This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new e2f9f08 MINOR: Improve EOS example exception handling (#8052)
e2f9f08 is described below
commit e2f9f08d1c735e89e7de46e5af0ebfbe771de473
Author: Boyang Chen <[email protected]>
AuthorDate: Thu Feb 20 09:59:09 2020 -0800
MINOR: Improve EOS example exception handling (#8052)
The current EOS example mixes fatal and non-fatal error handling. This
patch fixes this problem and simplifies the example.
Reviewers: Jason Gustafson <[email protected]>
---
examples/README | 12 +-
.../src/main/java/kafka/examples/Consumer.java | 3 +
.../examples/ExactlyOnceMessageProcessor.java | 121 ++++++++-------------
.../kafka/examples/KafkaConsumerProducerDemo.java | 3 +-
.../java/kafka/examples/KafkaExactlyOnceDemo.java | 32 +++---
.../main/java/kafka/examples/KafkaProperties.java | 5 -
6 files changed, 75 insertions(+), 101 deletions(-)
diff --git a/examples/README b/examples/README
index 2efe71a..bff6cd3 100644
--- a/examples/README
+++ b/examples/README
@@ -6,10 +6,8 @@ To run the demo:
2. For simple consumer demo, `run bin/java-simple-consumer-demo.sh`
3. For unlimited sync-producer-consumer run, `run
bin/java-producer-consumer-demo.sh sync`
4. For unlimited async-producer-consumer run, `run
bin/java-producer-consumer-demo.sh`
- 5. For standalone mode exactly once demo run, `run bin/exactly-once-demo.sh
standaloneMode 6 3 50000`,
- this means we are starting 3 EOS instances with 6 topic partitions and
50000 pre-populated records
- 6. For group mode exactly once demo run, `run bin/exactly-once-demo.sh
groupMode 6 3 50000`,
- this means the same as the standalone demo, except consumers are using
subscription mode.
- 7. Some notes for exactly once demo:
- 7.1. The Kafka server has to be on broker version 2.5 or higher to be
able to run group mode.
- 7.2. You could also use Intellij to run the example directly by
configuring parameters as "Program arguments"
+ 5. For exactly once demo run, `run bin/exactly-once-demo.sh 6 3 50000`,
+ this means we are starting 3 EOS instances with 6 topic partitions and
50000 pre-populated records.
+ 6. Some notes for exactly once demo:
+ 6.1. The Kafka server has to be on broker version 2.5 or higher.
+ 6.2. You can also use IntelliJ to run the example directly by
configuring the parameters as "Program arguments".
diff --git a/examples/src/main/java/kafka/examples/Consumer.java
b/examples/src/main/java/kafka/examples/Consumer.java
index 19cb67c..d748832 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@@ -37,6 +38,7 @@ public class Consumer extends ShutdownableThread {
public Consumer(final String topic,
final String groupId,
+ final Optional<String> instanceId,
final boolean readCommitted,
final int numMessageToConsume,
final CountDownLatch latch) {
@@ -45,6 +47,7 @@ public class Consumer extends ShutdownableThread {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ instanceId.ifPresent(id ->
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
diff --git
a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
index 482e442..8f31b19 100644
--- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
+++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
@@ -16,7 +16,6 @@
*/
package kafka.examples;
-import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -34,8 +33,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
@@ -47,42 +46,32 @@ public class ExactlyOnceMessageProcessor extends Thread {
private static final boolean READ_COMMITTED = true;
- private final String mode;
private final String inputTopic;
private final String outputTopic;
- private final String consumerGroupId;
- private final int numPartitions;
- private final int numInstances;
- private final int instanceIdx;
private final String transactionalId;
+ private final String groupInstanceId;
private final KafkaProducer<Integer, String> producer;
private final KafkaConsumer<Integer, String> consumer;
private final CountDownLatch latch;
- public ExactlyOnceMessageProcessor(final String mode,
- final String inputTopic,
+ public ExactlyOnceMessageProcessor(final String inputTopic,
final String outputTopic,
- final int numPartitions,
- final int numInstances,
final int instanceIdx,
final CountDownLatch latch) {
- this.mode = mode;
this.inputTopic = inputTopic;
this.outputTopic = outputTopic;
- this.consumerGroupId = "Eos-consumer";
- this.numPartitions = numPartitions;
- this.numInstances = numInstances;
- this.instanceIdx = instanceIdx;
this.transactionalId = "Processor-" + instanceIdx;
- // If we are using the group mode, it is recommended to have a
relatively short txn timeout
- // in order to clear pending offsets faster.
- final int transactionTimeoutMs = this.mode.equals("groupMode") ? 10000
: -1;
+ // It is recommended to have a relatively short txn timeout in order
to clear pending offsets faster.
+ final int transactionTimeoutMs = 10000;
// A unique transactional.id must be provided in order to properly use
EOS.
producer = new Producer(outputTopic, true, transactionalId, true, -1,
transactionTimeoutMs, null).get();
// Consumer must be in read_committed mode, which means it won't be
able to read uncommitted data.
- consumer = new Consumer(inputTopic, consumerGroupId, READ_COMMITTED,
-1, null).get();
+ // Consumer could optionally configure groupInstanceId to avoid
unnecessary rebalances.
+ this.groupInstanceId = "Txn-consumer-" + instanceIdx;
+ consumer = new Consumer(inputTopic, "Eos-consumer",
+ Optional.of(groupInstanceId), READ_COMMITTED, -1, null).get();
this.latch = latch;
}
@@ -93,49 +82,24 @@ public class ExactlyOnceMessageProcessor extends Thread {
final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
- // Under group mode, topic based subscription is sufficient as EOS
apps are safe to cooperate transactionally after 2.5.
- // Under standalone mode, user needs to manually assign the topic
partitions and make sure the assignment is unique
- // across the consumer group instances.
- if (this.mode.equals("groupMode")) {
- consumer.subscribe(Collections.singleton(inputTopic), new
ConsumerRebalanceListener() {
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
- printWithTxnId("Revoked partition assignment to kick-off
rebalancing: " + partitions);
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
- printWithTxnId("Received partition assignment after
rebalancing: " + partitions);
- messageRemaining.set(messagesRemaining(consumer));
- }
- });
- } else {
- // Do a range assignment of topic partitions.
- List<TopicPartition> topicPartitions = new ArrayList<>();
- int rangeSize = numPartitions / numInstances;
- int startPartition = rangeSize * instanceIdx;
- int endPartition = Math.min(numPartitions - 1, startPartition +
rangeSize - 1);
- for (int partition = startPartition; partition <= endPartition;
partition++) {
- topicPartitions.add(new TopicPartition(inputTopic, partition));
+ consumer.subscribe(Collections.singleton(inputTopic), new
ConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
+ printWithTxnId("Revoked partition assignment to kick-off
rebalancing: " + partitions);
}
- consumer.assign(topicPartitions);
- printWithTxnId("Manually assign partitions: " + topicPartitions);
- }
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
+ printWithTxnId("Received partition assignment after
rebalancing: " + partitions);
+ messageRemaining.set(messagesRemaining(consumer));
+ }
+ });
int messageProcessed = 0;
- boolean abortPreviousTransaction = false;
while (messageRemaining.get() > 0) {
- ConsumerRecords<Integer, String> records =
consumer.poll(Duration.ofMillis(200));
- if (records.count() > 0) {
- try {
- // Abort previous transaction if instructed.
- if (abortPreviousTransaction) {
- producer.abortTransaction();
- // The consumer fetch position also needs to be reset.
- resetToLastCommittedPositions(consumer);
- abortPreviousTransaction = false;
- }
+ try {
+ ConsumerRecords<Integer, String> records =
consumer.poll(Duration.ofMillis(200));
+ if (records.count() > 0) {
// Begin a new transaction session.
producer.beginTransaction();
for (ConsumerRecord<Integer, String> record : records) {
@@ -143,28 +107,31 @@ public class ExactlyOnceMessageProcessor extends Thread {
ProducerRecord<Integer, String> customizedRecord =
transform(record);
producer.send(customizedRecord);
}
- Map<TopicPartition, OffsetAndMetadata> positions = new
HashMap<>();
- for (TopicPartition topicPartition :
consumer.assignment()) {
- positions.put(topicPartition, new
OffsetAndMetadata(consumer.position(topicPartition), null));
- }
+
+ Map<TopicPartition, OffsetAndMetadata> offsets =
consumerOffsets();
+
// Checkpoint the progress by sending offsets to group
coordinator broker.
- // Under group mode, we must apply consumer group metadata
for proper fencing.
- if (this.mode.equals("groupMode")) {
- producer.sendOffsetsToTransaction(positions,
consumer.groupMetadata());
- } else {
- producer.sendOffsetsToTransaction(positions,
consumerGroupId);
- }
+ // Note that this API is only available for broker >= 2.5.
+ producer.sendOffsetsToTransaction(offsets,
consumer.groupMetadata());
// Finish the transaction. All sent records should be
visible for consumption now.
producer.commitTransaction();
messageProcessed += records.count();
- } catch (CommitFailedException e) {
- // In case of a retriable exception, suggest aborting the
ongoing transaction for correctness.
- abortPreviousTransaction = true;
- } catch (ProducerFencedException | FencedInstanceIdException
e) {
- throw new KafkaException("Encountered fatal error during
processing: " + e.getMessage());
}
+ } catch (ProducerFencedException e) {
+ throw new KafkaException(String.format("The transactional.id
%s has been claimed by another process", transactionalId));
+ } catch (FencedInstanceIdException e) {
+ throw new KafkaException(String.format("The group.instance.id
%s has been claimed by another process", groupInstanceId));
+ } catch (KafkaException e) {
+ // If we have not been fenced, try to abort the transaction
and continue. This will raise immediately
+ // if the producer has hit a fatal error.
+ producer.abortTransaction();
+
+ // The consumer fetch position needs to be restored to the
committed offset
+ // before the transaction started.
+ resetToLastCommittedPositions(consumer);
}
+
messageRemaining.set(messagesRemaining(consumer));
printWithTxnId("Message remaining: " + messageRemaining);
}
@@ -173,6 +140,14 @@ public class ExactlyOnceMessageProcessor extends Thread {
latch.countDown();
}
+ private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ for (TopicPartition topicPartition : consumer.assignment()) {
+ offsets.put(topicPartition, new
OffsetAndMetadata(consumer.position(topicPartition), null));
+ }
+ return offsets;
+ }
+
private void printWithTxnId(final String message) {
System.out.println(transactionalId + ": " + message);
}
diff --git
a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index 8a29402..9fc911a 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -18,6 +18,7 @@ package kafka.examples;
import org.apache.kafka.common.errors.TimeoutException;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -28,7 +29,7 @@ public class KafkaConsumerProducerDemo {
Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync,
null, false, 10000, -1, latch);
producerThread.start();
- Consumer consumerThread = new Consumer(KafkaProperties.TOPIC,
"DemoConsumer", false, 10000, latch);
+ Consumer consumerThread = new Consumer(KafkaProperties.TOPIC,
"DemoConsumer", Optional.empty(), false, 10000, latch);
consumerThread.start();
if (!latch.await(5, TimeUnit.MINUTES)) {
diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
index 6da159c..50a1ad1 100644
--- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -25,6 +25,7 @@ import
org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -32,12 +33,15 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
- * This exactly once demo driver takes 4 arguments:
- * - mode: whether to run as standalone app, or a group
+ * This exactly once demo driver takes 3 arguments:
* - partition: number of partitions for input/output topic
* - instances: number of instances
* - records: number of records
- * An example argument list would be `groupMode 6 3 50000`
+ * An example argument list would be `6 3 50000`.
+ *
+ * If you are using IntelliJ, the above arguments should be put in the
configuration's `Program Arguments`.
+ * It is also recommended to set an output log file via `Edit Configuration -> Logs
-> Save console
+ * output to file` to record all the log output together.
*
* The driver could be decomposed as following stages:
*
@@ -60,10 +64,10 @@ import java.util.concurrent.TimeUnit;
* The driver will block for the consumption of all committed records.
*
* From this demo, you could see that all the records from pre-population are
processed exactly once,
- * in either standalone mode or group mode, with strong partition level
ordering guarantee.
+ * with strong partition level ordering guarantee.
*
* Note: please start the kafka broker and zookeeper in local first. The
broker version must be >= 2.5
- * in order to run group mode, otherwise the app could throw
+ * in order to run, otherwise the app could throw
* {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
*/
public class KafkaExactlyOnceDemo {
@@ -72,15 +76,14 @@ public class KafkaExactlyOnceDemo {
private static final String OUTPUT_TOPIC = "output-topic";
public static void main(String[] args) throws InterruptedException,
ExecutionException {
- if (args.length != 4) {
- throw new IllegalArgumentException("Should accept 4 parameters:
[mode], " +
+ if (args.length != 3) {
+ throw new IllegalArgumentException("Should accept 3 parameters: " +
"[number of partitions], [number of instances], [number of
records]");
}
- String mode = args[0];
- int numPartitions = Integer.parseInt(args[1]);
- int numInstances = Integer.parseInt(args[2]);
- int numRecords = Integer.parseInt(args[3]);
+ int numPartitions = Integer.parseInt(args[0]);
+ int numInstances = Integer.parseInt(args[1]);
+ int numRecords = Integer.parseInt(args[2]);
/* Stage 1: topic cleanup and recreation */
recreateTopics(numPartitions);
@@ -99,9 +102,8 @@ public class KafkaExactlyOnceDemo {
/* Stage 3: transactionally process all messages */
for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
- ExactlyOnceMessageProcessor messageProcessor = new
ExactlyOnceMessageProcessor(mode,
- INPUT_TOPIC, OUTPUT_TOPIC, numPartitions,
- numInstances, instanceIdx, transactionalCopyLatch);
+ ExactlyOnceMessageProcessor messageProcessor = new
ExactlyOnceMessageProcessor(
+ INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx,
transactionalCopyLatch);
messageProcessor.start();
}
@@ -112,7 +114,7 @@ public class KafkaExactlyOnceDemo {
CountDownLatch consumeLatch = new CountDownLatch(1);
/* Stage 4: consume all processed messages to verify exactly once */
- Consumer consumerThread = new Consumer(OUTPUT_TOPIC,
"Verify-consumer", true, numRecords, consumeLatch);
+ Consumer consumerThread = new Consumer(OUTPUT_TOPIC,
"Verify-consumer", Optional.empty(), true, numRecords, consumeLatch);
consumerThread.start();
if (!consumeLatch.await(5, TimeUnit.MINUTES)) {
diff --git a/examples/src/main/java/kafka/examples/KafkaProperties.java
b/examples/src/main/java/kafka/examples/KafkaProperties.java
index cd737cf..e73c8d7 100644
--- a/examples/src/main/java/kafka/examples/KafkaProperties.java
+++ b/examples/src/main/java/kafka/examples/KafkaProperties.java
@@ -20,11 +20,6 @@ public class KafkaProperties {
public static final String TOPIC = "topic1";
public static final String KAFKA_SERVER_URL = "localhost";
public static final int KAFKA_SERVER_PORT = 9092;
- public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
- public static final int CONNECTION_TIMEOUT = 100000;
- public static final String TOPIC2 = "topic2";
- public static final String TOPIC3 = "topic3";
- public static final String CLIENT_ID = "SimpleConsumerDemoClient";
private KafkaProperties() {}
}