This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7157c05cc9e KAFKA-20065: Improve code examples in consumer javadoc (#21299)
7157c05cc9e is described below

commit 7157c05cc9e2834e227a23207d70520b07793441
Author: Andrew Schofield <[email protected]>
AuthorDate: Thu Jan 15 14:08:13 2026 +0000

    KAFKA-20065: Improve code examples in consumer javadoc (#21299)
    
    The code examples in the javadoc often include HTML escaping for double
    quotes and angle brackets. This can be avoided by tagging the code
    examples as code, as opposed to just preformatted. The benefit is that
    the code examples are much more readable in the source code, which makes
    them much easier for the code authors to read and get correct.
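    
    As an illustrative sketch of the pattern applied throughout this diff,
    an escaped example such as
    
      * <pre>
      *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
      * </pre>
    
    becomes
    
      * <pre>
      * {@code
      *     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      * }</pre>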
    
    Reviewers: Apoorv Mittal <[email protected]>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |   2 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |  74 ++++++++-------
 .../kafka/clients/consumer/KafkaShareConsumer.java | 100 +++++++++++----------
 3 files changed, 94 insertions(+), 82 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 1c025bbd42a..817545e3748 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -71,7 +71,7 @@ import java.util.Set;
 *     <li>Typically an {@code all()} method is provided for getting the overall success/failure of the batch and a
  *     {@code values()} method provided access to each item in a request batch.
  *     Other methods may also be provided.
- *     <li>For synchronous behaviour use {@link KafkaFuture#get()}
+ *     <li>For synchronous behavior, use {@link KafkaFuture#get()}.
  * </ul>
  * <p>
 * Here is a simple example of using an Admin client instance to create a new topic:
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index bf00bf5c4d4..ad179f88e55 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -177,21 +177,22 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
 * This example demonstrates a simple usage of Kafka's consumer api that relies on automatic offset committing.
  * <p>
  * <pre>
+ * {@code
  *     Properties props = new Properties();
- *     props.setProperty(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
- *     props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
- *     props.setProperty(&quot;enable.auto.commit&quot;, &quot;true&quot;);
- *     props.setProperty(&quot;auto.commit.interval.ms&quot;, &quot;1000&quot;);
- *     props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
- *     consumer.subscribe(Arrays.asList(&quot;foo&quot;, &quot;bar&quot;));
+ *     props.setProperty("bootstrap.servers", "localhost:9092");
+ *     props.setProperty("group.id", "test");
+ *     props.setProperty("enable.auto.commit", "true");
+ *     props.setProperty("auto.commit.interval.ms", "1000");
+ *     props.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+ *     props.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+ *     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ *     consumer.subscribe(Arrays.asList("foo", "bar"));
  *     while (true) {
- *         ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(100));
- *         for (ConsumerRecord&lt;String, String&gt; record : records)
- *             System.out.printf(&quot;offset = %d, key = %s, value = %s%n&quot;, record.offset(), record.key(), record.value());
+ *         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+ *         for (ConsumerRecord<String, String> record : records)
+ *             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  *     }
- * </pre>
+ * }</pre>
  *
 * The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the
 * configuration {@code bootstrap.servers}. This list is just used to discover the rest of the brokers in the
@@ -215,28 +216,29 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
 
  * <p>
  * <pre>
+ * {@code
  *     Properties props = new Properties();
- *     props.setProperty(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
- *     props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
- *     props.setProperty(&quot;enable.auto.commit&quot;, &quot;false&quot;);
- *     props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
- *     consumer.subscribe(Arrays.asList(&quot;foo&quot;, &quot;bar&quot;));
+ *     props.setProperty("bootstrap.servers", "localhost:9092");
+ *     props.setProperty("group.id", "test");
+ *     props.setProperty("enable.auto.commit", "false");
+ *     props.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+ *     props.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+ *     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ *     consumer.subscribe(Arrays.asList("foo", "bar"));
  *     final int minBatchSize = 200;
- *     List&lt;ConsumerRecord&lt;String, String&gt;&gt; buffer = new ArrayList&lt;&gt;();
+ *     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
  *     while (true) {
- *         ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(100));
- *         for (ConsumerRecord&lt;String, String&gt; record : records) {
+ *         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+ *         for (ConsumerRecord<String, String> record : records) {
  *             buffer.add(record);
  *         }
- *         if (buffer.size() &gt;= minBatchSize) {
+ *         if (buffer.size() >= minBatchSize) {
  *             insertIntoDb(buffer);
  *             consumer.commitSync();
  *             buffer.clear();
  *         }
  *     }
- * </pre>
+ * }</pre>
  *
 * In this example we will consume a batch of records and batch them up in memory. When we have enough records
 * batched, we will insert them into a database. If we allowed offsets to auto commit as in the previous example, records
@@ -263,13 +265,14 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
 * In the example below we commit offset after we finish handling the records in each partition.
  * <p>
  * <pre>
+ * {@code
  *     try {
  *         while(running) {
- *             ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
+ *             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
 *             for (TopicPartition partition : records.partitions()) {
- *                 List&lt;ConsumerRecord&lt;String, String&gt;&gt; partitionRecords = records.records(partition);
- *                 for (ConsumerRecord&lt;String, String&gt; record : partitionRecords) {
- *                     System.out.println(record.offset() + &quot;: &quot; + record.value());
+ *                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+ *                 for (ConsumerRecord<String, String> record : partitionRecords) {
+ *                     System.out.println(record.offset() + ": " + record.value());
 *                 }
 *                 consumer.commitSync(Collections.singletonMap(partition, records.nextOffsets().get(partition)));
  *             }
@@ -277,7 +280,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
  *     } finally {
  *       consumer.close();
  *     }
- * </pre>
+ * }</pre>
  *
 * <b>Note: The committed offset should always be the offset of the next message that your application will read.</b>
 * Thus, when calling {@link #commitSync(Map) commitSync(offsets)} you should use {@code nextRecordToBeProcessed.offset()}
@@ -304,11 +307,12 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
 * {@link #assign(Collection)} with the full list of partitions that you want to consume.
  *
  * <pre>
- *     String topic = &quot;foo&quot;;
+ * {@code
+ *     String topic = "foo";
  *     TopicPartition partition0 = new TopicPartition(topic, 0);
  *     TopicPartition partition1 = new TopicPartition(topic, 1);
  *     consumer.assign(Arrays.asList(partition0, partition1));
- * </pre>
+ * }</pre>
  *
 * Once assigned, you can call {@link #poll(Duration) poll} in a loop, just as in the preceding examples to consume
 * records. The group that the consumer specifies is still used for committing offsets, but now the set of partitions
@@ -444,6 +448,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
  * The following snippet shows the typical pattern:
  *
  * <pre>
+ * {@code
  * public class KafkaConsumerRunner implements Runnable {
  *     private final AtomicBoolean closed = new AtomicBoolean(false);
  *     private final KafkaConsumer consumer;
@@ -452,7 +457,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
  *       this.consumer = consumer;
  *     }
  *
- *     {@literal @Override}
+ *     @Override
  *     public void run() {
  *         try {
  *             consumer.subscribe(Arrays.asList("topic"));
@@ -474,15 +479,16 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
  *         consumer.wakeup();
  *     }
  * }
- * </pre>
+ * }</pre>
  *
 * Then in a separate thread, the consumer can be shutdown by setting the closed flag and waking up the consumer.
  *
  * <p>
  * <pre>
+ * {@code
  *     closed.set(true);
  *     consumer.wakeup();
- * </pre>
+ * }</pre>
  *
  * <p>
 * Note that while it is possible to use thread interrupts instead of {@link #wakeup()} to abort a blocking operation
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
index 06d14d0bad2..75e2b27cdd9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
@@ -172,60 +172,63 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
 * were delivered in the previous poll. All the records delivered are implicitly marked as successfully consumed and
  * acknowledged synchronously with Kafka as the consumer fetches more records.
  * <pre>
+ * {@code
  *     Properties props = new Properties();
- *     props.setProperty(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
- *     props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
- *     props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
+ *     props.setProperty("bootstrap.servers", "localhost:9092");
+ *     props.setProperty("group.id", "test");
+ *     props.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+ *     props.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
  *
- *     KafkaShareConsumer&lt;String, String&gt; consumer = new KafkaShareConsumer&lt;&gt;(props);
- *     consumer.subscribe(Arrays.asList(&quot;foo&quot;));
+ *     KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
+ *     consumer.subscribe(Arrays.asList("foo"));
  *     while (true) {
- *         ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(1000));
- *         for (ConsumerRecord&lt;String, String&gt; record : records) {
- *             System.out.printf(&quot;offset = %d, key = %s, value = %s%n&quot;, record.offset(), record.key(), record.value());
+ *         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+ *         for (ConsumerRecord<String, String> record : records) {
+ *             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  *             doProcessing(record);
  *         }
  *     }
- * </pre>
+ * }</pre>
  *
 * Alternatively, you can use {@link #commitSync()} or {@link #commitAsync()} to commit the acknowledgements, but this is
 * slightly less efficient because there is an additional request sent to Kafka.
  * <pre>
+ * {@code
  *     Properties props = new Properties();
- *     props.setProperty(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
- *     props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
- *     props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
+ *     props.setProperty("bootstrap.servers", "localhost:9092");
+ *     props.setProperty("group.id", "test");
+ *     props.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+ *     props.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
  *
- *     KafkaShareConsumer&lt;String, String&gt; consumer = new KafkaShareConsumer&lt;&gt;(props);
- *     consumer.subscribe(Arrays.asList(&quot;foo&quot;));
+ *     KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
+ *     consumer.subscribe(Arrays.asList("foo"));
  *     while (true) {
- *         ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(1000));
- *         for (ConsumerRecord&lt;String, String&gt; record : records) {
- *             System.out.printf(&quot;offset = %d, key = %s, value = %s%n&quot;, record.offset(), record.key(), record.value());
+ *         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+ *         for (ConsumerRecord<String, String> record : records) {
+ *             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  *             doProcessing(record);
  *         }
  *         consumer.commitSync();
  *     }
- * </pre>
+ * }</pre>
  *
  * <h4>Per-record acknowledgement (explicit acknowledgement)</h4>
 * This example demonstrates using different acknowledge types depending on the outcome of processing the records.
 * Here the {@code share.acknowledgement.mode} property is set to "explicit" so the consumer must explicitly acknowledge each record.
  * <pre>
+ * {@code
  *     Properties props = new Properties();
- *     props.setProperty(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
- *     props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
- *     props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     props.setProperty(&quot;share.acknowledgement.mode&quot;, &quot;explicit&quot;);
- *
- *     KafkaShareConsumer&lt;String, String&gt; consumer = new KafkaShareConsumer&lt;&gt;(props);
- *     consumer.subscribe(Arrays.asList(&quot;foo&quot;));
+ *     props.setProperty("bootstrap.servers", "localhost:9092");
+ *     props.setProperty("group.id", "test");
+ *     props.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+ *     props.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+ *     props.setProperty("share.acknowledgement.mode", "explicit");
+ *
+ *     KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
+ *     consumer.subscribe(Arrays.asList("foo"));
  *     while (true) {
- *         ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(1000));
- *         for (ConsumerRecord&lt;String, String&gt; record : records) {
+ *         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+ *         for (ConsumerRecord<String, String> record : records) {
  *             try {
  *                 doProcessing(record);
  *                 consumer.acknowledge(record, AcknowledgeType.ACCEPT);
@@ -235,7 +238,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
  *         }
  *         consumer.commitSync();
  *     }
- * </pre>
+ * }</pre>
  *
 * Each record processed is separately acknowledged using a call to {@link #acknowledge(ConsumerRecord, AcknowledgeType)}.
 * The {@link AcknowledgeType} argument indicates whether the record was processed successfully or not. In this case,
@@ -273,18 +276,19 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
 * <p>This example illustrates how an application could use renewal acknowledgement.
  *
  * <pre>
+ * {@code
  *     Properties props = new Properties();
- *     props.setProperty(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
- *     props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
- *     props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     props.setProperty(&quot;share.acknowledgement.mode&quot;, &quot;explicit&quot;);
- *     props.setProperty(&quot;share.acquire.mode&quot;, &quot;record_limit&quot;);
- *     props.setProperty(&quot;max.poll.records&quot;, &quot;1&quot;);
+ *     props.setProperty("bootstrap.servers", "localhost:9092");
+ *     props.setProperty("group.id", "test");
+ *     props.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+ *     props.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+ *     props.setProperty("share.acknowledgement.mode", "explicit");
+ *     props.setProperty("share.acquire.mode", "record_limit");
+ *     props.setProperty("max.poll.records", "1");
  *
- *     HashMap&lt;Long, ConsumerRecord&lt;String, String&gt;&gt; processing = new HashMap&lt;&gt;();
+ *     HashMap<Long, ConsumerRecord<String, String>> processing = new HashMap<>();
 *
- *     KafkaShareConsumer&lt;String, String&gt; consumer = new KafkaShareConsumer&lt;&gt;(props);
+ *     KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
  *     consumer.setAcknowledgementCommitCallback((offsets, exception) -> {
  *        if (exception != null) {
  *            for (long offset: offsets) {
@@ -295,14 +299,14 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
  *        }
  *     });
  *
- *     consumer.subscribe(Arrays.asList(&quot;foo&quot;));
+ *     consumer.subscribe(Arrays.asList("foo"));
  *     while (true) {
- *         ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(1000));
+ *         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
 *
 *         // Get the acquisition lock timeout and prepare to wait half that time before renewing again
 *         int timeToWaitMs = consumer.acquisitionLockTimeoutMs().getOrElse(10000) / 2;
  *
- *         for (ConsumerRecord&lt;String, String&gt; record : records) {
+ *         for (ConsumerRecord<String, String> record : records) {
  *             if (processing.put(rec.offset(), rec) == null) {
  *                 // Start the processing on another thread
  *             }
@@ -320,7 +324,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
  *             }
  *         }
  *     }
- * </pre>
+ * }</pre>
  * <p>
 * Note that using renewal acknowledgements is intended only for situations where the processing times of the records
 * exceeds the acquisition lock duration. Consumers which use renewal acknowledgements can impact the delivery
@@ -337,6 +341,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
  * The following snippet shows the typical pattern:
  *
  * <pre>
+ * {@code
  * public class KafkaShareConsumerRunner implements Runnable {
  *     private final AtomicBoolean closed = new AtomicBoolean(false);
  *     private final KafkaShareConsumer consumer;
@@ -345,7 +350,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
  *       this.consumer = consumer;
  *     }
  *
- *     {@literal @Override}
+ *     @Override
  *     public void run() {
  *         try {
  *             consumer.subscribe(Arrays.asList("topic"));
@@ -367,13 +372,14 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
  *         consumer.wakeup();
  *     }
  * }
- * </pre>
+ * }</pre>
  *
 * Then in a separate thread, the consumer can be shutdown by setting the closed flag and waking up the consumer.
  * <pre>
+ * {@code
  *     closed.set(true);
  *     consumer.wakeup();
- * </pre>
+ * }</pre>
  *
  * <p>
 * Note that while it is possible to use thread interrupts instead of {@link #wakeup()} to abort a blocking operation
