[ https://issues.apache.org/jira/browse/FLUME-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
xuwei updated FLUME-3073: ------------------------- Description: my agent:kafkaSource-->file channel-->hdfsSink my.conf agent.sources = kafkaSource agent.channels = kafka2HdfsConnectionMon agent.sinks = hdfsSink agent.sources.kafkaSource.channels = kafka2HdfsConnectionMon agent.sinks.hdfsSink.channel = kafka2HdfsConnectionMon #-------- kafkaSource----------------- agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource agent.sources.kafkaSource.kafka.bootstrap.servers = 10.2.1.23:9092 agent.sources.kafkaSource.topic = userMon agent.sources.kafkaSource.groupId = flumeConsumer agent.sources.kafkaSource.kafka.consumer.timeout.ms = 300000 #5min #------- fileChannel------------------------- agent.channels.kafka2HdfsConnectionMon.type = file agent.channels.kafka2HdfsConnectionMon.checkpointDir = /data/filechannle_data/kafka2HdfsConnectionMon/checkpoint agent.channels.kafka2HdfsConnectionMon.dataDirs = /data/filechannle_data/kafka2HdfsConnectionMon/data #---------hdfsSink ------------------ agent.sinks.hdfsSink.type = hdfs agent.sinks.hdfsSink.hdfs.path = hdfs://mycluster/connectionMon/%Y%m%d agent.sinks.hdfsSink.hdfs.writeFormat = Text agent.sinks.hdfsSink.hdfs.fileType = DataStream agent.sinks.hdfsSink.hdfs.rollSize = 134217720 agent.sinks.hdfsSink.hdfs.rollCount = 100000 agent.sinks.hdfsSink.hdfs.rollInterval = 600 agent.sinks.hdfsSink.hdfs.filePrefix=run agent.sinks.hdfsSink.hdfs.fileSuffix=.data agent.sinks.hdfsSink.hdfs.inUserPrefix=_ agent.sinks.hdfsSink.hdfs.inUserSuffix= flume-env.sh: JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit" After Flume has been running for a while (about a few hours), it throws this exception: 
----------------------------------------------------------------------- 15 Mar 2017 03:56:45,687 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource EXCEPTION, {} org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968) at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304) at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) at java.lang.Thread.run(Thread.java:745) 15 Mar 2017 03:56:47,697 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource EXCEPTION, {} java.lang.IllegalStateException: Correlation id for response (1077833) does not match request (1077776) at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440) at 
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968) at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304) at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) at java.lang.Thread.run(Thread.java:745) 15 Mar 2017 03:56:50,880 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource EXCEPTION, {} java.lang.IllegalStateException: Correlation id for response (1077886) does not match request (1077833) at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968) at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304) at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) at java.lang.Thread.run(Thread.java:745) [...] ----------------------------------------------------------------------- Flume keeps producing this exception. Data is still written to HDFS normally, but CPU usage rises above 100%. was: my agent:kafkaSource-->file channel-->hdfsSink my.conf #source agent.sources = kafkaSource # channels agent.channels = kafka2HdfsConnectionMon # sink agent.sinks = hdfsSink agent.sources.kafkaSource.channels = kafka2HdfsConnectionMon agent.sinks.hdfsSink.channel = kafka2HdfsConnectionMon #-------- kafkaSource----------------- agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource agent.sources.kafkaSource.kafka.bootstrap.servers = 10.2.1.23:9092 # kafka topic agent.sources.kafkaSource.topic = userMon # group.id agent.sources.kafkaSource.groupId = flumeConsumer agent.sources.kafkaSource.kafka.consumer.timeout.ms = 300000 #5min #------- fileChannel------------------------- # channel agent.channels.kafka2HdfsConnectionMon.type = file agent.channels.kafka2HdfsConnectionMon.checkpointDir = /data/filechannle_data/kafka2HdfsConnectionMon/checkpoint agent.channels.kafka2HdfsConnectionMon.dataDirs = /data/filechannle_data/kafka2HdfsConnectionMon/data #---------hdfsSink ------------------ agent.sinks.hdfsSink.type = hdfs agent.sinks.hdfsSink.hdfs.path = hdfs://mycluster/connectionMon/%Y%m%d agent.sinks.hdfsSink.hdfs.writeFormat = Text agent.sinks.hdfsSink.hdfs.fileType = DataStream agent.sinks.hdfsSink.hdfs.rollSize = 134217720 agent.sinks.hdfsSink.hdfs.rollCount = 100000 agent.sinks.hdfsSink.hdfs.rollInterval = 600 agent.sinks.hdfsSink.hdfs.filePrefix=run 
agent.sinks.hdfsSink.hdfs.fileSuffix=.data agent.sinks.hdfsSink.hdfs.inUserPrefix=_ agent.sinks.hdfsSink.hdfs.inUserSuffix= flume-env.sh: JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit" After Flume has been running for a while (about a few hours), it throws this exception: ----------------------------------------------------------------------- 15 Mar 2017 03:56:45,687 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource EXCEPTION, {} org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968) at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304) at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) at java.lang.Thread.run(Thread.java:745) 15 Mar 2017 03:56:47,697 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] 
(org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource EXCEPTION, {} java.lang.IllegalStateException: Correlation id for response (1077833) does not match request (1077776) at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968) at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304) at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) at java.lang.Thread.run(Thread.java:745) 15 Mar 2017 03:56:50,880 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource EXCEPTION, {} java.lang.IllegalStateException: Correlation id for response (1077886) does not match request (1077833) at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968) at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304) at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) at java.lang.Thread.run(Thread.java:745) [...] ----------------------------------------------------------------------- Flume keeps producing this exception. Data is still written to HDFS normally, but CPU usage rises above 100%. > KafkaSource EXCEPTION java.lang.IllegalStateException: Correlation id for > response (1077833) does not match request (1077776) > ----------------------------------------------------------------------------------------------------------------------------- > > Key: FLUME-3073 > URL: https://issues.apache.org/jira/browse/FLUME-3073 > Project: Flume > Issue Type: Bug > Components: Sinks+Sources > Affects Versions: v1.7.0 > Reporter: xuwei > > my agent:kafkaSource-->file channel-->hdfsSink > my.conf > agent.sources = kafkaSource > agent.channels = kafka2HdfsConnectionMon > agent.sinks = hdfsSink > agent.sources.kafkaSource.channels = kafka2HdfsConnectionMon > agent.sinks.hdfsSink.channel = kafka2HdfsConnectionMon > #-------- kafkaSource----------------- > agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource > agent.sources.kafkaSource.kafka.bootstrap.servers = 10.2.1.23:9092 > agent.sources.kafkaSource.topic = userMon > agent.sources.kafkaSource.groupId = flumeConsumer > agent.sources.kafkaSource.kafka.consumer.timeout.ms = 300000 #5min > 
#------- fileChannel------------------------- > agent.channels.kafka2HdfsConnectionMon.type = file > agent.channels.kafka2HdfsConnectionMon.checkpointDir = > /data/filechannle_data/kafka2HdfsConnectionMon/checkpoint > agent.channels.kafka2HdfsConnectionMon.dataDirs = > /data/filechannle_data/kafka2HdfsConnectionMon/data > #---------hdfsSink ------------------ > agent.sinks.hdfsSink.type = hdfs > agent.sinks.hdfsSink.hdfs.path = hdfs://mycluster/connectionMon/%Y%m%d > agent.sinks.hdfsSink.hdfs.writeFormat = Text > agent.sinks.hdfsSink.hdfs.fileType = DataStream > agent.sinks.hdfsSink.hdfs.rollSize = 134217720 > agent.sinks.hdfsSink.hdfs.rollCount = 100000 > agent.sinks.hdfsSink.hdfs.rollInterval = 600 > agent.sinks.hdfsSink.hdfs.filePrefix=run > agent.sinks.hdfsSink.hdfs.fileSuffix=.data > agent.sinks.hdfsSink.hdfs.inUserPrefix=_ > agent.sinks.hdfsSink.hdfs.inUserSuffix= > flume-env.sh: > JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC > -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit" > After Flume has been running for a while (about a few hours), it throws > this exception: 
> ----------------------------------------------------------------------- > 15 Mar 2017 03:56:45,687 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] > (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource > EXCEPTION, {} > org.apache.kafka.common.protocol.types.SchemaException: Error reading field > 'responses': java.nio.BufferUnderflowException > at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) > at > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968) > at > org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304) > at > org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) > at > org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) > at java.lang.Thread.run(Thread.java:745) > 15 Mar 2017 03:56:47,697 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] > (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource > EXCEPTION, {} > java.lang.IllegalStateException: Correlation id for response (1077833) does > not match request (1077776) > at > org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477) > at > 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968) > at > org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304) > at > org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) > at > org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) > at java.lang.Thread.run(Thread.java:745) > 15 Mar 2017 03:56:50,880 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] > (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource > EXCEPTION, {} > java.lang.IllegalStateException: Correlation id for response (1077886) does > not match request (1077833) > at > org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477) > at > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) > at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968) > at > org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304) > at > org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) > at > org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) > at java.lang.Thread.run(Thread.java:745) > [...] > ----------------------------------------------------------------------- > Flume keeps producing this exception. Data is still written to HDFS normally, > but CPU usage rises above 100%. -- This message was sent by Atlassian JIRA (v6.3.15#6346)