This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7157c05cc9e KAFKA-20065: Improve code examples in consumer javadoc
(#21299)
7157c05cc9e is described below
commit 7157c05cc9e2834e227a23207d70520b07793441
Author: Andrew Schofield <[email protected]>
AuthorDate: Thu Jan 15 14:08:13 2026 +0000
KAFKA-20065: Improve code examples in consumer javadoc (#21299)
The code examples in the javadoc often include HTML escaping for double
quotes and angle brackets. This can be avoided by tagging the code
examples as code, as opposed to just preformatted. The benefit is that
the code examples are much more readable in the source code, which makes
them much easier for the code authors to read and get correct.
Reviewers: Apoorv Mittal <[email protected]>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 2 +-
.../kafka/clients/consumer/KafkaConsumer.java | 74 ++++++++-------
.../kafka/clients/consumer/KafkaShareConsumer.java | 100 +++++++++++----------
3 files changed, 94 insertions(+), 82 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 1c025bbd42a..817545e3748 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -71,7 +71,7 @@ import java.util.Set;
* <li>Typically an {@code all()} method is provided for getting the
overall success/failure of the batch and a
* {@code values()} method provided access to each item in a request batch.
* Other methods may also be provided.
- * <li>For synchronous behaviour use {@link KafkaFuture#get()}
+ * <li>For synchronous behavior, use {@link KafkaFuture#get()}.
* </ul>
* <p>
* Here is a simple example of using an Admin client instance to create a new
topic:
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index bf00bf5c4d4..ad179f88e55 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -177,21 +177,22 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* This example demonstrates a simple usage of Kafka's consumer api that
relies on automatic offset committing.
* <p>
* <pre>
+ * {@code
* Properties props = new Properties();
- * props.setProperty("bootstrap.servers",
"localhost:9092");
- * props.setProperty("group.id", "test");
- * props.setProperty("enable.auto.commit", "true");
- * props.setProperty("auto.commit.interval.ms",
"1000");
- * props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
- * props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
- * KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props);
- * consumer.subscribe(Arrays.asList("foo", "bar"));
+ * props.setProperty("bootstrap.servers", "localhost:9092");
+ * props.setProperty("group.id", "test");
+ * props.setProperty("enable.auto.commit", "true");
+ * props.setProperty("auto.commit.interval.ms", "1000");
+ * props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ * props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ * KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ * consumer.subscribe(Arrays.asList("foo", "bar"));
* while (true) {
- * ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
- * for (ConsumerRecord<String, String> record : records)
- * System.out.printf("offset = %d, key = %s, value =
%s%n", record.offset(), record.key(), record.value());
+ * ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
+ * for (ConsumerRecord<String, String> record : records)
+ * System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
* }
- * </pre>
+ * }</pre>
*
* The connection to the cluster is bootstrapped by specifying a list of one
or more brokers to contact using the
* configuration {@code bootstrap.servers}. This list is just used to discover
the rest of the brokers in the
@@ -215,28 +216,29 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* <p>
* <pre>
+ * {@code
* Properties props = new Properties();
- * props.setProperty("bootstrap.servers",
"localhost:9092");
- * props.setProperty("group.id", "test");
- * props.setProperty("enable.auto.commit", "false");
- * props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
- * props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
- * KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props);
- * consumer.subscribe(Arrays.asList("foo", "bar"));
+ * props.setProperty("bootstrap.servers", "localhost:9092");
+ * props.setProperty("group.id", "test");
+ * props.setProperty("enable.auto.commit", "false");
+ * props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ * props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ * KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ * consumer.subscribe(Arrays.asList("foo", "bar"));
* final int minBatchSize = 200;
- * List<ConsumerRecord<String, String>> buffer = new
ArrayList<>();
+ * List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
* while (true) {
- * ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
- * for (ConsumerRecord<String, String> record : records) {
+ * ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
+ * for (ConsumerRecord<String, String> record : records) {
* buffer.add(record);
* }
- * if (buffer.size() >= minBatchSize) {
+ * if (buffer.size() >= minBatchSize) {
* insertIntoDb(buffer);
* consumer.commitSync();
* buffer.clear();
* }
* }
- * </pre>
+ * }</pre>
*
* In this example we will consume a batch of records and batch them up in
memory. When we have enough records
* batched, we will insert them into a database. If we allowed offsets to auto
commit as in the previous example, records
@@ -263,13 +265,14 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* In the example below we commit offset after we finish handling the records
in each partition.
* <p>
* <pre>
+ * {@code
* try {
* while(running) {
- * ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
+ * ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
* for (TopicPartition partition : records.partitions()) {
- * List<ConsumerRecord<String, String>>
partitionRecords = records.records(partition);
- * for (ConsumerRecord<String, String> record :
partitionRecords) {
- * System.out.println(record.offset() + ": " +
record.value());
+ * List<ConsumerRecord<String, String>> partitionRecords =
records.records(partition);
+ * for (ConsumerRecord<String, String> record :
partitionRecords) {
+ * System.out.println(record.offset() + ": " +
record.value());
* }
* consumer.commitSync(Collections.singletonMap(partition,
records.nextOffsets().get(partition)));
* }
@@ -277,7 +280,7 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* } finally {
* consumer.close();
* }
- * </pre>
+ * }</pre>
*
* <b>Note: The committed offset should always be the offset of the next
message that your application will read.</b>
* Thus, when calling {@link #commitSync(Map) commitSync(offsets)} you should
use {@code nextRecordToBeProcessed.offset()}
@@ -304,11 +307,12 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* {@link #assign(Collection)} with the full list of partitions that you want
to consume.
*
* <pre>
- * String topic = "foo";
+ * {@code
+ * String topic = "foo";
* TopicPartition partition0 = new TopicPartition(topic, 0);
* TopicPartition partition1 = new TopicPartition(topic, 1);
* consumer.assign(Arrays.asList(partition0, partition1));
- * </pre>
+ * }</pre>
*
* Once assigned, you can call {@link #poll(Duration) poll} in a loop, just as
in the preceding examples to consume
* records. The group that the consumer specifies is still used for committing
offsets, but now the set of partitions
@@ -444,6 +448,7 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* The following snippet shows the typical pattern:
*
* <pre>
+ * {@code
* public class KafkaConsumerRunner implements Runnable {
* private final AtomicBoolean closed = new AtomicBoolean(false);
* private final KafkaConsumer consumer;
@@ -452,7 +457,7 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* this.consumer = consumer;
* }
*
- * {@literal @Override}
+ * @Override
* public void run() {
* try {
* consumer.subscribe(Arrays.asList("topic"));
@@ -474,15 +479,16 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* consumer.wakeup();
* }
* }
- * </pre>
+ * }</pre>
*
* Then in a separate thread, the consumer can be shutdown by setting the
closed flag and waking up the consumer.
*
* <p>
* <pre>
+ * {@code
* closed.set(true);
* consumer.wakeup();
- * </pre>
+ * }</pre>
*
* <p>
* Note that while it is possible to use thread interrupts instead of {@link
#wakeup()} to abort a blocking operation
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
index 06d14d0bad2..75e2b27cdd9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
@@ -172,60 +172,63 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* were delivered in the previous poll. All the records delivered are
implicitly marked as successfully consumed and
* acknowledged synchronously with Kafka as the consumer fetches more records.
* <pre>
+ * {@code
* Properties props = new Properties();
- * props.setProperty("bootstrap.servers",
"localhost:9092");
- * props.setProperty("group.id", "test");
- * props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
- * props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ * props.setProperty("bootstrap.servers", "localhost:9092");
+ * props.setProperty("group.id", "test");
+ * props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ * props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
*
- * KafkaShareConsumer<String, String> consumer = new
KafkaShareConsumer<>(props);
- * consumer.subscribe(Arrays.asList("foo"));
+ * KafkaShareConsumer<String, String> consumer = new
KafkaShareConsumer<>(props);
+ * consumer.subscribe(Arrays.asList("foo"));
* while (true) {
- * ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
- * for (ConsumerRecord<String, String> record : records) {
- * System.out.printf("offset = %d, key = %s, value =
%s%n", record.offset(), record.key(), record.value());
+ * ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
+ * for (ConsumerRecord<String, String> record : records) {
+ * System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
* doProcessing(record);
* }
* }
- * </pre>
+ * }</pre>
*
* Alternatively, you can use {@link #commitSync()} or {@link #commitAsync()}
to commit the acknowledgements, but this is
* slightly less efficient because there is an additional request sent to
Kafka.
* <pre>
+ * {@code
* Properties props = new Properties();
- * props.setProperty("bootstrap.servers",
"localhost:9092");
- * props.setProperty("group.id", "test");
- * props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
- * props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ * props.setProperty("bootstrap.servers", "localhost:9092");
+ * props.setProperty("group.id", "test");
+ * props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ * props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
*
- * KafkaShareConsumer<String, String> consumer = new
KafkaShareConsumer<>(props);
- * consumer.subscribe(Arrays.asList("foo"));
+ * KafkaShareConsumer<String, String> consumer = new
KafkaShareConsumer<>(props);
+ * consumer.subscribe(Arrays.asList("foo"));
* while (true) {
- * ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
- * for (ConsumerRecord<String, String> record : records) {
- * System.out.printf("offset = %d, key = %s, value =
%s%n", record.offset(), record.key(), record.value());
+ * ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
+ * for (ConsumerRecord<String, String> record : records) {
+ * System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
* doProcessing(record);
* }
* consumer.commitSync();
* }
- * </pre>
+ * }</pre>
*
* <h4>Per-record acknowledgement (explicit acknowledgement)</h4>
* This example demonstrates using different acknowledge types depending on
the outcome of processing the records.
* Here the {@code share.acknowledgement.mode} property is set to "explicit"
so the consumer must explicitly acknowledge each record.
* <pre>
+ * {@code
* Properties props = new Properties();
- * props.setProperty("bootstrap.servers",
"localhost:9092");
- * props.setProperty("group.id", "test");
- * props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
- * props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
- * props.setProperty("share.acknowledgement.mode",
"explicit");
- *
- * KafkaShareConsumer<String, String> consumer = new
KafkaShareConsumer<>(props);
- * consumer.subscribe(Arrays.asList("foo"));
+ * props.setProperty("bootstrap.servers", "localhost:9092");
+ * props.setProperty("group.id", "test");
+ * props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ * props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ * props.setProperty("share.acknowledgement.mode", "explicit");
+ *
+ * KafkaShareConsumer<String, String> consumer = new
KafkaShareConsumer<>(props);
+ * consumer.subscribe(Arrays.asList("foo"));
* while (true) {
- * ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
- * for (ConsumerRecord<String, String> record : records) {
+ * ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
+ * for (ConsumerRecord<String, String> record : records) {
* try {
* doProcessing(record);
* consumer.acknowledge(record, AcknowledgeType.ACCEPT);
@@ -235,7 +238,7 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* }
* consumer.commitSync();
* }
- * </pre>
+ * }</pre>
*
* Each record processed is separately acknowledged using a call to {@link
#acknowledge(ConsumerRecord, AcknowledgeType)}.
* The {@link AcknowledgeType} argument indicates whether the record was
processed successfully or not. In this case,
@@ -273,18 +276,19 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* <p>This example illustrates how an application could use renewal
acknowledgement.
*
* <pre>
+ * {@code
* Properties props = new Properties();
- * props.setProperty("bootstrap.servers",
"localhost:9092");
- * props.setProperty("group.id", "test");
- * props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
- * props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
- * props.setProperty("share.acknowledgement.mode",
"explicit");
- * props.setProperty("share.acquire.mode",
"record_limit");
- * props.setProperty("max.poll.records", "1");
+ * props.setProperty("bootstrap.servers", "localhost:9092");
+ * props.setProperty("group.id", "test");
+ * props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ * props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
+ * props.setProperty("share.acknowledgement.mode", "explicit");
+ * props.setProperty("share.acquire.mode", "record_limit");
+ * props.setProperty("max.poll.records", "1");
*
- * HashMap<Long, ConsumerRecord<String, String>> processing =
new HashMap<>();
+ * HashMap<Long, ConsumerRecord<String, String>> processing = new
HashMap<>();
*
- * KafkaShareConsumer<String, String> consumer = new
KafkaShareConsumer<>(props);
+ * KafkaShareConsumer<String, String> consumer = new
KafkaShareConsumer<>(props);
* consumer.setAcknowledgementCommitCallback((offsets, exception) -> {
* if (exception != null) {
* for (long offset: offsets) {
@@ -295,14 +299,14 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* }
* });
*
- * consumer.subscribe(Arrays.asList("foo"));
+ * consumer.subscribe(Arrays.asList("foo"));
* while (true) {
- * ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
+ * ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
*
* // Get the acquisition lock timeout and prepare to wait half that
time before renewing again
* int timeToWaitMs =
consumer.acquisitionLockTimeoutMs().getOrElse(10000) / 2;
*
- * for (ConsumerRecord<String, String> record : records) {
+ * for (ConsumerRecord<String, String> record : records) {
* if (processing.put(rec.offset(), rec) == null) {
* // Start the processing on another thread
* }
@@ -320,7 +324,7 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* }
* }
* }
- * </pre>
+ * }</pre>
* <p>
* Note that using renewal acknowledgements is intended only for situations
where the processing times of the records
* exceeds the acquisition lock duration. Consumers which use renewal
acknowledgements can impact the delivery
@@ -337,6 +341,7 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* The following snippet shows the typical pattern:
*
* <pre>
+ * {@code
* public class KafkaShareConsumerRunner implements Runnable {
* private final AtomicBoolean closed = new AtomicBoolean(false);
* private final KafkaShareConsumer consumer;
@@ -345,7 +350,7 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* this.consumer = consumer;
* }
*
- * {@literal @Override}
+ * @Override
* public void run() {
* try {
* consumer.subscribe(Arrays.asList("topic"));
@@ -367,13 +372,14 @@ import static
org.apache.kafka.common.utils.Utils.propsToMap;
* consumer.wakeup();
* }
* }
- * </pre>
+ * }</pre>
*
* Then in a separate thread, the consumer can be shutdown by setting the
closed flag and waking up the consumer.
* <pre>
+ * {@code
* closed.set(true);
* consumer.wakeup();
- * </pre>
+ * }</pre>
*
* <p>
* Note that while it is possible to use thread interrupts instead of {@link
#wakeup()} to abort a blocking operation