We are using Flume to copy the same log data to S3 and HDFS. After we upgraded to 0.9.4 about a week ago, we started running into an issue where Flume occasionally stops sending data to one or the other. Any suggestions as to what might be going on?
version: flume 0.9.4

setup:
ObservationNode-1 TomcatDatalogFlow tailDir("<logdir>","<logfile_names>",true) autoBEChain
ObservationConsumer-1 TomcatDatalogFlow autoCollectorSource collectorSink("s3n://<key>:<secret>@<bucket>/logs/dataSource=%{tailSrcFile}/date=%Y%m%d/","%{tailSrcFile}","300000")
HDFSObservationNode-1 TomcatHDFSFlow tailDir("<logdir>","<logfile_names>",true) autoBEChain
HDFSObservationConsumer-1 TomcatHDFSFlow autoCollectorSource collectorSink("hdfs://<hdfs_node>:9000/<dirname>/logs/dataSource=%{tailSrcFile}/date=%Y%m%d/", "%{tailSrcFile}", "30000")

The observation nodes are running on one physical node, the consumers are running on another physical node.

thread dump from the collector:

2011-08-16 15:48:10
Full thread dump Java HotSpot(TM) Client VM (19.1-b02 mixed mode, sharing):

"Thread-2" daemon prio=10 tid=0x09240000 nid=0x23b7 runnable [0xb412d000]
   java.lang.Thread.State: RUNNABLE
    at java.io.FileInputStream.readBytes(Native Method)
    at java.io.FileInputStream.read(FileInputStream.java:177)
    at com.cloudera.util.InputStreamPipe$CopyThread.run(InputStreamPipe.java:100)

"Thread-1" daemon prio=10 tid=0x0923f000 nid=0x23b6 runnable [0xb417e000]
   java.lang.Thread.State: RUNNABLE
    at java.io.FileInputStream.readBytes(Native Method)
    at java.io.FileInputStream.read(FileInputStream.java:199)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:258)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:317)
    - locked <0x7f3821c0> (a java.io.BufferedInputStream)
    at java.io.FilterInputStream.read(FilterInputStream.java:90)
    at com.cloudera.util.InputStreamPipe$CopyThread.run(InputStreamPipe.java:100)

"process reaper" daemon prio=10 tid=0x091a9400 nid=0x23b4 runnable [0xb41cf000]
   java.lang.Thread.State: RUNNABLE
    at java.lang.UNIXProcess.waitForProcessExit(Native Method)
    at java.lang.UNIXProcess.access$900(UNIXProcess.java:20)
    at java.lang.UNIXProcess$1$1.run(UNIXProcess.java:132)

"Low Memory Detector" daemon prio=10 tid=0x08e05400 nid=0x23b2 runnable [0x00000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread0" daemon prio=10 tid=0x08e03800 nid=0x23b1 waiting on condition [0x00000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" daemon prio=10 tid=0x08e01c00 nid=0x23b0 waiting on condition [0x00000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" daemon prio=10 tid=0x08dfdc00 nid=0x23af in Object.wait() [0xb487e000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x7f383480> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
    - locked <0x7f383480> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)

"Reference Handler" daemon prio=10 tid=0x08df9400 nid=0x23ae in Object.wait() [0xb48cf000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x7f380110> (a java.lang.ref.Reference$Lock)
    at java.lang.Object.wait(Object.java:485)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
    - locked <0x7f380110> (a java.lang.ref.Reference$Lock)

"main" prio=10 tid=0x08dcb400 nid=0x23ac runnable [0xb6dae000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
    - locked <0x7f383588> (a sun.nio.ch.Util$2)
    - locked <0x7f383598> (a java.util.Collections$UnmodifiableSet)
    - locked <0x7f383548> (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
    at com.cloudera.flume.watchdog.Watchdog.launchAgent(Watchdog.java:182)
    at com.cloudera.flume.watchdog.Watchdog.run(Watchdog.java:267)
    at com.cloudera.flume.watchdog.FlumeWatchdog.main(FlumeWatchdog.java:77)

"VM Thread" prio=10 tid=0x08deec00 nid=0x23ad runnable

"VM Periodic Task Thread" prio=10 tid=0x08e11400 nid=0x23b3 waiting on condition

JNI global references: 1221

Heap
 def new generation total 4928K, used 3763K [0x7eeb0000, 0x7f400000, 0x84400000)
  eden space 4416K, 73% used [0x7eeb0000, 0x7f1dce08, 0x7f300000)
  from space 512K, 100% used [0x7f380000, 0x7f400000, 0x7f400000)
  to space 512K, 0% used [0x7f300000, 0x7f300000, 0x7f380000)
 tenured generation total 10944K, used 50K [0x84400000, 0x84eb0000, 0x8eeb0000)
  the space 10944K, 0% used [0x84400000, 0x8440ca70, 0x8440cc00, 0x84eb0000)
 compacting perm gen total 12288K, used 1657K [0x8eeb0000, 0x8fab0000, 0x92eb0000)
  the space 12288K, 13% used [0x8eeb0000, 0x8f04e7c8, 0x8f04e800, 0x8fab0000)
  ro space 10240K, 61% used [0x92eb0000, 0x934d8a38, 0x934d8c00, 0x938b0000)
  rw space 12288K, 60% used [0x938b0000, 0x93fe8ec0, 0x93fe9000, 0x944b0000)

2011-08-16 15:48:10
Full thread dump Java HotSpot(TM) Client VM (19.1-b02 mixed mode, sharing):

"MultiThreadedHttpConnectionManager cleanup" daemon prio=10 tid=0x08452800 nid=0x2900 in Object.wait() [0xb38e1000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
    - locked <0x8455a0e0> (a java.lang.ref.ReferenceQueue$Lock)
    at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ReferenceQueueThread.run(MultiThreadedHttpConnectionManager.java:1082)

"pool-3-thread-1" prio=10 tid=0x08435400 nid=0x28fd waiting on condition [0xb39d4000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x84541250> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
    at java.lang.Thread.run(Thread.java:662)

"pool-1-thread-1" prio=10 tid=0x088d0800 nid=0x28fc runnable [0xb3a76000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:129)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:258)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:317)
    - locked <0x84551078> (a java.io.BufferedInputStream)
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
    at com.cloudera.flume.handlers.thrift.TStatsTransport.read(TStatsTransport.java:59)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
    at com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer$Processor.process(ThriftFlumeEventServer.java:224)
    at org.apache.thrift.server.TSaneThreadPoolServer$WorkerProcess.run(TSaneThreadPoolServer.java:280)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

"LeaseChecker" daemon prio=10 tid=0x08431000 nid=0x2888 waiting on condition [0xb3a25000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at org.apache.hadoop.hdfs.DFSClient$LeaseChecker.run(DFSClient.java:1167)
    at java.lang.Thread.run(Thread.java:662)

"pool-4-thread-1" prio=10 tid=0x08409800 nid=0x2884 waiting on condition [0xb3b69000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x8452c830> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
    at java.lang.Thread.run(Thread.java:662)

"Roll-TriggerThread-1" prio=10 tid=0x0866c400 nid=0x2882 waiting on condition [0xb3c0b000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at com.cloudera.util.Clock$DefaultClock.doSleep(Clock.java:62)
    at com.cloudera.util.Clock.sleep(Clock.java:88)
    at com.cloudera.flume.handlers.rolling.RollSink$TriggerThread.run(RollSink.java:148)

"Roll-TriggerThread-0" prio=10 tid=0x08450000 nid=0x2880 waiting on condition [0xb3cad000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at com.cloudera.util.Clock$DefaultClock.doSleep(Clock.java:62)
    at com.cloudera.util.Clock.sleep(Clock.java:88)
    at com.cloudera.flume.handlers.rolling.RollSink$TriggerThread.run(RollSink.java:148)

"Thrift server:class org.apache.thrift.TProcessorFactory on class org.apache.thrift.transport.TSaneServerSocket" prio=10 tid=0x0844cc00 nid=0x287f runnable [0xb3cfe000]
   java.lang.Thread.State: RUNNABLE
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:408)
    - locked <0x8452ede0> (a java.net.SocksSocketImpl)
    at java.net.ServerSocket.implAccept(ServerSocket.java:462)
    at java.net.ServerSocket.accept(ServerSocket.java:430)
    at org.apache.thrift.transport.TSaneServerSocket.acceptImpl(TSaneServerSocket.java:134)
    at org.apache.thrift.transport.TServerTransport.accept(TServerTransport.java:31)
    at org.apache.thrift.server.TSaneThreadPoolServer$1.run(TSaneThreadPoolServer.java:175)

"logicalNode HDFSObservationConsumer-1-23" prio=10 tid=0x0844d800 nid=0x287e waiting on condition [0xb3eba000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at com.cloudera.flume.handlers.thrift.ThriftEventSource.close(ThriftEventSource.java:191)
    - locked <0x8452ef60> (a com.cloudera.flume.handlers.thrift.ThriftEventSource)
    at com.cloudera.flume.core.connector.DirectDriver$PumperThread.ensureClosed(DirectDriver.java:142)
    at com.cloudera.flume.core.connector.DirectDriver$PumperThread.errorCleanup(DirectDriver.java:163)
    at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:116)

"logicalNode ObservationConsumer-1-22" prio=10 tid=0x0844c000 nid=0x287d waiting on condition [0xb3e69000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x8453d490> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
    at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
    at com.cloudera.flume.handlers.thrift.ThriftEventSource.next(ThriftEventSource.java:209)
    at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:105)

"DestroyJavaVM" prio=10 tid=0x083e7000 nid=0x23b8 waiting on condition [0x00000000]
   java.lang.Thread.State: RUNNABLE

"ChokeManager" daemon prio=10 tid=0x088aa000 nid=0x23d1 waiting on condition [0xb3f0c000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at com.cloudera.flume.handlers.debug.ChokeManager.run(ChokeManager.java:143)

"Heartbeat" prio=10 tid=0x088b3400 nid=0x23d0 waiting on condition [0xb3f5c000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at com.cloudera.util.Clock$DefaultClock.doSleep(Clock.java:62)
    at com.cloudera.util.Clock.sleep(Clock.java:88)
    at com.cloudera.flume.agent.LivenessManager$HeartbeatThread.run(LivenessManager.java:234)

"Check config" prio=10 tid=0x088b1c00 nid=0x23cf waiting on condition [0xb3fad000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x84490da8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
    at com.cloudera.flume.agent.LivenessManager.dequeueCheckConfig(LivenessManager.java:70)
    at com.cloudera.flume.agent.LivenessManager$CheckConfigThread.run(LivenessManager.java:205)

"Thread-1" prio=10 tid=0x08899400 nid=0x23ce waiting for monitor entry [0xb3ffe000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at com.cloudera.flume.handlers.thrift.ThriftEventSource.getMetrics(ThriftEventSource.java:93)
    - waiting to lock <0x8452ef60> (a com.cloudera.flume.handlers.thrift.ThriftEventSource)
    at com.cloudera.flume.core.EventSource$Base.getReports(EventSource.java:186)
    at com.cloudera.flume.agent.LogicalNode.getReports(LogicalNode.java:309)
    - locked <0x8451f8d8> (a com.cloudera.flume.agent.LogicalNode)
    at com.cloudera.flume.reporter.MasterReportPusher$PusherThread.querySrcSinkReports(MasterReportPusher.java:102)
    at com.cloudera.flume.reporter.MasterReportPusher$PusherThread.sendReports(MasterReportPusher.java:110)
    at com.cloudera.flume.reporter.MasterReportPusher$PusherThread.run(MasterReportPusher.java:119)

"Timer-0" daemon prio=10 tid=0x088b0800 nid=0x23cd in Object.wait() [0xb415c000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.util.TimerThread.mainLoop(Timer.java:509)
    - locked <0x844d02e0> (a java.util.TaskQueue)
    at java.util.TimerThread.run(Timer.java:462)

"16496587@qtp-2859291-1 - Acceptor0 SelectChannelConnector@0.0.0.0:35862" prio=10 tid=0x08880800 nid=0x23cc runnable [0xb41ad000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
    - locked <0x844d0398> (a sun.nio.ch.Util$2)
    - locked <0x844d03a8> (a java.util.Collections$UnmodifiableSet)
    - locked <0x844d0358> (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
    at org.mortbay.io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:498)
    at org.mortbay.io.nio.SelectorManager.doSelect(SelectorManager.java:192)
    at org.mortbay.jetty.nio.SelectChannelConnector.accept(SelectChannelConnector.java:124)
    at org.mortbay.jetty.AbstractConnector$Acceptor.run(AbstractConnector.java:708)
    at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)

"23047631@qtp-2859291-0" prio=10 tid=0x08876000 nid=0x23cb in Object.wait() [0xb41fe000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:626)
    - locked <0x844a6f28> (a org.mortbay.thread.QueuedThreadPool$PoolThread)

"Low Memory Detector" daemon prio=10 tid=0x08421000 nid=0x23c5 runnable [0x00000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread0" daemon prio=10 tid=0x0841f400 nid=0x23c4 waiting on condition [0x00000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" daemon prio=10 tid=0x0841dc00 nid=0x23c3 waiting on condition [0x00000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" daemon prio=10 tid=0x08416800 nid=0x23bb in Object.wait() [0xb48e3000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
    - locked <0x84402268> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)

"Reference Handler" daemon prio=10 tid=0x08415000 nid=0x23ba in Object.wait() [0xb4934000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:485)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
    - locked <0x844022f8> (a java.lang.ref.Reference$Lock)

"VM Thread" prio=10 tid=0x0840a800 nid=0x23b9 runnable

"VM Periodic Task Thread" prio=10 tid=0x0842d000 nid=0x23c6 waiting on condition

JNI global references: 1619

Heap
 def new generation total 4928K, used 4244K [0x7eeb0000, 0x7f400000, 0x84400000)
  eden space 4416K, 93% used [0x7eeb0000, 0x7f2bcd28, 0x7f300000)
  from space 512K, 19% used [0x7f300000, 0x7f318650, 0x7f380000)
  to space 512K, 0% used [0x7f380000, 0x7f380000, 0x7f400000)
 tenured generation total 10944K, used 3874K [0x84400000, 0x84eb0000, 0x8eeb0000)
  the space 10944K, 35% used [0x84400000, 0x847c8878, 0x847c8a00, 0x84eb0000)
 compacting perm gen total 12288K, used 11098K [0x8eeb0000, 0x8fab0000, 0x92eb0000)
  the space 12288K, 90% used [0x8eeb0000, 0x8f986860, 0x8f986a00, 0x8fab0000)
  ro space 10240K, 61% used [0x92eb0000, 0x934d8a38, 0x934d8c00, 0x938b0000)
  rw space 12288K, 60% used [0x938b0000, 0x93fe8ec0, 0x93fe9000, 0x944b0000)
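
One thing that stands out when cross-referencing monitor addresses in the collector dump: "Thread-1" (the MasterReportPusher thread) is BLOCKED waiting to lock <0x8452ef60>, and that same address is locked by "logicalNode HDFSObservationConsumer-1-23", which is sleeping inside ThriftEventSource.close. Whether this is actually the cause of the stall is not established here. Below is a rough Python sketch of that cross-referencing, assuming the dump is saved with its original line breaks (one thread header or stack frame per line). The names DUMP and find_blocked are made up for illustration, and DUMP is only a two-thread excerpt of the dump.

```python
import re

# Abbreviated excerpt of the collector dump (two threads only), for illustration.
DUMP = '''\
"logicalNode HDFSObservationConsumer-1-23" prio=10 tid=0x0844d800 nid=0x287e waiting on condition [0xb3eba000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at com.cloudera.flume.handlers.thrift.ThriftEventSource.close(ThriftEventSource.java:191)
    - locked <0x8452ef60> (a com.cloudera.flume.handlers.thrift.ThriftEventSource)

"Thread-1" prio=10 tid=0x08899400 nid=0x23ce waiting for monitor entry [0xb3ffe000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at com.cloudera.flume.handlers.thrift.ThriftEventSource.getMetrics(ThriftEventSource.java:93)
    - waiting to lock <0x8452ef60> (a com.cloudera.flume.handlers.thrift.ThriftEventSource)
'''

THREAD_RE = re.compile(r'^"([^"]+)"')                        # thread header line
HELD_RE = re.compile(r'- locked <(0x[0-9a-f]+)>')            # monitor held
WANTED_RE = re.compile(r'- waiting to lock <(0x[0-9a-f]+)>')  # monitor wanted


def find_blocked(dump):
    """Return (waiting_thread, monitor_address, holding_thread) tuples."""
    holders = {}   # monitor address -> thread name that reports "locked"
    waiters = []   # (thread name, monitor address it is waiting to lock)
    current = None
    for line in dump.splitlines():
        header = THREAD_RE.match(line)
        if header:
            current = header.group(1)
            continue
        held = HELD_RE.search(line)
        if held:
            holders[held.group(1)] = current
        wanted = WANTED_RE.search(line)
        if wanted:
            waiters.append((current, wanted.group(1)))
    # Resolve holders only after the whole dump is read, so order does not matter.
    return [(name, addr, holders.get(addr)) for name, addr in waiters]


for waiter, addr, holder in find_blocked(DUMP):
    print(f"{waiter!r} waits for <{addr}> held by {holder!r}")
```

Note that a thread parked in Object.wait() also prints "locked" for a monitor it has temporarily released, so a holder reported this way is a lead to check against the stack, not proof.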