This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bf660fdeb6b KAFKA-18881 Document the ConsumerRecord as non-thread safe (#19056)
bf660fdeb6b is described below

commit bf660fdeb6bd1cdfc984910c88477842770c2943
Author: Logan Zhu <[email protected]>
AuthorDate: Mon Mar 3 13:03:36 2025 +0800

    KAFKA-18881 Document the ConsumerRecord as non-thread safe (#19056)
    
    There are 3 issues (at least) about the multithreaded issue on ConsumerRecords.
    Hence, it would be better to document it completely.
    
    Reviewers: Kirk True <[email protected]>, TengYao Chi <[email protected]>, Ken Huang <[email protected]>, Xuan-Zhang Gong <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/consumer/ConsumerRecord.java     | 23 ++++++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index 453503e6dac..11360b0dac0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -21,12 +21,31 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 
+import java.util.ConcurrentModificationException;
 import java.util.Optional;
 
 /**
- * A key/value pair to be received from Kafka. This also consists of a topic name and 
- * a partition number from which the record is being received, an offset that points 
+ * A key/value pair to be received from Kafka. This also consists of a topic name and
+ * a partition number from which the record is being received, an offset that points
  * to the record in a Kafka partition, and a timestamp as marked by the corresponding ProducerRecord.
+ * <p>
+ *
+ * <h3>Thread Safety</h3>
+ * This consumer record is <b>not thread-safe</b>. Concurrent access to a {@code ConsumerRecord} instance by
+ * multiple threads may result in undefined behavior, including but not limited to the following:
+ * <ul>
+ *   <li>Throwing {@link ConcurrentModificationException} (e.g., when concurrently modifying {@link #headers()}).</li>
+ *   <li>Data corruption or logical errors (e.g., inconsistent state of {@code headers} or {@code value}).</li>
+ *   <li>Visibility issues (e.g., modifications by one thread not being visible to another thread).</li>
+ * </ul>
+ *
+ * <p>
+ * In particular, the {@link #headers()} method returns a mutable collection of headers. If multiple
+ * threads access or modify these headers concurrently, it may lead to race conditions or inconsistent
+ * states. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized.
+ *
+ * <p>
+ * Refer to the {@link KafkaConsumer} documentation for more details on multi-threaded consumption and processing strategies.
  */
 public class ConsumerRecord<K, V> {
     public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
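
The new Javadoc leaves synchronization to the user. One possible way to follow that advice, sketched here and not taken from the commit, is to copy the headers into an immutable snapshot on the owning thread before handing work to another thread. The sketch is deliberately free of Kafka dependencies: FakeRecord, snapshotHeaders, and countOnWorker are hypothetical names, and a plain list of strings stands in for the mutable headers returned by headers().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class HeaderHandoffSketch {
    // Hypothetical stand-in for a record with mutable headers; NOT a Kafka class.
    static final class FakeRecord {
        private final List<String> headers = new ArrayList<>();

        List<String> headers() {
            return headers;
        }
    }

    // Copy the headers on the owning thread. List.copyOf returns an
    // unmodifiable list that later changes to the record do not affect.
    static List<String> snapshotHeaders(FakeRecord record) {
        return List.copyOf(record.headers());
    }

    // The worker thread only ever sees the immutable snapshot,
    // never the record's mutable header list.
    static int countOnWorker(List<String> snapshot) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = snapshot::size;
            return pool.submit(task).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeRecord record = new FakeRecord();
        record.headers().add("trace-id");

        List<String> snapshot = snapshotHeaders(record);

        // A later mutation on the owning thread does not reach the snapshot.
        record.headers().add("added-later");

        System.out.println(countOnWorker(snapshot));
    }
}
```

Copying trades a small allocation for not having to lock around every header access. If worker threads must modify the headers, a snapshot is not enough and explicit synchronization is still required.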
