[ https://issues.apache.org/jira/browse/FLUME-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16209098#comment-16209098 ]
raymond zhao commented on FLUME-3073: ------------------------------------- I have the same problem too. I just use kafka source with kafka channel and hdfs sink. one kafka source(3 partitions), three kafka channels(against with source partition), and three sink(against with three channel) ----------------- Flume config -------------------- # vcf kafka agent configuration vcf_kfk_agent.sources = vcf_kfk_source vcf_kfk_agent.sinks = vcf_kfk_sink1 vcf_kfk_sink2 vcf_kfk_sink3 vcf_kfk_agent.channels = vcf_kfk_channel1 vcf_kfk_channel2 vcf_kfk_channel3 # Describe the vcf kafka agent source vcf_kfk_agent.sources.vcf_kfk_source.type = org.apache.flume.source.kafka.KafkaSource vcf_kfk_agent.sources.vcf_kfk_source.channels = vcf_kfk_channel1 vcf_kfk_channel2 vcf_kfk_channel3 vcf_kfk_agent.sources.vcf_kfk_source.batchSize = 5000 vcf_kfk_agent.sources.vcf_kfk_source.batchDurationMillis = 2000 vcf_kfk_agent.sources.vcf_kfk_source.kafka.bootstrap.servers = 10.1.252.113:9092 vcf_kfk_agent.sources.vcf_kfk_source.kafka.topics = vcf_flume_source_topic vcf_kfk_agent.sources.vcf_kfk_source.kafka.consumer.group.id = flume.vcf.source.group # Describe the vcf kafka agent sink vcf_kfk_agent.sinks.vcf_kfk_sink1.type = hdfs vcf_kfk_agent.sinks.vcf_kfk_sink1.channel = vcf_kfk_channel1 vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.path = hdfs://hdfsCluster/project/data_analysis_platform/tanyun/flume/vcf_data/ins/%Y%m%d vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.filePrefix = vcf-stage1 vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.fileType = DataStream vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.writeFormat = Text vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.round = true vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.rollInterval = 3600 vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.rollSize = 128000000 vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.rollCount = 0 vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.batchSize = 20 vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.roundValue = 1 vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.rountUnit = minute 
vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.useLocalTimeStamp = true vcf_kfk_agent.sinks.vcf_kfk_sink2.type = hdfs vcf_kfk_agent.sinks.vcf_kfk_sink2.channel = vcf_kfk_channel2 vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.path = hdfs://hdfsCluster/project/data_analysis_platform/tanyun/flume/vcf_data/ins/%Y%m%d vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.filePrefix = vcf-stage2 vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.fileType = DataStream vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.writeFormat = Text vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.round = true vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.rollInterval = 3600 vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.rollSize = 128000000 vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.rollCount = 0 vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.batchSize = 20 vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.roundValue = 1 vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.rountUnit = minute vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.useLocalTimeStamp = true vcf_kfk_agent.sinks.vcf_kfk_sink3.type = hdfs vcf_kfk_agent.sinks.vcf_kfk_sink3.channel = vcf_kfk_channel3 vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.path = hdfs://hdfsCluster/project/data_analysis_platform/tanyun/flume/vcf_data/ins/%Y%m%d vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.filePrefix = vcf-stage3 vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.fileType = DataStream vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.writeFormat = Text vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.round = true vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.rollInterval = 3600 vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.rollSize = 128000000 vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.rollCount = 0 vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.batchSize = 20 vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.roundValue = 1 vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.rountUnit = minute vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.useLocalTimeStamp = true # Describe the vcf kafka agent channel vcf_kfk_agent.channels.vcf_kfk_channel1.type = org.apache.flume.channel.kafka.KafkaChannel vcf_kfk_agent.channels.vcf_kfk_channel1.defaultPartitionId = 
0 vcf_kfk_agent.channels.vcf_kfk_channel1.parseAsFlumeEvent = false vcf_kfk_agent.channels.vcf_kfk_channel1.kafka.bootstrap.servers = 10.1.252.113:9092 vcf_kfk_agent.channels.vcf_kfk_channel1.kafka.topic = vcf_flume_channel_topic vcf_kfk_agent.channels.vcf_kfk_channel1.kafka.consumer.group.id = flume.vcf.channel.group vcf_kfk_agent.channels.vcf_kfk_channel1.auto.offset.reset = latest vcf_kfk_agent.channels.vcf_kfk_channel2.type = org.apache.flume.channel.kafka.KafkaChannel vcf_kfk_agent.channels.vcf_kfk_channel2.defaultPartitionId = 1 vcf_kfk_agent.channels.vcf_kfk_channel2.parseAsFlumeEvent = false vcf_kfk_agent.channels.vcf_kfk_channel2.kafka.bootstrap.servers = 10.1.252.113:9092 vcf_kfk_agent.channels.vcf_kfk_channel2.kafka.topic = vcf_flume_channel_topic vcf_kfk_agent.channels.vcf_kfk_channel2.kafka.consumer.group.id = flume.vcf.channel.group vcf_kfk_agent.channels.vcf_kfk_channel2.auto.offset.reset = latest vcf_kfk_agent.channels.vcf_kfk_channel3.type = org.apache.flume.channel.kafka.KafkaChannel vcf_kfk_agent.channels.vcf_kfk_channel3.defaultPartitionId = 2 vcf_kfk_agent.channels.vcf_kfk_channel3.parseAsFlumeEvent = false vcf_kfk_agent.channels.vcf_kfk_channel3.kafka.bootstrap.servers = 10.1.252.113:9092 vcf_kfk_agent.channels.vcf_kfk_channel3.kafka.topic = vcf_flume_channel_topic vcf_kfk_agent.channels.vcf_kfk_channel3.kafka.consumer.group.id = flume.vcf.channel.group vcf_kfk_agent.channels.vcf_kfk_channel3.auto.offset.reset = latest ----------------------------------------------------------- -------------------- Exception info ----------------- 18 Oct 2017 17:26:35,727 ERROR [PollableSourceRunner-KafkaSource-vcf_kfk_source] (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource EXCEPTION, {} org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'generation_id': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:200) at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) at java.lang.Thread.run(Thread.java:745) 18 Oct 2017 17:26:40,728 ERROR [PollableSourceRunner-KafkaSource-vcf_kfk_source] (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource EXCEPTION, {} org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'generation_id': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:200) at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) at java.lang.Thread.run(Thread.java:745) ---------------------------------------------------------- Flume version: 1.7.0 Kafka version: 0.9.0 JDK version: 1.8 > KafkaSource EXCEPTION java.lang.IllegalStateException: Correlation id for > response (1077833) does not match request (1077776) > ----------------------------------------------------------------------------------------------------------------------------- > > Key: FLUME-3073 > URL: https://issues.apache.org/jira/browse/FLUME-3073 > Project: Flume > Issue Type: Bug > Components: Sinks+Sources > Affects Versions: 1.7.0 > Reporter: xuwei > > my agent:kafkaSource-->file channel-->hdfsSink > my.conf > agent.sources = kafkaSource > agent.channels = kafka2HdfsConnectionMon > agent.sinks = hdfsSink > agent.sources.kafkaSource.channels = kafka2HdfsConnectionMon > agent.sinks.hdfsSink.channel = kafka2HdfsConnectionMon > #---------kafkasource ------------------ > agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource > 
agent.sources.kafkaSource.kafka.bootstrap.servers = 10.2.1.23:9092 > agent.sources.kafkaSource.topic = userMon > agent.sources.kafkaSource.groupId = flumeConsumer > agent.sources.kafkaSource.kafka.consumer.timeout.ms = 300000 > #---------filechannel ------------------ > agent.channels.kafka2HdfsConnectionMon.type = file > agent.channels.kafka2HdfsConnectionMon.checkpointDir = > /data/filechannle_data/kafka2HdfsConnectionMon/checkpoint > agent.channels.kafka2HdfsConnectionMon.dataDirs = > /data/filechannle_data/kafka2HdfsConnectionMon/data > #---------hdfsSink ------------------ > agent.sinks.hdfsSink.type = hdfs > agent.sinks.hdfsSink.hdfs.path = hdfs://mycluster/connectionMon/%Y%m%d > agent.sinks.hdfsSink.hdfs.writeFormat = Text > agent.sinks.hdfsSink.hdfs.fileType = DataStream > agent.sinks.hdfsSink.hdfs.rollSize = 134217720 > agent.sinks.hdfsSink.hdfs.rollCount = 100000 > agent.sinks.hdfsSink.hdfs.rollInterval = 600 > agent.sinks.hdfsSink.hdfs.filePrefix=run > agent.sinks.hdfsSink.hdfs.fileSuffix=.data > agent.sinks.hdfsSink.hdfs.inUserPrefix=_ > agent.sinks.hdfsSink.hdfs.inUserSuffix= > flume-env.sh: > JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC > -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit" > After Flume has been running for a period of time (about a few hours), it will throw > this exception. 
> ----------------------------------------------------------------------- > 15 Mar 2017 03:56:45,687 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] > (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource > EXCEPTION, {} > org.apache.kafka.common.protocol.types.SchemaException: Error reading field > 'responses': java.nio.BufferUnderflowException > at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) > at > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968) > at > org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304) > at > org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) > at > org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) > at java.lang.Thread.run(Thread.java:745) > 15 Mar 2017 03:56:47,697 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] > (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource > EXCEPTION, {} > java.lang.IllegalStateException: Correlation id for response (1077833) does > not match request (1077776) > at > org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477) > at > 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968) > at > org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304) > at > org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) > at > org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) > at java.lang.Thread.run(Thread.java:745) > 15 Mar 2017 03:56:50,880 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] > (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource > EXCEPTION, {} > java.lang.IllegalStateException: Correlation id for response (1077886) does > not match request (1077833) > at > org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477) > at > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) > at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968) > at > org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304) > at > org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) > at > org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) > at java.lang.Thread.run(Thread.java:745) > (the same exception repeats continuously) > ----------------------------------------------------------------------- > Flume keeps producing this exception. Data is still written to > HDFS normally, but the data is duplicated. > I think the consumer offset is not being committed correctly. -- This message was sent by Atlassian JIRA (v6.4.14#64029)