[ 
https://issues.apache.org/jira/browse/KAFKA-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725398#comment-14725398
 ] 

Serhey Novachenko commented on KAFKA-2496:
------------------------------------------

I also tried assigning partitions instead of subscribing:

{noformat}
consumer.assign(List(new TopicPartition("mirror", 0)).asJava)
consumer.seek(new TopicPartition("mirror", 0), 0)
{noformat}

It looks like Kafka server does not fail now, but I get another exception while 
trying to poll the consumer:

{noformat}
Exception in thread "main" 
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'responses': Error reading field 'topic': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
        at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:379)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:229)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:780)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:731)
        at Sandbox$.main(Sandbox.scala:42)
        at Sandbox.main(Sandbox.scala)
{noformat}

> New consumer from trunk doesn't work with 0.8.2.1 brokers
> ---------------------------------------------------------
>
>                 Key: KAFKA-2496
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2496
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.2.1
>            Reporter: Serhey Novachenko
>
> I have a 0.8.2.1 broker running with a topic created and some messages in it.
> I also have a consumer built from trunk (commit 
> 9c936b186d390f59f1d4ad8cc2995f800036a3d6 to be precise).
> When trying to consume messages from this topic the consumer fails with a 
> following stacktrace:
> {noformat}
> Exception in thread "main" org.apache.kafka.common.KafkaException: Unexpected 
> error in join group response: The server experienced an unexpected error when 
> processing the request
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:361)
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:309)
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:701)
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:675)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:145)
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator.reassignPartitions(Coordinator.java:195)
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator.ensurePartitionAssignment(Coordinator.java:170)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:770)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:731)
>       at Sandbox$.main(Sandbox.scala:38)
>       at Sandbox.main(Sandbox.scala)
> {noformat}
> What actually happens is broker being unable to handle the JoinGroup request 
> from consumer:
> {noformat}
> [2015-09-01 11:48:38,820] ERROR [KafkaApi-0] error when handling request 
> Name: JoinGroup; Version: 0; CorrelationId: 141; ClientId: consumer-1; Body: 
> {group_id=mirror_maker_group,session_timeout=30000,topics=[mirror],consumer_id=,partition_assignment_strategy=range}
>  (kafka.server.KafkaApis)
> kafka.common.KafkaException: Unknown api code 11
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The consumer code that leads to this is pretty much straightforward:
> {noformat}
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import scala.collection.JavaConverters._
> object Sandbox {
>   def main(args: Array[String]) {
>     val consumerProps = new Properties
>     consumerProps.put("bootstrap.servers", "localhost:9092")
>     consumerProps.put("group.id", "mirror_maker_group")
>     consumerProps.put("enable.auto.commit", "false")
>     consumerProps.put("session.timeout.ms", "30000")
>     consumerProps.put("key.deserializer", 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>     consumerProps.put("value.deserializer", 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
>     consumer.subscribe(List("mirror").asJava)
>     val records = consumer.poll(1000)
>     for (record <- records.iterator().asScala) {
>       println(record.offset())
>     }
>   }
> }
> {noformat}
> I looked into the source code of the Kafka server in 0.8.2.1 branch and it 
> does not have the logic to handle JoinGroup request. It does not actually 
> have all the logic related to consumer coordination there so I wonder if 
> there is any way to make the new consumer work with 0.8.2.1 brokers?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to