[ https://issues.apache.org/jira/browse/KAFKA-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725398#comment-14725398 ]
Serhey Novachenko commented on KAFKA-2496:
------------------------------------------

I also tried assigning partitions instead of subscribing:
{noformat}
consumer.assign(List(new TopicPartition("mirror", 0)).asJava)
consumer.seek(new TopicPartition("mirror", 0), 0)
{noformat}
It looks like the Kafka server no longer fails, but I get another exception while trying to poll the consumer:
{noformat}
Exception in thread "main" org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'topic': java.nio.BufferUnderflowException
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:379)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:229)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:780)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:731)
    at Sandbox$.main(Sandbox.scala:42)
    at Sandbox.main(Sandbox.scala)
{noformat}

> New consumer from trunk doesn't work with 0.8.2.1 brokers
> ---------------------------------------------------------
>
>                 Key: KAFKA-2496
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2496
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.2.1
>            Reporter: Serhey Novachenko
>
> I have a 0.8.2.1 broker running with a topic created and some messages in it.
> I also have a consumer built from trunk (commit
> 9c936b186d390f59f1d4ad8cc2995f800036a3d6 to be precise).
> When trying to consume messages from this topic, the consumer fails with the
> following stack trace:
> {noformat}
> Exception in thread "main" org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
>     at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:361)
>     at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:309)
>     at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:701)
>     at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:675)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:145)
>     at org.apache.kafka.clients.consumer.internals.Coordinator.reassignPartitions(Coordinator.java:195)
>     at org.apache.kafka.clients.consumer.internals.Coordinator.ensurePartitionAssignment(Coordinator.java:170)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:770)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:731)
>     at Sandbox$.main(Sandbox.scala:38)
>     at Sandbox.main(Sandbox.scala)
> {noformat}
> What actually happens is that the broker is unable to handle the JoinGroup
> request from the consumer:
> {noformat}
> [2015-09-01 11:48:38,820] ERROR [KafkaApi-0] error when handling request Name: JoinGroup; Version: 0; CorrelationId: 141; ClientId: consumer-1; Body: {group_id=mirror_maker_group,session_timeout=30000,topics=[mirror],consumer_id=,partition_assignment_strategy=range} (kafka.server.KafkaApis)
> kafka.common.KafkaException: Unknown api code 11
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The consumer code that leads to this is pretty straightforward:
> {noformat}
> import java.util.Properties
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import scala.collection.JavaConverters._
>
> object Sandbox {
>   def main(args: Array[String]) {
>     val consumerProps = new Properties
>     consumerProps.put("bootstrap.servers", "localhost:9092")
>     consumerProps.put("group.id", "mirror_maker_group")
>     consumerProps.put("enable.auto.commit", "false")
>     consumerProps.put("session.timeout.ms", "30000")
>     consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>     consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
>     consumer.subscribe(List("mirror").asJava)
>     val records = consumer.poll(1000)
>     for (record <- records.iterator().asScala) {
>       println(record.offset())
>     }
>   }
> }
> {noformat}
> I looked into the source code of the Kafka server in the 0.8.2.1 branch and it
> does not have the logic to handle the JoinGroup request.
> In fact, it has none of the logic related to consumer coordination, so I
> wonder if there is any way to make the new consumer work with 0.8.2.1 brokers?