[
https://issues.apache.org/jira/browse/FLUME-2451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gareth Davis updated FLUME-2451:
--------------------------------
Attachment: FLUME-2451.v3.patch
I have reworked the original and v2 patches so that there is a new config
parameter, hdfs.closeWritersOnException (defaults to false).
I'm reasonably certain the code does the job, but the supplied test didn't
actually fail in the first place, so I'm not really sure how much good it
really does.
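For anyone who wants to try it out, enabling the new behaviour should amount to one
extra line in the sink config alongside the existing hdfs.* properties (sink name taken
from the reporter's flume.conf below; leaving it unset keeps today's behaviour, since
the default is false):

    appserver.sinks.test_sink.hdfs.closeWritersOnException = true

Going by the parameter name, the intent is that the sink closes its open writers when
an exception comes back from HDFS, so later events open fresh connections instead of
reusing a pipeline that still points at the pre-restart NameNode. The snippet below is
only an illustrative sketch of that idea with made-up class and field names; it is not
code from the patch:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch only -- not the FLUME-2451 patch.
    public class CloseWritersOnExceptionSketch {

        // Cache of open writers, keyed by bucket path (a stand-in for the
        // sink's real writer cache).
        private final Map<String, Closeable> openWriters = new HashMap<String, Closeable>();
        private final boolean closeWritersOnException;

        public CloseWritersOnExceptionSketch(boolean closeWritersOnException) {
            this.closeWritersOnException = closeWritersOnException;
        }

        // Wraps a single write attempt; 'write' stands in for the real HDFS append.
        public void append(String bucket, Closeable writer, Runnable write) throws IOException {
            openWriters.put(bucket, writer);
            try {
                write.run();
            } catch (RuntimeException e) {
                if (closeWritersOnException) {
                    // Forget stale connections so the next event reconnects.
                    closeAllWriters();
                }
                throw new IOException("HDFS IO error", e);
            }
        }

        private void closeAllWriters() {
            for (Closeable w : openWriters.values()) {
                try {
                    w.close();
                } catch (IOException ignored) {
                    // Best effort: the underlying connection is already broken.
                }
            }
            openWriters.clear();
        }
    }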
> HDFS Sink Cannot Reconnect After NameNode Restart
> -------------------------------------------------
>
> Key: FLUME-2451
> URL: https://issues.apache.org/jira/browse/FLUME-2451
> Project: Flume
> Issue Type: Bug
> Components: File Channel, Sinks+Sources
> Affects Versions: v1.4.0
> Environment: 8 node CDH 4.2.2 (2.0.0-cdh4.2.2) cluster
> All cluster machines are running Ubuntu 12.04 x86_64
> Reporter: Andrew O'Neill
> Assignee: Roshan Naik
> Labels: HDFS, Sink
> Attachments: FLUME-2451.patch, FLUME-2451.v2.patch, FLUME-2451.v3.patch
>
>
> I am testing a simple flume setup with a Sequence Generator Source, a File
> Channel, and an HDFS Sink (see my flume.conf below). This configuration works
> as expected until I reboot the cluster’s NameNode or until I restart the HDFS
> service on the cluster. At this point, it appears that the Flume Agent cannot
> reconnect to HDFS and must be manually restarted.
> Here is our flume.conf:
> appserver.sources = rawtext
> appserver.channels = testchannel
> appserver.sinks = test_sink
> appserver.sources.rawtext.type = seq
> appserver.sources.rawtext.channels = testchannel
> appserver.channels.testchannel.type = file
> appserver.channels.testchannel.capacity = 10000000
> appserver.channels.testchannel.minimumRequiredSpace = 214748364800
> appserver.channels.testchannel.checkpointDir = /Users/aoneill/Desktop/testchannel/checkpoint
> appserver.channels.testchannel.dataDirs = /Users/aoneill/Desktop/testchannel/data
> appserver.channels.testchannel.maxFileSize = 20000000
> appserver.sinks.test_sink.type = hdfs
> appserver.sinks.test_sink.channel = testchannel
> appserver.sinks.test_sink.hdfs.path = hdfs://cluster01:8020/user/aoneill/flumetest
> appserver.sinks.test_sink.hdfs.closeTries = 3
> appserver.sinks.test_sink.hdfs.filePrefix = events-
> appserver.sinks.test_sink.hdfs.fileSuffix = .avro
> appserver.sinks.test_sink.hdfs.fileType = DataStream
> appserver.sinks.test_sink.hdfs.writeFormat = Text
> appserver.sinks.test_sink.hdfs.inUsePrefix = inuse-
> appserver.sinks.test_sink.hdfs.inUseSuffix = .avro
> appserver.sinks.test_sink.hdfs.rollCount = 100000
> appserver.sinks.test_sink.hdfs.rollInterval = 30
> appserver.sinks.test_sink.hdfs.rollSize = 10485760
> These are the two error messages that the Flume Agent outputs constantly after
> the restart:
> 2014-08-26 10:47:24,572 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.AbstractHDFSWriter.isUnderReplicated(AbstractHDFSWriter.java:96)] Unexpected error while checking replication factor
> java.lang.reflect.InvocationTargetException
>         at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.flume.sink.hdfs.AbstractHDFSWriter.getNumCurrentReplicas(AbstractHDFSWriter.java:162)
>         at org.apache.flume.sink.hdfs.AbstractHDFSWriter.isUnderReplicated(AbstractHDFSWriter.java:82)
>         at org.apache.flume.sink.hdfs.BucketWriter.shouldRotate(BucketWriter.java:452)
>         at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:387)
>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:744)
> Caused by: java.net.ConnectException: Connection refused
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
>         at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:207)
>         at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:525)
>         at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1253)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:891)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:881)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:982)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:779)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)
> and
> 2014-08-26 10:47:29,592 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:418)] HDFS IO error
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
>         at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:207)
>         at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:525)
>         at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1253)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:891)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:881)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:982)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:779)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)
> I can provide additional information if needed. Thank you very much for any
> insight you are able to provide into this problem.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)