[ 
https://issues.apache.org/jira/browse/FLUME-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

charliezhang updated FLUME-2805:
--------------------------------
    Labels: flume-ng kafka  (was: fluent kafka)

> Error while getting events from Kafka java.lang.IndexOutOfBoundsException
> -------------------------------------------------------------------------
>
>                 Key: FLUME-2805
>                 URL: https://issues.apache.org/jira/browse/FLUME-2805
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: v1.6.0
>         Environment: centos 6.5 
> java1.7.0_67
>            Reporter: charliezhang
>              Labels: flume-ng, kafka
>
> I test kafkachannel in following scenario
> * With Flume sink, but no source
> my flume-conf.properities is very siample
> a1.channels = channel1
> a1.sinks = sink1
> a1.channels.channel1.type   = org.apache.flume.channel.kafka.KafkaChannel
> a1.channels.channel1.brokerList=10.142.90.28:9092,10.142.90.29:9092,10.142.90.30:9092
>  
> a1.channels.channel1.topic=test4
> a1.channels.channel1.zookeeperConnect=10.142.90.63:2181,10.142.90.64:2181,10.142.90.65:2181
> a1.channels.channel1.kafka.consumer.timeout.ms = 1000000
> a1.sinks.sink1.type = logger
> a1.sinks.sink1.channel= channel1
> my command:
> ./bin/flume-ng agent -n a1 -c conf -f conf/flume-conf.properties 
> -Dflume.root.logger=INFO,console
> and then throw a exception:
> 2015-09-28 17:24:30,735 
> (flume_j2a1-1443432269768-3f6340d1-leader-finder-thread) [INFO - 
> kafka.utils.Logging$class.info(Logging.scala:68)] 
> [ConsumerFetcherManager-1443432269913] Added fetcher for partitions 
> ArrayBuffer([[test4,0], initOffset 6 to broker 
> id:2,host:cs4n3.ecld.com,port:9092] )
> 2015-09-28 17:24:46,427 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN 
> - 
> org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:332)]
>  Error while getting events from Kafka
> java.lang.IndexOutOfBoundsException
>         at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:180)
>         at 
> org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
>         at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
>         at 
> org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:107)
>         at 
> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:348)
>         at 
> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:341)
>         at 
> org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:255)
>         at 
> org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:243)
>         at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>         at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
>         at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
>         at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
>         at 
> org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:317)
>         at 
> org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>         at 
> org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>         at 
> org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
>         at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> 2015-09-28 17:24:46,430 (SinkRunner-PollingRunner-DefaultSinkProcessor) 
> [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] 
> Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: Failed to process transaction
>         at 
> org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:218)
>         at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.ChannelException: Error while getting events from 
> Kafka
>         at 
> org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:333)
>         at 
> org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>         at 
> org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>         at 
> org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
>         ... 3 more
> Caused by: java.lang.IndexOutOfBoundsException
>         at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:180)
>         at 
> org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
>         at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
>         at 
> org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:107)
>         at 
> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:348)
>         at 
> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:341)
>         at 
> org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:255)
>         at 
> org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:243)
>         at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>         at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
>         at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
>         at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
>         at 
> org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:317)
>         ... 6 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to