Hi,

Yeah, I think this is:
https://issues.apache.org/jira/browse/FLUME-1779

Brock

On Mon, Jan 14, 2013 at 4:28 PM, Connor Woodson <[email protected]> wrote:
> Forwarding from the user list. The bug here is that the HDFSEventSink will
> not work in a FailoverSinkProcessor. What is going on is that when there is
> an IOException, the HDFSEventSink will return Status.BACKOFF; however, in
> the failover processor, a sink is only failed if it throws an exception.
> So maybe the next process call to the HDFSEventSink will write correctly;
> however, if it can't, the processor will never fail over to the backup
> sink. The solution I proposed (throwing an exception) isn't exactly the
> most elegant, but I can't think of a better way to go about it.
>
> - Connor
>
> ---------- Forwarded message ----------
> From: Connor Woodson <[email protected]>
> Date: Mon, Jan 14, 2013 at 4:25 PM
> Subject: Re: HDFSsink failover error
> To: "[email protected]" <[email protected]>, Rahul Ravindran <[email protected]>
>
> Oh alright, found it. What is happening is that the HDFS sink does not
> throw an exception for this write error, but instead returns
> Status.BACKOFF, and as such the failover processor doesn't think this
> sink failed.
>
> (What is strange is that the processor handles the backoff message for
> failed sinks, but not for active sinks.)
>
> So until that's fixed, there isn't a clean way around this. The best
> solution I can offer is to get the source code for Flume (either the
> latest trunk or the 1.3.1 tag) and make the following change.
>
> In the process method of the HDFSEventSink, in the catch statements, change:
>
>     LOG.warn("HDFS IO error", eIO);
>     return Status.BACKOFF;
>
> to:
>
>     LOG.warn("HDFS IO error", eIO);
>     throw eIO;
>
> (Line 457 in 1.3.1, or line 454 in trunk.)
>
> What this will end up doing is that if you ever use the sink outside of a
> failover processor, then on a write error the sink will throw an exception
> and will probably stop - so this change only makes the sink usable within
> a failover sink processor. Optionally, you could make a copy of the
> HDFSEventSink (call it FailoverHDFSEventSink if you want) and put the
> change in the copy, so that you can keep both versions of the sink.
>
> (If you want instructions on compiling Flume after this change, look for
> the thread 'custome serializer'.)
>
> Unfortunate issue, but it will be fixed in 1.4 I'm sure.
>
> - Connor
>
> On Mon, Jan 14, 2013 at 4:00 PM, Rahul Ravindran <[email protected]> wrote:
>> Here is the entire log file after I restart flume
>>
>> ------------------------------
>> From: Connor Woodson <[email protected]>
>> To: "[email protected]" <[email protected]>; Rahul Ravindran <[email protected]>
>> Sent: Monday, January 14, 2013 3:51 PM
>> Subject: Re: HDFSsink failover error
>>
>> Can you look at the full log file and post the above section as well as
>> 5-10 lines above/below it (you don't have to post that stack trace if you
>> don't want)? That error, while it should definitely be logged, should be
>> followed by some error lines giving context as to what is going on. And
>> if that is the end of the log file then... well, that just shouldn't
>> happen, as there are several different places that would have produced
>> log messages as that exception propagates.
>>
>> - Connor
>>
>> On Mon, Jan 14, 2013 at 3:13 PM, Rahul Ravindran <[email protected]> wrote:
>>
>> The writes to the backup were successful when I attempted to write to it
>> directly, but not via the failover sink processor.
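
(A note on the patch Connor describes above: in 1.3.1, HDFSEventSink.process()
is declared as "throws EventDeliveryException", so the checked IOException
cannot be rethrown as a bare "throw eIO" and has to be wrapped. A minimal
sketch of the patched catch block under that assumption - an illustration of
the local change, not necessarily the fix that was later committed for
FLUME-1779:

    } catch (IOException eIO) {
        LOG.warn("HDFS IO error", eIO);
        // was: return Status.BACKOFF;
        // Wrap the checked IOException so process() can throw it and the
        // FailoverSinkProcessor sees this sink as failed.
        throw new EventDeliveryException("HDFS IO error", eIO);
    }

The surrounding transaction handling is elided.)
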
>> I did not see the warning you mentioned about "Sink hdfs-sink1 failed".
>>
>> The full log trace is below:
>>
>> 14 Jan 2013 22:48:24,727 INFO [hdfs-hdfs-sink2-call-runner-1] (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208) - Creating hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102//event.1358203448551.tmp
>> 14 Jan 2013 22:48:24,739 WARN [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:456) - HDFS IO error
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby
>>     at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>>     at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at javax.security.auth.Subject.doAs(Subject.java:396)
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>>
>>     at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>>     at $Proxy11.create(Unknown Source)
>>     at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:616)
>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>>     at $Proxy11.create(Unknown Source)
>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
>>     at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
>>     at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
>>     at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
>>     at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
>>     at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
>>     at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
>>     at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
>>     at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
>>     at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:209)
>>     at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
>>     at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
>>     at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
>>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
>>     at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
>>     at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:364)
>>     at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
>>     at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>     at java.lang.Thread.run(Thread.java:679)
>>
>> ------------------------------
>> From: Connor Woodson <[email protected]>
>> To: Rahul Ravindran <[email protected]>
>> Cc: "[email protected]" <[email protected]>
>> Sent: Monday, January 14, 2013 3:05 PM
>> Subject: Re: HDFSsink failover error
>>
>> So you are able to write normally to the back-up HDFS servers? And that
>> error you got was when you were trying to write to the normal server? Was
>> it supposed to be an error (it looks like it's due to how your Hadoop is
>> set up)?
>>
>> The log lines you pasted make it look like there was a problem with your
>> hdfs-sink1 (like I said above, maybe your Hadoop cluster is set up
>> wrong); what should have happened is that the event was then written to
>> the backup server. Below the stack trace there should probably have been
>> another WARN statement saying "Sink hdfs-sink1 failed and has been sent
>> to the failover list". And if hdfs-sink1-back was then unable to write,
>> you would see a thrown EventDeliveryException in your log.
>>
>> If there isn't anything else in the log, and the event wasn't written to
>> the backup server, then that would be a bug.
>>
>> - Connor
>>
>> On Mon, Jan 14, 2013 at 2:46 PM, Rahul Ravindran <[email protected]> wrote:
>>
>> Here is the full config. I swapped the priorities on the sink processor
>> after performing the namenode failover, and the writes were successful
>> to the newly active namenode.
>>
>> agent1.channels.ch1.type = FILE
>> agent1.channels.ch1.checkpointDir = /flume_runtime/checkpoint
>> agent1.channels.ch1.dataDirs = /flume_runtime/data
>>
>> agent1.channels.ch2.type = FILE
>> agent1.channels.ch2.checkpointDir = /flume_runtime/checkpoint2
>> agent1.channels.ch2.dataDirs = /flume_runtime/data2
>>
>> # Define an Avro source called avro-source1 on agent1 and tell it
>> # to bind to 0.0.0.0:41414. Connect it to channel ch1.
>> agent1.sources.avro-source1.channels = ch1
>> agent1.sources.avro-source1.type = avro
>> agent1.sources.avro-source1.bind = 0.0.0.0
>> agent1.sources.avro-source1.port = 4545
>>
>> agent1.sources.avro-source2.channels = ch2
>> agent1.sources.avro-source2.type = avro
>> agent1.sources.avro-source2.bind = 0.0.0.0
>> agent1.sources.avro-source2.port = 4546
>>
>> agent1.sinks.hdfs-sink1.channel = ch1
>> agent1.sinks.hdfs-sink1.type = hdfs
>> agent1.sinks.hdfs-sink1.hdfs.path = hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>>
>> agent1.sinks.hdfs-sink2.channel = ch2
>> agent1.sinks.hdfs-sink2.type = hdfs
>> agent1.sinks.hdfs-sink2.hdfs.path = hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102/
>> agent1.sinks.hdfs-sink2.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink2.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink2.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink2.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink2.hdfs.txnEventSize = 1000
>>
>> agent1.sinks.hdfs-sink1-back.channel = ch1
>> agent1.sinks.hdfs-sink1-back.type = hdfs
>> agent1.sinks.hdfs-sink1-back.hdfs.path = hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>>
>> agent1.sinks.hdfs-sink2-back.channel = ch2
>> agent1.sinks.hdfs-sink2-back.type = hdfs
>> agent1.sinks.hdfs-sink2-back.hdfs.path = hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host102/
>> agent1.sinks.hdfs-sink2-back.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink2-back.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink2-back.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink2-back.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink2-back.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink2-back.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink2-back.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink2-back.hdfs.txnEventSize = 1000
>>
>> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
>> agent1.sinkgroups.failoverGroup1.processor.type = failover
>> #higher number in priority is higher priority
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
>> #failover if failure detected for 10 seconds
>> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>>
>> agent1.sinkgroups.failoverGroup2.sinks = hdfs-sink2 hdfs-sink2-back
>> agent1.sinkgroups.failoverGroup2.processor.type = failover
>> #higher number in priority is higher priority
>> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2 = 10
>> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2-back = 5
>> #failover if failure detected for 10 seconds
>> agent1.sinkgroups.failoverGroup2.processor.maxpenalty = 10000
>>
>> # Finally, now that we've defined all of our components, tell
>> # agent1 which ones we want to activate.
>> agent1.sinkgroups = failoverGroup1 failoverGroup2
>> agent1.channels = ch1 ch2
>> agent1.sources = avro-source1 avro-source2
>> agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink1-back hdfs-sink2-back
>>
>> ------------------------------
>> From: Connor Woodson <[email protected]>
>> To: [email protected]; Rahul Ravindran <[email protected]>
>> Sent: Monday, January 14, 2013 2:28 PM
>> Subject: Re: HDFSsink failover error
>>
>> I assume that's only part of your config, as it's missing a source; if
>> you get rid of the sink processor, can you write to each hdfs sink
>> individually? (Comment one out at a time.)
>>
>> - Connor
>>
>> On Mon, Jan 14, 2013 at 1:42 PM, Rahul Ravindran <[email protected]> wrote:
>>
>> Hi,
>> I am attempting to set up an HDFS sink such that when a namenode failover
>> occurs (the active namenode is brought down and the standby namenode
>> switches to active), the failover sink would send events to the new
>> active namenode. I see an error about WRITE not being supported in
>> standby state. Does this not count as a failure for the failover sink?
>> Thanks,
>> ~Rahul.
>>
>> My config is as follows:
>>
>> agent1.sinks.hdfs-sink1.channel = ch1
>> agent1.sinks.hdfs-sink1.type = hdfs
>> agent1.sinks.hdfs-sink1.hdfs.path = hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>>
>> agent1.sinks.hdfs-sink1-back.channel = ch1
>> agent1.sinks.hdfs-sink1-back.type = hdfs
>> agent1.sinks.hdfs-sink1-back.hdfs.path = hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>>
>> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
>> agent1.sinkgroups.failoverGroup1.processor.type = failover
>> #higher number in priority is higher priority
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
>> #failover if failure detected for 10 seconds
>> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>>
>> agent1.sinkgroups = failoverGroup1
>>
>> 14 Jan 2013 21:37:28,819 INFO [hdfs-hdfs-sink2-call-runner-6] (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208) - Creating hdfs://ip-10-4-71-187.ec2.internal/....
>> 14 Jan 2013 21:37:28,834 WARN [SinkRunner-PollingRunner-FailoverSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:456) - HDFS IO error
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby
>>     at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>>     at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at javax.security.auth.Subject.doAs(Subject.java:396)
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>>
>>     at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>>     at $Proxy11.create(Unknown Source)
>>     at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:616)
>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMetho

--
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/
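
The mechanism the thread keeps circling back to is easy to state in code.
Below is a condensed, illustrative sketch - not the actual
FailoverSinkProcessor source; the Processor and Sink types and the demo sink
names are hypothetical - of the behavior described above: a sink is demoted
to the failover list only when process() throws, so a sink that reports
errors by returning Status.BACKOFF is never failed over.

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Illustrative sketch only -- NOT the actual FailoverSinkProcessor source.
    public class FailoverSketch {

        enum Status { READY, BACKOFF }

        interface Sink {
            Status process() throws Exception;
            String name();
        }

        static class Processor {
            private final Deque<Sink> live = new ArrayDeque<>();
            private final Deque<Sink> failed = new ArrayDeque<>();

            Processor(Sink primary, Sink backup) {
                live.addLast(primary); // highest-priority sink first
                live.addLast(backup);
            }

            Status process() {
                Sink active = live.peekFirst();
                try {
                    // A BACKOFF returned here is passed straight through;
                    // the processor does not treat it as a sink failure.
                    return active.process();
                } catch (Exception e) {
                    // Only an exception demotes the active sink; the backup
                    // becomes active on the next call. This is the crux of
                    // FLUME-1779.
                    System.out.println(active.name() + " failed, failing over: "
                            + e.getMessage());
                    failed.addLast(live.pollFirst());
                    return Status.READY;
                }
            }
        }

        public static void main(String[] args) {
            Sink brokenViaBackoff = new Sink() {
                public Status process() { return Status.BACKOFF; } // swallows the IO error
                public String name() { return "hdfs-sink1 (returns BACKOFF)"; }
            };
            Sink brokenViaThrow = new Sink() {
                public Status process() throws Exception { throw new Exception("HDFS IO error"); }
                public String name() { return "hdfs-sink1 (throws)"; }
            };
            Sink backup = new Sink() {
                public Status process() { return Status.READY; }
                public String name() { return "hdfs-sink1-back"; }
            };

            // With BACKOFF, the backup never takes over:
            Processor p1 = new Processor(brokenViaBackoff, backup);
            System.out.println("BACKOFF run: " + p1.process() + ", " + p1.process());

            // With a thrown exception, the processor fails over immediately:
            Processor p2 = new Processor(brokenViaThrow, backup);
            System.out.println("Throwing run: " + p2.process() + ", " + p2.process());
        }
    }

Running main shows the BACKOFF-returning sink is never replaced by
hdfs-sink1-back, while the throwing sink fails over on the first call -
which is why Connor's one-line change (and FLUME-1779) is what makes the
HDFS sink usable behind a failover processor.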
