This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8900a187035 KAFKA-12999 Make RecordHeader reads thread-safe (KIP-1205) 
(#20751)
8900a187035 is described below

commit 8900a1870354755283a5ed610e0b1420eca5e912
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Sun Nov 2 22:35:33 2025 +0800

    KAFKA-12999 Make RecordHeader reads thread-safe (KIP-1205) (#20751)
    
    This patch implements
    [KIP-1205](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1205%3A+Improve+RecordHeader+to+be+Thread-Safe)
    to address concurrency issues (NPE) related to the lazy initialization
    of `RecordHeader` fields.
    
    The added test verifies that concurrent access to `RecordHeader.key()`
    and `RecordHeader.value()` no longer throws `NullPointerException`.
    
    Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../kafka/clients/consumer/ConsumerRecord.java     |  5 +++
 .../common/header/internals/RecordHeader.java      | 22 +++++++----
 .../common/header/internals/RecordHeadersTest.java | 44 ++++++++++++++++++++++
 docs/upgrade.html                                  |  4 ++
 4 files changed, 68 insertions(+), 7 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index 11360b0dac0..fa4ff890542 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -45,6 +45,11 @@ import java.util.Optional;
  * states. It is the responsibility of the user to ensure that multi-threaded 
access is properly synchronized.
  *
  * <p>
+ * However, each individual {@link org.apache.kafka.common.header.Header} 
instance
+ * is <b>read thread-safe</b>; that is, it is safe for multiple threads to 
read the same header's key or value concurrently
+ * as long as no thread modifies it.
+ *
+ * <p>
  * Refer to the {@link KafkaConsumer} documentation for more details on 
multi-threaded consumption and processing strategies.
  */
 public class ConsumerRecord<K, V> {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
 
b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
index e1ce6ad01a5..3eee4d27da5 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
@@ -25,9 +25,9 @@ import java.util.Objects;
 
 public class RecordHeader implements Header {
     private ByteBuffer keyBuffer;
-    private String key;
-    private ByteBuffer valueBuffer;
-    private byte[] value;
+    private volatile String key;
+    private volatile ByteBuffer valueBuffer;
+    private volatile byte[] value;
 
     public RecordHeader(String key, byte[] value) {
         Objects.requireNonNull(key, "Null header keys are not permitted");
@@ -42,16 +42,24 @@ public class RecordHeader implements Header {
     
     public String key() {
         if (key == null) {
-            key = Utils.utf8(keyBuffer, keyBuffer.remaining());
-            keyBuffer = null;
+            synchronized (this) {
+                if (key == null) {
+                    key = Utils.utf8(keyBuffer, keyBuffer.remaining());
+                    keyBuffer = null;
+                }
+            }
         }
         return key;
     }
 
     public byte[] value() {
         if (value == null && valueBuffer != null) {
-            value = Utils.toArray(valueBuffer);
-            valueBuffer = null;
+            synchronized (this) {
+                if (value == null && valueBuffer != null) {
+                    value = Utils.toArray(valueBuffer);
+                    valueBuffer = null;
+                }
+            }
         }
         return value;
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
 
b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
index 41104194991..605b5598e13 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
@@ -19,10 +19,17 @@ package org.apache.kafka.common.header.internals;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -265,4 +272,41 @@ public class RecordHeadersTest {
         assertArrayEquals(value.getBytes(), actual.value());
     }
 
+    private void assertRecordHeaderReadThreadSafe(RecordHeader header) {
+        int threadCount = 16;
+        CountDownLatch startLatch = new CountDownLatch(1);
+
+        var futures = IntStream.range(0, threadCount)
+            .mapToObj(i -> CompletableFuture.runAsync(() -> {
+                try {
+                    startLatch.await();
+                    header.key();
+                    header.value();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            })).collect(Collectors.toUnmodifiableList());
+
+        startLatch.countDown();
+        futures.forEach(CompletableFuture::join);
+    }
+
+    @RepeatedTest(100)
+    public void testRecordHeaderIsReadThreadSafe() throws Exception {
+        RecordHeader header = new RecordHeader(
+            ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8)),
+            ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8))
+        );
+        assertRecordHeaderReadThreadSafe(header);
+    }
+
+    @RepeatedTest(100)
+    public void testRecordHeaderWithNullValueIsReadThreadSafe() throws 
Exception {
+        RecordHeader header = new RecordHeader(
+            ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8)),
+            null
+        );
+        assertRecordHeaderReadThreadSafe(header);
+    }
 }
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 36c1ca6a929..c0e5d6adf8c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -25,6 +25,10 @@
 
 <h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 
4.2.0</a></h5>
 <ul>
+    <li>
+        The <code>org.apache.kafka.common.header.internals.RecordHeader</code> 
class has been updated to be read thread-safe. See <a 
href="https://cwiki.apache.org/confluence/x/nYmhFg">KIP-1205</a> for details.
+        In other words, each individual <code>Header</code> object within a 
<code>ConsumerRecord</code>'s <code>headers</code> can now be safely read from 
multiple threads concurrently.
+    </li>
     <li>
         The <code>org.apache.kafka.disallowed.login.modules</code> config was 
deprecated. Please use the <code>org.apache.kafka.allowed.login.modules</code>
         instead.

Reply via email to