[
https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17777647#comment-17777647
]
Divij Vaidya edited comment on KAFKA-15653 at 10/20/23 9:31 AM:
----------------------------------------------------------------
In 3.6, we started using Buffer pool local to each request handler thread to
perform decompression. The above stack trace indicates a null when we ask for a
buffer from the buffer pool, returned by bufferQueue.pollfirst() at
[https://github.com/apache/kafka/blob/c81a7252195261f649faba166ee723552bed4d81/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java#L76]
Ideally that should not be possible bufferQueue.pollfirst() returns null only
when bufferQueue is empty. And we already check fr bufferQueue being empty in
the line above. Also this function is not thread safe. It doesn't need to be
thread safe because a particular instance of buffer pool (DefaultSupplier) is
associated with single thread (the request handler thread) and only one thread
should be accessing it at one time.
Either we are incorrectly accessing DefaultBufferSupplier.get() from two
threads and causing race condition OR somehow in the same thread reference is
being set to null/or garbage collected?!
I will try to eyeball to code here to see if I can find something. But
practically [~twmb] it would be greatly useful if you can share your
integration/unit test with us so that we can find a deterministic way to
reproduce it.
was (Author: divijvaidya):
In 3.6, we started using Buffer pool local to each request handler thread to
perform decompression. The above stack trace indicates a null when we ask for a
buffer from the buffer pool, returned by bufferQueue.pollfirst() at
[https://github.com/apache/kafka/blob/c81a7252195261f649faba166ee723552bed4d81/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java#L76]
Ideally that should not be possible bufferQueue.pollfirst() returns null only
when bufferQueue is empty. And we already check fr bufferQueue being empty in
the line above. But this function is not thread safe. It doesn't need to be
because a particular instance of buffer pool (DefaultSupplier) is associated
with single thread (the request handler thread) and only one thread should be
accessing it at one time.
I will try to eyeball to code here to see if I can find something. But
practically [~twmb] it would be greatly useful if you can share your
integration/unit test with us so that we can find a deterministic way to
reproduce it.
> NPE in ChunkedByteStream
> ------------------------
>
> Key: KAFKA-15653
> URL: https://issues.apache.org/jira/browse/KAFKA-15653
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 3.6.0
> Environment: Docker container on a Linux laptop, using the latest
> release.
> Reporter: Travis Bischel
> Assignee: Divij Vaidya
> Priority: Major
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR
> from producing. The broker logs for the failing request:
>
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing
> append operation on partition
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
> at
> org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
> at
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
> at
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
> at
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
> at
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
> at
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
> at
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
> at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
> at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
> at
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
> at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
> at
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
> at
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
> at
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
> at scala.collection.mutable.HashMap.map(HashMap.scala:35)
> at
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
> at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754)
> at
> kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874)
> at
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874)
> at
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
> at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)