[ 
https://issues.apache.org/jira/browse/KAFKA-888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-888.
-----------------------------
    Resolution: Cannot Reproduce

 Pl reopen if you think the issue still exists


> problems when shutting down the java consumer .
> -----------------------------------------------
>
>                 Key: KAFKA-888
>                 URL: https://issues.apache.org/jira/browse/KAFKA-888
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.0
>         Environment: Linux kacper-pc 3.2.0-40-generic #64-Ubuntu SMP 2013 
> x86_64 x86_64 x86_64 GNU/Linux, scala 2.9.2 
>            Reporter: kacper chwialkowski
>            Assignee: Neha Narkhede
>            Priority: Minor
>              Labels: bug, consumer, exception
>
> I got the following error when shutting down the consumer :
> ConsumerFetcherThread-test-consumer-group_kacper-pc-1367268338957-12cb5a0b-0-0]
>  INFO  kafka.consumer.SimpleConsumer - Reconnect due to socket error: 
> java.nio.channels.ClosedByInterruptException: null
>       at 
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
>  ~[na:1.7.0_21]
>       at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:386) 
> ~[na:1.7.0_21]
>       at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220) 
> ~[na:1.7.0_21]
>       at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) 
> ~[na:1.7.0_21]
>       at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) 
> ~[na:1.7.0_21]
>       at kafka.utils.Utils$.read(Utils.scala:394) 
> ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>  ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.network.Receive$class.readCompletely(Transmission.scala:56) 
> ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>  ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) 
> ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:75) 
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) 
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) 
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) 
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> and this is how I create my Consumer 
>       public Boolean call() throws Exception {
>             Map<String, Integer> topicCountMap = new HashMap<>();
>             topicCountMap.put(topic, new Integer(1));
>             Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
> consumer.createMessageStreams(topicCountMap);
>             KafkaStream<byte[], byte[]> stream = 
> consumerMap.get(topic).get(0);
>             ConsumerIterator<byte[], byte[]> it = stream.iterator();
>             it.next();
>             LOGGER.info("Received the message. Shutting down");
>             consumer.commitOffsets();
>             consumer.shutdown();
>             return true;
>         }



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to