kacper chwialkowski created KAFKA-888:
-----------------------------------------
Summary: Problems when shutting down the Java consumer
Key: KAFKA-888
URL: https://issues.apache.org/jira/browse/KAFKA-888
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.8
Environment: Linux kacper-pc 3.2.0-40-generic #64-Ubuntu SMP 2013
x86_64 x86_64 x86_64 GNU/Linux, scala 2.9.2
Reporter: kacper chwialkowski
Assignee: Neha Narkhede
Priority: Minor
I got the following error when shutting down the consumer:
ConsumerFetcherThread-test-consumer-group_kacper-pc-1367268338957-12cb5a0b-0-0] INFO kafka.consumer.SimpleConsumer - Reconnect due to socket error:
java.nio.channels.ClosedByInterruptException: null
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) ~[na:1.7.0_21]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:386) ~[na:1.7.0_21]
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220) ~[na:1.7.0_21]
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[na:1.7.0_21]
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[na:1.7.0_21]
    at kafka.utils.Utils$.read(Utils.scala:394) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:75) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
And this is how I create my consumer:
public Boolean call() throws Exception {
    // Request a single stream (one consumer thread) for the topic.
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    // Block until one message arrives, then commit the offset and shut down.
    it.next();
    LOGGER.info("Received the message. Shutting down");
    consumer.commitOffsets();
    consumer.shutdown();
    return true;
}
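
For reference, the java.nio.channels.ClosedByInterruptException at the top of the trace is what Java NIO raises when a thread is interrupted while it is blocked in a read on an interruptible channel. That would be consistent with consumer.shutdown() interrupting the fetcher thread during a fetch, but that link is an assumption and has not been checked against the Kafka source. The sketch below is plain Java with no Kafka involved; the class and method names (InterruptDemo, interruptBlockedRead) are made up for illustration. It only shows that interrupting a blocked SocketChannel.read produces this exception type.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptDemo {

    /**
     * Starts a thread that blocks in SocketChannel.read, interrupts it,
     * and returns the class name of the exception the read threw.
     */
    static String interruptBlockedRead() throws Exception {
        final AtomicReference<String> thrown = new AtomicReference<>("none");
        InetSocketAddress anyLoopbackPort =
                new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);

        try (ServerSocketChannel server = ServerSocketChannel.open().bind(anyLoopbackPort);
             SocketChannel client = SocketChannel.open(server.getLocalAddress());
             SocketChannel accepted = server.accept()) {

            Thread reader = new Thread(() -> {
                try {
                    // The peer never writes, so this read blocks until interrupted.
                    client.read(ByteBuffer.allocate(16));
                } catch (IOException e) {
                    thrown.set(e.getClass().getName());
                }
            });
            reader.start();

            // Give the reader time to enter the blocking read, then interrupt it.
            Thread.sleep(200);
            reader.interrupt();
            reader.join(5000);
        }
        return thrown.get();
    }

    public static void main(String[] args) throws Exception {
        // prints "java.nio.channels.ClosedByInterruptException"
        System.out.println(interruptBlockedRead());
    }
}
```

The interrupt also closes the channel, which would explain why the log line reports a reconnect attempt after the socket error.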