[ 
https://issues.apache.org/jira/browse/KAFKA-6404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Gan updated KAFKA-6404:
--------------------------
    Description: 
pls wait for some minutes, i'm editing...

kafka broker version: 0.10.0.1
cluster level: 240 nodes
situation: someone uses a newer release (such as 0.11.x) of 
bin/kafka-console-consumer.sh with the parameter "--zookeeper" to continuously 
consume a topic whose partitions are spread across all the brokers
phenomenon: 
1.broker server log:
errors like: 1) Connection to 2 was disconnected before the response was read;
2) Shrinking ISR for partition [abc, 21] from 33,13,14 to 33;
3) ERROR Processor got uncaught exception. (kafka.network.Processor) 
java.nio.BufferUnderflowException
2. ordinary consumers stay stuck in rebalance status:
errors like:
1) c.p.b.f.l.c.FiberTopoWorkerThread : got uncaught exception
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
completed since the group has already rebalanced and assigned the partitions to 
another member. This means that the time between subsequent calls to poll() was 
longer than the configured session.timeout.ms, which typically implies that the 
poll loop is spending too much time message processing. You can address this 
either by increasing the session timeout or by reducing the maximum size of 
batches returned in poll() with max.poll.records.
2) java.lang.IllegalStateException: Correlation id for response (1246203) does 
not match request (1246122)

bad results: the Kafka cluster becomes unhealthy
root cause: 
1) The old consumer (OldConsumer), from 0.10.1 onwards, sets requestVersion 3 
in ConsumerFetcherThread.scala:
{code:java}
private val fetchRequestBuilder = new FetchRequestBuilder().
    clientId(clientId).
    replicaId(Request.OrdinaryConsumerId).
    maxWait(config.fetchWaitMaxMs).
    minBytes(config.fetchMinBytes).
    requestVersion(3) // for now, the old consumer is pinned to the old message 
format through the fetch request
{code}

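but in 0.10.0.1 FetchRequest.CurrentVersion=2, so FetchRequest.readFrom doesn't 
read the field "max_bytes" from version 3; every field after "minBytes" is 
therefore read from the wrong offset (e.g. "max_bytes" is taken as the topic 
count), which leads to the BufferUnderflowException shown above: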
{code:java}
  def readFrom(buffer: ByteBuffer): FetchRequest = {
    val versionId = buffer.getShort
    val correlationId = buffer.getInt
    val clientId = readShortString(buffer)
    val replicaId = buffer.getInt
    val maxWait = buffer.getInt
    val minBytes = buffer.getInt
    val topicCount = buffer.getInt
    val pairs = (1 to topicCount).flatMap(_ => {
      val topic = readShortString(buffer)
      val partitionCount = buffer.getInt
      (1 to partitionCount).map(_ => {
        val partitionId = buffer.getInt
        val offset = buffer.getLong
        val fetchSize = buffer.getInt
        (TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset, 
fetchSize))
      })
    })
    FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, 
minBytes, Map(pairs:_*))
  }
{code}

2) when FetchRequest.readFrom fails with a throwable other than 
"(InvalidRequestException, SchemaException)", the socket isn't closed; 
see SocketServer.processCompletedReceives:

{code:java}
  private def processCompletedReceives() {
    selector.completedReceives.asScala.foreach { receive =>
      try {
        val openChannel = selector.channel(receive.source)
        // Only methods that are safe to call on a disconnected channel should 
be invoked on 'openOrClosingChannel'.
        val openOrClosingChannel = if (openChannel != null) openChannel else 
selector.closingChannel(receive.source)
        val session = RequestChannel.Session(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)

        val req = RequestChannel.Request(processor = id, connectionId = 
receive.source, session = session,
          buffer = receive.payload, startTimeNanos = time.nanoseconds,
          listenerName = listenerName, securityProtocol = securityProtocol)
        requestChannel.sendRequest(req)
        selector.mute(receive.source)
      } catch {
        case e @ (_: InvalidRequestException | _: SchemaException) =>
          // note that even though we got an exception, we can assume that 
receive.source is valid. Issues with constructing a valid receive object were 
handled earlier
          error(s"Closing socket for ${receive.source} because of error", e)
          close(selector, receive.source)
      }
    }
  }
{code}




 




  was:
pls wait for some minutes, i'm editing...

kafka broker version: 0.10.0.1
cluster level: 240 nodes
situatition: someone use high released version (such as 0.11.x) of 
bin/kafka-console-consumer.sh with parameter "--zookeeper" to continuously 
consume a topic with partitions spread all the brokers
phenomenon: 
1.broker server log:
errors like: 1) Connection to 2 was disconnected before the response was read;
2) Shrinking ISR for partition [abc, 21] from 33,13,14 to 33;
3) ERROR Processor got uncaught exception. (kafka.network.Processor) 
java.nio.BufferUnderflowException
2.common consumers keeping in rebalance status:
errors like:
1) c.p.b.f.l.c.FiberTopoWorkerThread : got uncaught exception
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
completed since the group has already rebalanced and assigned the partitions to 
another member. This means that the time between subsequent calls to poll() was 
longer than the configured session.timeout.ms, which typically implies that the 
poll loop is spending too much time message processing. You can address this 
either by increasing the session timeout or by reducing the maximum size of 
batches returned in poll() with max.poll.records.
2) java.lang.IllegalStateException: Correlation id for response (1246203) does 
not match request (1246122)

bad results: kafka cluster in sick
root cause: 
1) OldConsumer after 0.10.1 in ConsumerFetcherThread.scala setting 
requestVersion 3:
{code:java}
private val fetchRequestBuilder = new FetchRequestBuilder().
    clientId(clientId).
    replicaId(Request.OrdinaryConsumerId).
    maxWait(config.fetchWaitMaxMs).
    minBytes(config.fetchMinBytes).
    requestVersion(3) // for now, the old consumer is pinned to the old message 
format through the fetch request
{code}

but in 0.10.0.1 FetchRequest.CurrentVersion=2, FetchRequst.readFrom wouldn't 
read the field "max_bytes" from version 3 :
{code:java}
  def readFrom(buffer: ByteBuffer): FetchRequest = {
    val versionId = buffer.getShort
    val correlationId = buffer.getInt
    val clientId = readShortString(buffer)
    val replicaId = buffer.getInt
    val maxWait = buffer.getInt
    val minBytes = buffer.getInt
    val topicCount = buffer.getInt
    val pairs = (1 to topicCount).flatMap(_ => {
      val topic = readShortString(buffer)
      val partitionCount = buffer.getInt
      (1 to partitionCount).map(_ => {
        val partitionId = buffer.getInt
        val offset = buffer.getLong
        val fetchSize = buffer.getInt
        (TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset, 
fetchSize))
      })
    })
    FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, 
minBytes, Map(pairs:_*))
  }
{code}

2) when the FetchRequst.readFrom crashed with throwable not in 
"(InvalidRequestException, SchemaException)", the socket wouldn't be closed;
SocketServer.processCompletedReceives:

{code:java}
  private def processCompletedReceives() {
    selector.completedReceives.asScala.foreach { receive =>
      try {
        val openChannel = selector.channel(receive.source)
        // Only methods that are safe to call on a disconnected channel should 
be invoked on 'openOrClosingChannel'.
        val openOrClosingChannel = if (openChannel != null) openChannel else 
selector.closingChannel(receive.source)
        val session = RequestChannel.Session(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)

        val req = RequestChannel.Request(processor = id, connectionId = 
receive.source, session = session,
          buffer = receive.payload, startTimeNanos = time.nanoseconds,
          listenerName = listenerName, securityProtocol = securityProtocol)
        requestChannel.sendRequest(req)
        selector.mute(receive.source)
      } catch {
        case e @ (_: InvalidRequestException | _: SchemaException) =>
          // note that even though we got an exception, we can assume that 
receive.source is valid. Issues with constructing a valid receive object were 
handled earlier
          error(s"Closing socket for ${receive.source} because of error", e)
          close(selector, receive.source)
      }
    }
  }
{code}




 





> OldConsumer FetchRequest apiVersion not match resulting in broker 
> RequestHandler socket leak
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6404
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6404
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.0.1
>            Reporter: Yu Gan
>            Priority: Critical
>
> pls wait for some minutes, i'm editing...
> kafka broker version: 0.10.0.1
> cluster level: 240 nodes
> situation: someone uses a newer release (such as 0.11.x) of 
> bin/kafka-console-consumer.sh with the parameter "--zookeeper" to continuously 
> consume a topic whose partitions are spread across all the brokers
> phenomenon: 
> 1.broker server log:
> errors like: 1) Connection to 2 was disconnected before the response was read;
> 2) Shrinking ISR for partition [abc, 21] from 33,13,14 to 33;
> 3) ERROR Processor got uncaught exception. (kafka.network.Processor) 
> java.nio.BufferUnderflowException
> 2. ordinary consumers stay stuck in rebalance status:
> errors like:
> 1) c.p.b.f.l.c.FiberTopoWorkerThread : got uncaught exception
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
> completed since the group has already rebalanced and assigned the partitions 
> to another member. This means that the time between subsequent calls to 
> poll() was longer than the configured session.timeout.ms, which typically 
> implies that the poll loop is spending too much time message processing. You 
> can address this either by increasing the session timeout or by reducing the 
> maximum size of batches returned in poll() with max.poll.records.
> 2) java.lang.IllegalStateException: Correlation id for response (1246203) 
> does not match request (1246122)
> bad results: the Kafka cluster becomes unhealthy
> root cause: 
> 1) The old consumer (OldConsumer), from 0.10.1 onwards, sets requestVersion 3 
> in ConsumerFetcherThread.scala:
> {code:java}
> private val fetchRequestBuilder = new FetchRequestBuilder().
>     clientId(clientId).
>     replicaId(Request.OrdinaryConsumerId).
>     maxWait(config.fetchWaitMaxMs).
>     minBytes(config.fetchMinBytes).
>     requestVersion(3) // for now, the old consumer is pinned to the old 
> message format through the fetch request
> {code}
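> but in 0.10.0.1 FetchRequest.CurrentVersion=2, so FetchRequest.readFrom doesn't 
> read the field "max_bytes" from version 3; every field after "minBytes" is 
> therefore read from the wrong offset (e.g. "max_bytes" is taken as the topic 
> count), which leads to the BufferUnderflowException shown above: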
> {code:java}
>   def readFrom(buffer: ByteBuffer): FetchRequest = {
>     val versionId = buffer.getShort
>     val correlationId = buffer.getInt
>     val clientId = readShortString(buffer)
>     val replicaId = buffer.getInt
>     val maxWait = buffer.getInt
>     val minBytes = buffer.getInt
>     val topicCount = buffer.getInt
>     val pairs = (1 to topicCount).flatMap(_ => {
>       val topic = readShortString(buffer)
>       val partitionCount = buffer.getInt
>       (1 to partitionCount).map(_ => {
>         val partitionId = buffer.getInt
>         val offset = buffer.getLong
>         val fetchSize = buffer.getInt
>         (TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset, 
> fetchSize))
>       })
>     })
>     FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, 
> minBytes, Map(pairs:_*))
>   }
> {code}
> 2) when FetchRequest.readFrom fails with a throwable other than 
> "(InvalidRequestException, SchemaException)", the socket isn't closed; 
> see SocketServer.processCompletedReceives:
> {code:java}
>   private def processCompletedReceives() {
>     selector.completedReceives.asScala.foreach { receive =>
>       try {
>         val openChannel = selector.channel(receive.source)
>         // Only methods that are safe to call on a disconnected channel 
> should be invoked on 'openOrClosingChannel'.
>         val openOrClosingChannel = if (openChannel != null) openChannel else 
> selector.closingChannel(receive.source)
>         val session = RequestChannel.Session(new 
> KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
> openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
>         val req = RequestChannel.Request(processor = id, connectionId = 
> receive.source, session = session,
>           buffer = receive.payload, startTimeNanos = time.nanoseconds,
>           listenerName = listenerName, securityProtocol = securityProtocol)
>         requestChannel.sendRequest(req)
>         selector.mute(receive.source)
>       } catch {
>         case e @ (_: InvalidRequestException | _: SchemaException) =>
>           // note that even though we got an exception, we can assume that 
> receive.source is valid. Issues with constructing a valid receive object were 
> handled earlier
>           error(s"Closing socket for ${receive.source} because of error", e)
>           close(selector, receive.source)
>       }
>     }
>   }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to