[ 
https://issues.apache.org/jira/browse/FLUME-2840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008630#comment-15008630
 ] 

Gonzalo Herreros commented on FLUME-2840:
-----------------------------------------

Can you provide some context on what you are doing and when it happens 
(always, sometimes, or only under certain conditions)?
While the code in the Channel could be more robust (instead of assuming 
that hasNext() produces either an exception or an event, or that events are 
present at rollback), I think the real issue comes from the Kafka consumer.
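
To illustrate what "more robust" could mean here, below is a minimal sketch 
and not the actual KafkaChannel code. It assumes a plain java.util.Iterator 
and java.util.Optional in place of the Kafka ConsumerIterator and the Guava 
Optional seen in the stack traces, and the class and method names 
(DefensiveTakeSketch, takeOrNull, eventsToRequeue) are hypothetical. The idea 
is to treat an exhausted iterator as "no event" and an absent event list as 
"nothing to roll back", rather than letting NoSuchElementException or 
IllegalStateException escape.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

public class DefensiveTakeSketch {

    // Returns the next element, or null when nothing is available.
    // Guards both the hasNext() check and a next() that still fails.
    public static <T> T takeOrNull(Iterator<T> it) {
        try {
            return it.hasNext() ? it.next() : null;
        } catch (NoSuchElementException e) {
            // The iterator claimed to have data but did not deliver any.
            return null;
        }
    }

    // Returns how many taken events would need to be re-queued on rollback.
    // An absent list means nothing was taken, so there is nothing to do.
    public static <T> int eventsToRequeue(Optional<List<T>> taken) {
        if (!taken.isPresent()) {
            return 0;
        }
        return taken.get().size();
    }

    public static void main(String[] args) {
        Iterator<String> empty = Collections.<String>emptyList().iterator();
        System.out.println(takeOrNull(empty));

        Iterator<String> one = Arrays.asList("event-1").iterator();
        System.out.println(takeOrNull(one));

        System.out.println(eventsToRequeue(Optional.<List<String>>empty()));
        System.out.println(eventsToRequeue(Optional.of(Arrays.asList("a", "b"))));
    }
}
```

Returning null from the take path matches the Flume Channel contract, where 
a null event means nothing is currently available, so a sink can back off 
instead of failing the transaction.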

Also, the title suggests you are doing something unusual, because the 
zookeeper.jar server library is not included in Flume, which uses zkclient.

> Use Kafka Channel occur exception with zookeeper.jar
> ----------------------------------------------------
>
>                 Key: FLUME-2840
>                 URL: https://issues.apache.org/jira/browse/FLUME-2840
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: v1.6.0
>            Reporter: Qin Shiwei
>
> 17 Nov 2015 12:07:09,256 ERROR 
> [SinkRunner-PollingRunner-DefaultSinkProcessor] 
> (org.apache.flume.sink.hdfs.HDFSEventSink.process:459)  - process failed
> org.apache.flume.ChannelException: Error while getting events from Kafka
>       at 
> org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
>       at 
> org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>       at 
> org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>       at 
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
>       at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>       at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.NoSuchElementException
>       at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
>       at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
>       at 
> org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
>       ... 6 more
> 17 Nov 2015 12:07:09,257 ERROR 
> [SinkRunner-PollingRunner-DefaultSinkProcessor] 
> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver 
> event. Exception follows.
> org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: 
> Error while getting events from Kafka
>       at 
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
>       at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>       at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.ChannelException: Error while getting events from 
> Kafka
>       at 
> org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
>       at 
> org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>       at 
> org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>       at 
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
>       ... 3 more
> Caused by: java.util.NoSuchElementException
>       at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
>       at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
>       at 
> org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:310)
>       ... 6 more
> 17 Nov 2015 14:12:08,004 ERROR 
> [SinkRunner-PollingRunner-DefaultSinkProcessor] 
> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver 
> event. Exception follows.
> java.lang.IllegalStateException: value is absent
>       at com.google.common.base.Optional$Absent.get(Optional.java:263)
>       at 
> org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doRollback(KafkaChannel.java:387)
>       at 
> org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
>       at 
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:458)
>       at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>       at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>       at java.lang.Thread.run(Thread.java:745)
> 17 Nov 2015 14:12:08,031 ERROR 
> [SinkRunner-PollingRunner-DefaultSinkProcessor] 
> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver 
> event. Exception follows.
> java.lang.IllegalStateException: value is absent
>       at com.google.common.base.Optional$Absent.get(Optional.java:263)
>       at 
> org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doRollback(KafkaChannel.java:387)
>       at 
> org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
>       at 
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:458)
>       at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>       at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>       at java.lang.Thread.run(Thread.java:745)
> 17 Nov 2015 14:12:08,030 ERROR 
> [SinkRunner-PollingRunner-DefaultSinkProcessor] 
> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver 
> event. Exception follows.
> java.lang.IllegalStateException: close() called when transaction is OPEN - 
> you must either commit or rollback first
>       at 
> com.google.common.base.Preconditions.checkState(Preconditions.java:172)
>       at 
> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>       at 
> org.apache.flume.sink.baidu.MyRollingFileSink.process(MyRollingFileSink.java:207)
>       at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>       at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>       at java.lang.Thread.run(Thread.java:745)
> 17 Nov 2015 14:12:08,028 ERROR 
> [SinkRunner-PollingRunner-DefaultSinkProcessor] 
> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver 
> event. Exception follows.
> java.lang.IllegalStateException: value is absent
>       at com.google.common.base.Optional$Absent.get(Optional.java:263)
>       at 
> org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doRollback(KafkaChannel.java:387)
>       at 
> org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
>       at 
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:458)
>       at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>       at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>       at java.lang.Thread.run(Thread.java:745)
> 17 Nov 2015 14:12:08,027 ERROR 
> [SinkRunner-PollingRunner-DefaultSinkProcessor] 
> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver 
> event. Exception follows.
> java.lang.IllegalStateException: close() called when transaction is OPEN - 
> you must either commit or rollback first
>       at 
> com.google.common.base.Preconditions.checkState(Preconditions.java:172)
>       at 
> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>       at 
> org.apache.flume.sink.baidu.MyRollingFileSink.process(MyRollingFileSink.java:207)
>       at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>       at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>       at java.lang.Thread.run(Thread.java:745)
> 17 Nov 2015 14:12:08,026 ERROR 
> [SinkRunner-PollingRunner-DefaultSinkProcessor] 
> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver 
> event. Exception follows.
> java.lang.IllegalStateException: value is absent
>       at com.google.common.base.Optional$Absent.get(Optional.java:263)
>       at 
> org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doRollback(KafkaChannel.java:387)
>       at 
> org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
>       at 
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:458)
>       at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>       at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>       at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
