[ 
https://issues.apache.org/jira/browse/FLUME-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuwei updated FLUME-3073:
-------------------------
    Description: 
My agent: kafkaSource --> file channel --> hdfsSink

my.conf

agent.sources = kafkaSource

agent.channels = kafka2HdfsConnectionMon

agent.sinks = hdfsSink


agent.sources.kafkaSource.channels = kafka2HdfsConnectionMon
agent.sinks.hdfsSink.channel = kafka2HdfsConnectionMon

#-------- kafkaSource-----------------
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.kafka.bootstrap.servers = 10.2.1.23:9092

agent.sources.kafkaSource.topic = userMon

agent.sources.kafkaSource.groupId = flumeConsumer
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 300000 #5min



#------- fileChannel-------------------------

agent.channels.kafka2HdfsConnectionMon.type = file
agent.channels.kafka2HdfsConnectionMon.checkpointDir = /data/filechannle_data/kafka2HdfsConnectionMon/checkpoint
agent.channels.kafka2HdfsConnectionMon.dataDirs = /data/filechannle_data/kafka2HdfsConnectionMon/data




#---------hdfsSink ------------------
agent.sinks.hdfsSink.type = hdfs

agent.sinks.hdfsSink.hdfs.path = hdfs://mycluster/connectionMon/%Y%m%d
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream


agent.sinks.hdfsSink.hdfs.rollSize = 134217720
agent.sinks.hdfsSink.hdfs.rollCount = 100000
agent.sinks.hdfsSink.hdfs.rollInterval = 600

agent.sinks.hdfsSink.hdfs.filePrefix=run
agent.sinks.hdfsSink.hdfs.fileSuffix=.data

agent.sinks.hdfsSink.hdfs.inUserPrefix=_
agent.sinks.hdfsSink.hdfs.inUserSuffix=



flume-env.sh:
JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC 
-XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"




After Flume has been running for a while (about a few hours), it throws this
exception:
-----------------------------------------------------------------------
15 Mar 2017 03:56:45,687 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] 
(org.apache.flume.source.kafka.KafkaSource.doProcess:314)  - KafkaSource 
EXCEPTION, {}
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'responses': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
        at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
        at 
org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304)
        at 
org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at 
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:745)
15 Mar 2017 03:56:47,697 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] 
(org.apache.flume.source.kafka.KafkaSource.doProcess:314)  - KafkaSource 
EXCEPTION, {}
java.lang.IllegalStateException: Correlation id for response (1077833) does not 
match request (1077776)
        at 
org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
        at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
        at 
org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304)
        at 
org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at 
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:745)
15 Mar 2017 03:56:50,880 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] 
(org.apache.flume.source.kafka.KafkaSource.doProcess:314)  - KafkaSource 
EXCEPTION, {}
java.lang.IllegalStateException: Correlation id for response (1077886) does not 
match request (1077833)
        at 
org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
        at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
        at 
org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304)
        at 
org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at 
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:745)
。。。。。。。。。。
。。。。。。。。。
-----------------------------------------------------------------------


Flume keeps producing this exception continuously. Data is still written to HDFS
normally, but CPU usage rises above 100%.



  was:
My agent: kafkaSource --> file channel --> hdfsSink

my.conf
#source
agent.sources = kafkaSource
# channels
agent.channels = kafka2HdfsConnectionMon
# sink
agent.sinks = hdfsSink


agent.sources.kafkaSource.channels = kafka2HdfsConnectionMon
agent.sinks.hdfsSink.channel = kafka2HdfsConnectionMon

#-------- kafkaSource-----------------
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.kafka.bootstrap.servers = 10.2.1.23:9092
# kafka topic
agent.sources.kafkaSource.topic = userMon
# group.id
agent.sources.kafkaSource.groupId = flumeConsumer
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 300000 #5min



#------- fileChannel-------------------------
# channel
agent.channels.kafka2HdfsConnectionMon.type = file
agent.channels.kafka2HdfsConnectionMon.checkpointDir = /data/filechannle_data/kafka2HdfsConnectionMon/checkpoint
agent.channels.kafka2HdfsConnectionMon.dataDirs = /data/filechannle_data/kafka2HdfsConnectionMon/data




#---------hdfsSink ------------------
agent.sinks.hdfsSink.type = hdfs

agent.sinks.hdfsSink.hdfs.path = hdfs://mycluster/connectionMon/%Y%m%d
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream


agent.sinks.hdfsSink.hdfs.rollSize = 134217720
agent.sinks.hdfsSink.hdfs.rollCount = 100000
agent.sinks.hdfsSink.hdfs.rollInterval = 600

agent.sinks.hdfsSink.hdfs.filePrefix=run
agent.sinks.hdfsSink.hdfs.fileSuffix=.data

agent.sinks.hdfsSink.hdfs.inUserPrefix=_
agent.sinks.hdfsSink.hdfs.inUserSuffix=



flume-env.sh:
JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC 
-XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"




After Flume has been running for a while (about a few hours), it throws this
exception:
-----------------------------------------------------------------------
15 Mar 2017 03:56:45,687 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] 
(org.apache.flume.source.kafka.KafkaSource.doProcess:314)  - KafkaSource 
EXCEPTION, {}
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'responses': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
        at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
        at 
org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304)
        at 
org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at 
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:745)
15 Mar 2017 03:56:47,697 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] 
(org.apache.flume.source.kafka.KafkaSource.doProcess:314)  - KafkaSource 
EXCEPTION, {}
java.lang.IllegalStateException: Correlation id for response (1077833) does not 
match request (1077776)
        at 
org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
        at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
        at 
org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304)
        at 
org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at 
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:745)
15 Mar 2017 03:56:50,880 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] 
(org.apache.flume.source.kafka.KafkaSource.doProcess:314)  - KafkaSource 
EXCEPTION, {}
java.lang.IllegalStateException: Correlation id for response (1077886) does not 
match request (1077833)
        at 
org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
        at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
        at 
org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304)
        at 
org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
        at 
org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:745)
。。。。。。。。。。
。。。。。。。。。
-----------------------------------------------------------------------


Flume keeps producing this exception continuously. Data is still written to HDFS
normally, but CPU usage rises above 100%.




> KafkaSource EXCEPTION java.lang.IllegalStateException: Correlation id for 
> response (1077833) does not match request (1077776)
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-3073
>                 URL: https://issues.apache.org/jira/browse/FLUME-3073
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.7.0
>            Reporter: xuwei
>
> my agent:kafkaSource-->file channel-->hdfsSink
> my.conf
> agent.sources = kafkaSource
> agent.channels = kafka2HdfsConnectionMon
> agent.sinks = hdfsSink
> agent.sources.kafkaSource.channels = kafka2HdfsConnectionMon
> agent.sinks.hdfsSink.channel = kafka2HdfsConnectionMon
> #-------- kafkaSource-----------------
> agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
> agent.sources.kafkaSource.kafka.bootstrap.servers = 10.2.1.23:9092
> agent.sources.kafkaSource.topic = userMon
> agent.sources.kafkaSource.groupId = flumeConsumer
> agent.sources.kafkaSource.kafka.consumer.timeout.ms = 300000 #5min
> #------- fileChannel-------------------------
> agent.channels.kafka2HdfsConnectionMon.type = file
> agent.channels.kafka2HdfsConnectionMon.checkpointDir = /data/filechannle_data/kafka2HdfsConnectionMon/checkpoint
> agent.channels.kafka2HdfsConnectionMon.dataDirs = /data/filechannle_data/kafka2HdfsConnectionMon/data
> #---------hdfsSink ------------------
> agent.sinks.hdfsSink.type = hdfs
> agent.sinks.hdfsSink.hdfs.path = hdfs://mycluster/connectionMon/%Y%m%d
> agent.sinks.hdfsSink.hdfs.writeFormat = Text
> agent.sinks.hdfsSink.hdfs.fileType = DataStream
> agent.sinks.hdfsSink.hdfs.rollSize = 134217720
> agent.sinks.hdfsSink.hdfs.rollCount = 100000
> agent.sinks.hdfsSink.hdfs.rollInterval = 600
> agent.sinks.hdfsSink.hdfs.filePrefix=run
> agent.sinks.hdfsSink.hdfs.fileSuffix=.data
> agent.sinks.hdfsSink.hdfs.inUserPrefix=_
> agent.sinks.hdfsSink.hdfs.inUserSuffix=
> flume-env.sh:
> JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC 
> -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
> After Flume has been running for a while (about a few hours), it throws
> this exception:
> -----------------------------------------------------------------------
> 15 Mar 2017 03:56:45,687 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] 
> (org.apache.flume.source.kafka.KafkaSource.doProcess:314)  - KafkaSource 
> EXCEPTION, {}
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'responses': java.nio.BufferUnderflowException
>         at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
>         at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
>         at 
> org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304)
>         at 
> org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
>         at 
> org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
>         at java.lang.Thread.run(Thread.java:745)
> 15 Mar 2017 03:56:47,697 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] 
> (org.apache.flume.source.kafka.KafkaSource.doProcess:314)  - KafkaSource 
> EXCEPTION, {}
> java.lang.IllegalStateException: Correlation id for response (1077833) does 
> not match request (1077776)
>         at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>         at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
>         at 
> org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304)
>         at 
> org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
>         at 
> org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
>         at java.lang.Thread.run(Thread.java:745)
> 15 Mar 2017 03:56:50,880 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] 
> (org.apache.flume.source.kafka.KafkaSource.doProcess:314)  - KafkaSource 
> EXCEPTION, {}
> java.lang.IllegalStateException: Correlation id for response (1077886) does 
> not match request (1077833)
>         at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>         at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
>         at 
> org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304)
>         at 
> org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
>         at 
> org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
>         at java.lang.Thread.run(Thread.java:745)
> 。。。。。。。。。。
> 。。。。。。。。。
> -----------------------------------------------------------------------
> Flume keeps producing this exception continuously. Data is still written to HDFS
> normally, but CPU usage rises above 100%.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to