[ https://issues.apache.org/jira/browse/KAFKA-18471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chia-Ping Tsai resolved KAFKA-18471. ------------------------------------ Resolution: Duplicate duplicate to KAFKA-18470 > Race conditions when accessing RecordHeader data > ------------------------------------------------- > > Key: KAFKA-18471 > URL: https://issues.apache.org/jira/browse/KAFKA-18471 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 3.8.1 > Reporter: Vinicius Vieira dos Santos > Priority: Major > > There is a race condition in the {{RecordHeader}} class of Kafka when an > instance is created using the [[constructor with > ByteBuffer|https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java#L38]{{{}{}}}|https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java#L38]. > In this scenario, when attempting to access the {{key}} or {{{}value{}}}, a > process copies the {{ByteBuffer}} into a byte array. > During this process, multiple threads may simultaneously invoke the method > responsible for the copying. This can lead to a situation where one thread > successfully completes the operation, while another abruptly has the buffer > set to {{null}} during the process. > > Exception example: > > {code:java} > Exception in thread "pool-1-thread-3" java.lang.NullPointerException: Cannot > invoke "java.nio.ByteBuffer.remaining()" because "this.keyBuffer" is null > at > org.apache.kafka.common.header.internals.RecordHeader.key(RecordHeader.java:45) > at > br.com.autbank.workflow.TestMainExample.lambda$main$0(TestMainExample.java:36) > at java.base/java.lang.Iterable.forEach(Iterable.java:75) > at > br.com.autbank.workflow.TestMainExample.lambda$main$1(TestMainExample.java:32) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) > at java.base/java.lang.Thread.run(Thread.java:1583) {code} > > Code example for error: > > {code:java} > public class TestMainExample { > public static void main(String[] args) throws InterruptedException { > ExecutorService executorService = Executors.newFixedThreadPool(5); > for (int i = 0; i < 200_000; i++) { > Charset charset = StandardCharsets.UTF_8; > RecordHeaders headers = new RecordHeaders(); > headers.add(new RecordHeader(charset.encode("header-key-1"), > charset.encode("value-1"))); > headers.add(new RecordHeader(charset.encode("header-key-2"), > charset.encode("value-2"))); > headers.add(new RecordHeader(charset.encode("header-key-3"), > charset.encode("2025-01-06T00:00:00.000000000-00:00[UTC]"))); > headers.add(new RecordHeader(charset.encode("header-key-4"), > charset.encode("2025-01-06T00:00:00.000000000-00:00[UTC]"))); > headers.add(new RecordHeader(charset.encode("header-key-5"), > charset.encode("account-number"))); > headers.add(new RecordHeader(charset.encode("header-key-6"), > charset.encode("operation-id"))); > headers.add(new RecordHeader(charset.encode("header-key-7"), > charset.encode("agency-code"))); > headers.add(new RecordHeader(charset.encode("header-key-8"), > charset.encode("branch-code"))); > > CountDownLatch count = new CountDownLatch(5); > for (int j = 0; j < 5; j++) { > executorService.execute(() -> { > try { > headers.forEach((hdr) -> { > if (hdr.value() == null) { > throw new IllegalStateException("Bug find on > value"); > } > if (hdr.key() == null) { > throw new IllegalStateException("Bug find on > key"); > } > }); > } finally { > count.countDown(); > } > }); > } > count.await(); > } > } > } {code} > I did a test synchronizing the method I use to access the headers and this > resolved the problem in the context of my application, but I believe the > ideal would be to either mark that the class is not thread safe or > synchronize access to the bytebuffer. Thank you in advance to the team. -- This message was sent by Atlassian Jira (v8.20.10#820010)