[ https://issues.apache.org/jira/browse/KAFKA-4205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15715331#comment-15715331 ]

ASF GitHub Bot commented on KAFKA-4205:
---------------------------------------

GitHub user ataraxer opened a pull request:

    https://github.com/apache/kafka/pull/2204

    KAFKA-4205; KafkaApis: fix NPE caused by conversion to array

    The NPE was caused by `log.logSegments.toArray` producing an array containing `null` values. The exact reason still remains somewhat of a mystery to me, but the culprit seems to be `JavaConverters` in combination with concurrent access to the underlying data structure.
    
    Here's a simple code example to prove that:
    ```scala
    import java.util.concurrent.ConcurrentSkipListMap
    // Same as `JavaConversions`, but allows explicit conversions via `asScala`/`asJava` methods.
    import scala.collection.JavaConverters._

    case object Value
    val m = new ConcurrentSkipListMap[Int, Value.type]
    new Thread { override def run() = { while (true) m.put(9000, Value) } }.start()
    new Thread { override def run() = { while (true) m.remove(9000) } }.start()
    new Thread { override def run() = { while (true) println(m.values.asScala.toArray.headOption) } }.start()
    ```
    
    Running the example will occasionally print `Some(null)`, indicating that something shady is going on during the `toArray` conversion.
    
    `null`s magically disappear by making the following change:
    ```diff
    - println(m.values.asScala.toArray.headOption)
    + println(m.values.asScala.toSeq.headOption)
    ```
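
    A plausible mechanism for those trailing `null`s (my reading, hedged: not verified against the standard library source here) is that Scala's `toArray` first pre-allocates an array from the wrapper's `size` and then copies elements via the iterator; if the underlying concurrent map shrinks in between, the tail slots are never written and keep their default value, `null`. `toSeq` builds through the iterator alone, so it cannot leave such holes. The race can be simulated deterministically by hand:

    ```scala
    import java.util.concurrent.ConcurrentSkipListMap
    import scala.collection.JavaConverters._

    object ToArrayRaceSketch {
      // Reproduces the two steps of `toArray` with the shrink forced in between.
      def racedToArray(): Array[String] = {
        val m = new ConcurrentSkipListMap[Int, String]
        m.put(1, "a")
        m.put(2, "b")
        val view = m.values.asScala            // live view over the Java collection
        val arr = new Array[String](view.size) // step 1: pre-allocate from size (2)
        m.remove(2)                            // the concurrent writer wins the race
        view.copyToArray(arr)                  // step 2: only one element left to copy
        arr                                    // arr(1) was never written: null
      }

      def main(args: Array[String]): Unit =
        println(racedToArray().mkString("[", ", ", "]"))  // prints [a, null]
    }
    ```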

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ataraxer/kafka KAFKA-4205

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2204.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2204
    
----
commit bcd32760015e9dfd564813076a07dbe1612eab00
Author: Anton Karamanov <atara...@yandex-team.ru>
Date:   2016-12-02T14:37:42Z

    KAFKA-4205; KafkaApis: fix NPE caused by conversion to array

----


> NullPointerException in fetchOffsetsBefore
> ------------------------------------------
>
>                 Key: KAFKA-4205
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4205
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Andrew Grasso
>              Labels: reliability
>             Fix For: 0.10.1.1
>
>
> We recently observed the following error in brokers running 0.9.0.1:
> A client saw an Unknown error code in response to an offset request for 
> TOPICX, partition 0
> The server logs look like:
> {code}
> [2016-09-21 21:26:07,143] INFO Scheduling log segment 527235760 for log TOPICX-0 for deletion. (kafka.log.Log)
> [2016-09-21 21:26:07,144] ERROR [KafkaApi-13] Error while responding to offset request (kafka.server.KafkaApis)
> java.lang.NullPointerException
>         at kafka.server.KafkaApis.fetchOffsetsBefore(KafkaApis.scala:513)
>         at kafka.server.KafkaApis.fetchOffsets(KafkaApis.scala:501)
>         at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:461)
>         at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:452)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at kafka.server.KafkaApis.handleOffsetRequest(KafkaApis.scala:452)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>         at java.lang.Thread.run(Thread.java:745)
> [2016-09-21 21:27:07,143] INFO Deleting segment 527235760 from log TOPICX-0. (kafka.log.Log)
> [2016-09-21 21:27:07,263] INFO Deleting index /path/to/kafka/data/TOPICX-0/00000000000527235760.index.deleted (kafka.log.OffsetIndex)
> {code}
> I suspect a race condition between {{Log.deleteSegment}} (which takes a lock on the log) and {{KafkaApis.fetchOffsetsBefore}}, which does not take any lock. In particular, line 513 in KafkaApis looks like:
> {code:title=KafkaApis.scala|borderStyle=solid}
> 510  private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
> 511    val segsArray = log.logSegments.toArray
> 512    var offsetTimeArray: Array[(Long, Long)] = null
> 513    val lastSegmentHasSize = segsArray.last.size > 0;
> {code}
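
Given the conversion behavior described in the pull request above, the crash site is easy to reproduce in isolation: once the race leaves a trailing `null` in `segsArray`, the member access on `segsArray.last` at line 513 throws exactly as the stack trace shows. A minimal sketch, using a hypothetical stand-in for the segment type (not the real `kafka.log.LogSegment`):

```scala
object SegsArrayNpeSketch {
  // Hypothetical stand-in for a log segment; only `size` matters here.
  final case class Segment(size: Long)

  // Mirrors KafkaApis.scala line 513: throws NPE when the last slot is null.
  def lastSegmentHasSize(segsArray: Array[Segment]): Boolean =
    segsArray.last.size > 0

  def main(args: Array[String]): Unit = {
    val raced = Array(Segment(1024L), null)  // trailing null left by the race
    try lastSegmentHasSize(raced)
    catch { case _: NullPointerException => println("NPE, as at KafkaApis.scala:513") }
  }
}
```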



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
