[ 
https://issues.apache.org/jira/browse/FLUME-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14933223#comment-14933223
 ] 

Gonzalo Herreros commented on FLUME-2805:
-----------------------------------------

I suspect the messages were written to the topic by a client other than Flume.
By default, the Kafka channel tries to read each message as a Flume event serialized with Avro, which would explain the Avro decoding error in the stack trace.
To read Kafka messages as plain text, you need to add:

a1.channels.channel1.parseAsFlumeEvent=false
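
For context, here is a sketch of how the channel section from the issue description might look with this property added. Every other value is copied unchanged from the reporter's configuration. It assumes the topic holds plain-text messages written by a non-Flume producer, and the comment line is illustrative only.

```properties
a1.channels = channel1
a1.sinks = sink1
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.brokerList = 10.142.90.28:9092,10.142.90.29:9092,10.142.90.30:9092
a1.channels.channel1.topic = test4
a1.channels.channel1.zookeeperConnect = 10.142.90.63:2181,10.142.90.64:2181,10.142.90.65:2181
a1.channels.channel1.kafka.consumer.timeout.ms = 1000000
# Assumption: messages in the topic are raw text, not Avro-serialized Flume events
a1.channels.channel1.parseAsFlumeEvent = false
a1.sinks.sink1.type = logger
a1.sinks.sink1.channel = channel1
```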

> Error while getting events from Kafka java.lang.IndexOutOfBoundsException
> -------------------------------------------------------------------------
>
>                 Key: FLUME-2805
>                 URL: https://issues.apache.org/jira/browse/FLUME-2805
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: v1.6.0
>         Environment: centos 6.5 
> java1.7.0_67
>            Reporter: charliezhang
>              Labels: flume-ng, kafka
>
> I tested KafkaChannel in the following scenario:
> * With a Flume sink, but no source
> My flume-conf.properties is very simple:
> a1.channels = channel1
> a1.sinks = sink1
> a1.channels.channel1.type   = org.apache.flume.channel.kafka.KafkaChannel
> a1.channels.channel1.brokerList=10.142.90.28:9092,10.142.90.29:9092,10.142.90.30:9092
>  
> a1.channels.channel1.topic=test4
> a1.channels.channel1.zookeeperConnect=10.142.90.63:2181,10.142.90.64:2181,10.142.90.65:2181
> a1.channels.channel1.kafka.consumer.timeout.ms = 1000000
> a1.sinks.sink1.type = logger
> a1.sinks.sink1.channel= channel1
> My command:
> ./bin/flume-ng agent -n a1 -c conf -f conf/flume-conf.properties -Dflume.root.logger=INFO,console
> It then throws an exception:
> 2015-09-28 17:24:30,735 (flume_j2a1-1443432269768-3f6340d1-leader-finder-thread) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] [ConsumerFetcherManager-1443432269913] Added fetcher for partitions ArrayBuffer([[test4,0], initOffset 6 to broker id:2,host:cs4n3.ecld.com,port:9092] )
> 2015-09-28 17:24:46,427 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:332)] Error while getting events from Kafka
> java.lang.IndexOutOfBoundsException
>         at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:180)
>         at org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
>         at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
>         at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:107)
>         at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:348)
>         at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:341)
>         at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:255)
>         at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:243)
>         at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>         at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
>         at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
>         at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
>         at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:317)
>         at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>         at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>         at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> 2015-09-28 17:24:46,430 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: Failed to process transaction
>         at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:218)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.ChannelException: Error while getting events from Kafka
>         at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
>         at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>         at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>         at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
>         ... 3 more
> Caused by: java.lang.IndexOutOfBoundsException
>         at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:180)
>         at org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
>         at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
>         at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:107)
>         at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:348)
>         at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:341)
>         at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:255)
>         at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:243)
>         at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>         at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
>         at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
>         at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
>         at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:317)
>         ... 6 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)