[ 
https://issues.apache.org/jira/browse/KAFKA-18471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-18471.
------------------------------------
    Resolution: Duplicate

duplicate to KAFKA-18470

>  Race conditions when accessing RecordHeader data
> -------------------------------------------------
>
>                 Key: KAFKA-18471
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18471
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.8.1
>            Reporter: Vinicius Vieira dos Santos
>            Priority: Major
>
> There is a race condition in the {{RecordHeader}} class of Kafka when an 
> instance is created using the [[constructor with 
> ByteBuffer|https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java#L38]{{{}{}}}|https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java#L38].
>  In this scenario, when attempting to access the {{key}} or {{{}value{}}}, a 
> process copies the {{ByteBuffer}} into a byte array.
> During this process, multiple threads may simultaneously invoke the method 
> responsible for the copying. This can lead to a situation where one thread 
> successfully completes the operation, while another abruptly has the buffer 
> set to {{null}} during the process.
>  
> Exception example:
>  
> {code:java}
> Exception in thread "pool-1-thread-3" java.lang.NullPointerException: Cannot 
> invoke "java.nio.ByteBuffer.remaining()" because "this.keyBuffer" is null
>     at 
> org.apache.kafka.common.header.internals.RecordHeader.key(RecordHeader.java:45)
>     at 
> br.com.autbank.workflow.TestMainExample.lambda$main$0(TestMainExample.java:36)
>     at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>     at 
> br.com.autbank.workflow.TestMainExample.lambda$main$1(TestMainExample.java:32)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
>     at java.base/java.lang.Thread.run(Thread.java:1583) {code}
>  
> Code example for error:
>  
> {code:java}
> public class TestMainExample {
>     public static void main(String[] args) throws InterruptedException {
>         ExecutorService executorService = Executors.newFixedThreadPool(5);
>         for (int i = 0; i < 200_000; i++) {
>             Charset charset = StandardCharsets.UTF_8;
>             RecordHeaders headers = new RecordHeaders();
>             headers.add(new RecordHeader(charset.encode("header-key-1"), 
> charset.encode("value-1")));
>             headers.add(new RecordHeader(charset.encode("header-key-2"), 
> charset.encode("value-2")));
>             headers.add(new RecordHeader(charset.encode("header-key-3"), 
> charset.encode("2025-01-06T00:00:00.000000000-00:00[UTC]")));
>             headers.add(new RecordHeader(charset.encode("header-key-4"), 
> charset.encode("2025-01-06T00:00:00.000000000-00:00[UTC]")));
>             headers.add(new RecordHeader(charset.encode("header-key-5"), 
> charset.encode("account-number")));
>             headers.add(new RecordHeader(charset.encode("header-key-6"), 
> charset.encode("operation-id")));
>             headers.add(new RecordHeader(charset.encode("header-key-7"), 
> charset.encode("agency-code")));
>             headers.add(new RecordHeader(charset.encode("header-key-8"), 
> charset.encode("branch-code")));
>             
>             CountDownLatch count = new CountDownLatch(5);
>             for (int j = 0; j < 5; j++) {
>                 executorService.execute(() -> {
>                     try {
>                         headers.forEach((hdr) -> {
>                             if (hdr.value() == null) {
>                                 throw new IllegalStateException("Bug find on 
> value");
>                             }
>                             if (hdr.key() == null) {
>                                 throw new IllegalStateException("Bug find on 
> key");
>                             }
>                         });
>                     } finally {
>                         count.countDown();
>                     }
>                 });
>             }
>             count.await();
>         }
>     }
> } {code}
> I did a test synchronizing the method I use to access the headers and this 
> resolved the problem in the context of my application, but I believe the 
> ideal would be to either mark that the class is not thread safe or 
> synchronize access to the bytebuffer. Thank you in advance to the team.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to