This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8900a187035 KAFKA-12999 Make RecordHeader reads thread-safe (KIP-1205)
(#20751)
8900a187035 is described below
commit 8900a1870354755283a5ed610e0b1420eca5e912
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Sun Nov 2 22:35:33 2025 +0800
KAFKA-12999 Make RecordHeader reads thread-safe (KIP-1205) (#20751)
This patch implements
[KIP-1205](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1205%3A+Improve+RecordHeader+to+be+Thread-Safe)
to fix the `NullPointerException` that concurrent readers could hit during
the lazy initialization of `RecordHeader` fields.
The added tests verify that concurrent access to `RecordHeader.key()`
and `RecordHeader.value()` no longer throws `NullPointerException`.
Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../kafka/clients/consumer/ConsumerRecord.java | 5 +++
.../common/header/internals/RecordHeader.java | 22 +++++++----
.../common/header/internals/RecordHeadersTest.java | 44 ++++++++++++++++++++++
docs/upgrade.html | 4 ++
4 files changed, 68 insertions(+), 7 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index 11360b0dac0..fa4ff890542 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -45,6 +45,11 @@ import java.util.Optional;
* states. It is the responsibility of the user to ensure that multi-threaded
access is properly synchronized.
*
* <p>
+ * However, each individual {@link org.apache.kafka.common.header.Header}
instance
+ * is <b>read thread-safe</b>; that is, it is safe for multiple threads to
read the same header's key or value concurrently
+ * as long as no thread modifies it.
+ *
+ * <p>
* Refer to the {@link KafkaConsumer} documentation for more details on
multi-threaded consumption and processing strategies.
*/
public class ConsumerRecord<K, V> {
diff --git
a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
index e1ce6ad01a5..3eee4d27da5 100644
---
a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
+++
b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
@@ -25,9 +25,9 @@ import java.util.Objects;
public class RecordHeader implements Header {
private ByteBuffer keyBuffer;
- private String key;
- private ByteBuffer valueBuffer;
- private byte[] value;
+ private volatile String key;
+ private volatile ByteBuffer valueBuffer;
+ private volatile byte[] value;
public RecordHeader(String key, byte[] value) {
Objects.requireNonNull(key, "Null header keys are not permitted");
@@ -42,16 +42,24 @@ public class RecordHeader implements Header {
public String key() {
if (key == null) {
- key = Utils.utf8(keyBuffer, keyBuffer.remaining());
- keyBuffer = null;
+ synchronized (this) {
+ if (key == null) {
+ key = Utils.utf8(keyBuffer, keyBuffer.remaining());
+ keyBuffer = null;
+ }
+ }
}
return key;
}
public byte[] value() {
if (value == null && valueBuffer != null) {
- value = Utils.toArray(valueBuffer);
- valueBuffer = null;
+ synchronized (this) {
+ if (value == null && valueBuffer != null) {
+ value = Utils.toArray(valueBuffer);
+ valueBuffer = null;
+ }
+ }
}
return value;
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
index 41104194991..605b5598e13 100644
---
a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
@@ -19,10 +19,17 @@ package org.apache.kafka.common.header.internals;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -265,4 +272,41 @@ public class RecordHeadersTest {
assertArrayEquals(value.getBytes(), actual.value());
}
+ private void assertRecordHeaderReadThreadSafe(RecordHeader header) {
+ int threadCount = 16;
+ CountDownLatch startLatch = new CountDownLatch(1);
+
+ var futures = IntStream.range(0, threadCount)
+ .mapToObj(i -> CompletableFuture.runAsync(() -> {
+ try {
+ startLatch.await();
+ header.key();
+ header.value();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ })).collect(Collectors.toUnmodifiableList());
+
+ startLatch.countDown();
+ futures.forEach(CompletableFuture::join);
+ }
+
+ @RepeatedTest(100)
+ public void testRecordHeaderIsReadThreadSafe() throws Exception {
+ RecordHeader header = new RecordHeader(
+ ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8)),
+ ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8))
+ );
+ assertRecordHeaderReadThreadSafe(header);
+ }
+
+ @RepeatedTest(100)
+ public void testRecordHeaderWithNullValueIsReadThreadSafe() throws
Exception {
+ RecordHeader header = new RecordHeader(
+ ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8)),
+ null
+ );
+ assertRecordHeaderReadThreadSafe(header);
+ }
}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 36c1ca6a929..c0e5d6adf8c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -25,6 +25,10 @@
<h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in
4.2.0</a></h5>
<ul>
+ <li>
+ The <code>org.apache.kafka.common.header.internals.RecordHeader</code>
class has been updated to be read thread-safe. See <a
href="https://cwiki.apache.org/confluence/x/nYmhFg">KIP-1205</a> for details.
+ In other words, each individual <code>Header</code> object within a
<code>ConsumerRecord</code>'s <code>headers</code> can now be safely read from
multiple threads concurrently.
+ </li>
<li>
The <code>org.apache.kafka.disallowed.login.modules</code> config was
deprecated. Please use the <code>org.apache.kafka.allowed.login.modules</code>
instead.