[ https://issues.apache.org/jira/browse/FLUME-2067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Edward Sargisson updated FLUME-2067:
------------------------------------

    Attachment: flume-embedded-web-flume-2067.tar.gz
    
> Logging from log4j2 FlumeAppender from Jetty webapp to Avro source with full queue raises ClosedByInterruptException or IllegalStateException: Channel closed
> ------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-2067
>                 URL: https://issues.apache.org/jira/browse/FLUME-2067
>             Project: Flume
>          Issue Type: Bug
>          Components: File Channel, Sinks+Sources
>    Affects Versions: v1.3.1
>            Reporter: Edward Sargisson
>         Attachments: flume-embedded-web-flume-2067.tar.gz
>
>
> Attempting to embed a Flume agent in another application does not work very well. I have found a reproduction: a very simple Jetty app using the log4j2 FlumeAppender to connect to a subsequent Flume agent whose channel is full.
> The signature of this problem also occurs when a custom build of Flume that uses log4j2 tries to ship its own logs via Flume. In that case, if two servers are configured to log to each other, neither starts correctly because of failures pushing events to the subsequent agent.
> Note that this is filed against Flume 1.3.1 because that is the version the log4j2 Flume Appender uses. Flume 1.4.0 changes the relevant classes sufficiently that attempting a repro against it is non-trivial.
> Note that I include some suppositions at the end of this work item. I haven't been able to properly determine what is causing the failure, so you may not want to read them until you've done your own research.
> Steps:
> 1. Set up an additional Flume server (the subsequent server) with an avro source and make its channel fill up.
> 2. Extract the enclosed project. Edit flume-embedded-hot-deploy/src/main/resource/log4j2.xml and configure the Agent for the FlumeAppender with the details of the subsequent server.
> 3. mvn clean install
> 4. Change to flume-embedded-hot-deploy
> 5. mvn clean package -P debug  (note that you can make it suspend until a debugger is attached with mvn clean package -P debug,suspend)
> 6. Wait for Jetty to start up, and then for a few seconds.
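> For step 1, a hypothetical configuration for the subsequent server could look like the following sketch. The agent and component names, the port numbers, and the deliberately tiny channel capacity are my own choices to make the channel fill quickly; they are not taken from the attached project.
>
> # Hypothetical subsequent-server config: an avro source feeding a
> # deliberately small file channel, with an avro sink pointed at an
> # unreachable host (TEST-NET address) so events accumulate until the
> # channel is full.
> agent1.sources = avroSrc
> agent1.channels = ch1
> agent1.sinks = stuckSink
>
> agent1.sources.avroSrc.type = avro
> agent1.sources.avroSrc.bind = 0.0.0.0
> agent1.sources.avroSrc.port = 4141
> agent1.sources.avroSrc.channels = ch1
>
> agent1.channels.ch1.type = file
> agent1.channels.ch1.capacity = 100
> agent1.channels.ch1.transactionCapacity = 10
>
> agent1.sinks.stuckSink.type = avro
> agent1.sinks.stuckSink.hostname = 192.0.2.1
> agent1.sinks.stuckSink.port = 4142
> agent1.sinks.stuckSink.channel = ch1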
> Expected results:
> Some complaints about the subsequent server being full but an otherwise happy server.
> Actual results:
> When using the log4j2 Persistent agent (which uses Berkeley DB as a store):
> 2013-06-03 14:01:14,804 INFO  [main] server.AbstractConnector (AbstractConnector.java:265) - Started ServerConnector@75a213c0{HTTP/1.1}{0.0.0.0:8080}
> 2013-06-03 14:01:22,779 DEBUG [Thread-3] ipc.NettyTransceiver (NettyTransceiver.java:314) - Disconnecting from collector1-sal-flex-van.dev-globalrelay.net/10.21.30.20:36892
> 2013-06-03 14:01:22,789 ERROR An exception occurred processing Appender FlumeAppender org.apache.logging.log4j.LoggingException: Exception occurred writing log event
>       at org.apache.logging.log4j.flume.appender.FlumePersistentManager.send(FlumePersistentManager.java:176)
>       at org.apache.logging.log4j.flume.appender.FlumeAppender.append(FlumeAppender.java:86)
>       at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:102)
>       at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:424)
>       at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:405)
>       at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:366)
>       at org.apache.logging.log4j.core.Logger.log(Logger.java:110)
>       at org.apache.logging.log4j.spi.AbstractLoggerWrapper.log(AbstractLoggerWrapper.java:55)
>       at org.slf4j.impl.SLF4JLogger.debug(SLF4JLogger.java:139)
>       at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:491)
>       at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>       at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
>       at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:348)
>       at org.jboss.netty.handler.codec.frame.FrameDecoder.channelDisconnected(FrameDecoder.java:230)
>       at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:107)
>       at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>       at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>       at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:399)
>       at org.jboss.netty.channel.Channels$4.run(Channels.java:389)
>       at org.jboss.netty.channel.socket.ChannelRunnableWrapper.run(ChannelRunnableWrapper.java:41)
>       at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processEventQueue(AbstractNioWorker.java:352)
>       at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:236)
>       at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
>       at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:722)
> Caused by: com.sleepycat.je.ThreadInterruptedException: (JE 5.0.73) Environment must be closed, caused by: com.sleepycat.je.ThreadInterruptedException: Environment invalid because of previous exception: (JE 5.0.73) /var/local/flume/castellan-reader-berkeley-db Channel closed, may be due to thread interrupt THREAD_INTERRUPTED: InterruptedException may cause incorrect internal state, unable to continue. Environment is invalid and must be closed.
>       at com.sleepycat.je.ThreadInterruptedException.wrapSelf(ThreadInterruptedException.java:99)
>       at com.sleepycat.je.dbi.EnvironmentImpl.checkIfInvalid(EnvironmentImpl.java:1512)
>       at com.sleepycat.je.Transaction.checkEnv(Transaction.java:850)
>       at com.sleepycat.je.Transaction.abort(Transaction.java:204)
>       at org.apache.logging.log4j.flume.appender.FlumePersistentManager.send(FlumePersistentManager.java:171)
>       ... 26 more
> Caused by: com.sleepycat.je.ThreadInterruptedException: Environment invalid because of previous exception: (JE 5.0.73) /var/local/flume/castellan-reader-berkeley-db Channel closed, may be due to thread interrupt THREAD_INTERRUPTED: InterruptedException may cause incorrect internal state, unable to continue. Environment is invalid and must be closed.
>       at com.sleepycat.je.log.FileManager$LogEndFileDescriptor.force(FileManager.java:3054)
>       at com.sleepycat.je.log.FileManager$LogEndFileDescriptor.access$500(FileManager.java:2710)
>       at com.sleepycat.je.log.FileManager.syncLogEnd(FileManager.java:2022)
>       at com.sleepycat.je.log.FSyncManager.executeFSync(FSyncManager.java:282)
>       at com.sleepycat.je.log.FSyncManager.fsync(FSyncManager.java:233)
>       at com.sleepycat.je.log.FileManager.groupSync(FileManager.java:2070)
>       at com.sleepycat.je.log.LogManager.multiLog(LogManager.java:403)
>       at com.sleepycat.je.log.LogManager.log(LogManager.java:335)
>       at com.sleepycat.je.txn.Txn.logCommitEntry(Txn.java:957)
>       at com.sleepycat.je.txn.Txn.commit(Txn.java:719)
>       at com.sleepycat.je.txn.Txn.commit(Txn.java:584)
>       at com.sleepycat.je.Transaction.commit(Transaction.java:317)
>       at org.apache.logging.log4j.flume.appender.FlumePersistentManager.send(FlumePersistentManager.java:167)
>       ... 26 more
> Caused by: java.nio.channels.ClosedByInterruptException
>       at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
>       at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:367)
>       at com.sleepycat.je.log.FileManager$LogEndFileDescriptor.force(FileManager.java:3043)
>       ... 38 more
> When using the log4j2 embedded agent (which is a Flume node running inside the app):
> 2013-06-03 14:14:28,207 DEBUG Calling createLoggers on class org.apache.logging.log4j.core.config.plugins.LoggersPlugin for element loggers with params(loggers={org.apache.log4j.xml, org.springframework, org.eclipse.jetty, org.elasticsearch, human.com.globalrelay, machine.com.globalrelay, root})
> 2013-06-03 14:14:28,212 DEBUG Reconfiguration completed
> 2013-06-03 14:14:28,244 ERROR An exception occurred processing Appender FlumeAppender java.lang.IllegalStateException: Channel closed [channel=primary]
>       at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:355)
>       at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
>       at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:260)
>       at org.apache.logging.log4j.flume.appender.Log4jEventSource.send(Log4jEventSource.java:59)
>       at org.apache.logging.log4j.flume.appender.FlumeEmbeddedManager.send(FlumeEmbeddedManager.java:123)
>       at org.apache.logging.log4j.flume.appender.FlumeAppender.append(FlumeAppender.java:86)
>       at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:102)
>       at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:424)
>       at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:405)
>       at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:366)
>       at org.apache.logging.log4j.core.Logger.log(Logger.java:110)
>       at org.apache.logging.log4j.spi.AbstractLoggerWrapper.log(AbstractLoggerWrapper.java:55)
>       at org.slf4j.impl.SLF4JLogger.debug(SLF4JLogger.java:139)
>       at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:195)
>       at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
>       at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:106)
>       at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:93)
>       at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:507)
>       at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
>       at org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:182)
>       at org.apache.flume.sink.AvroSink.start(AvroSink.java:242)
>       at org.apache.flume.sink.AbstractSinkProcessor.start(AbstractSinkProcessor.java:41)
>       at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
>       at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:236)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>       at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:722)
> Suppositions:
> I believe this is an issue with the handling of a failed put or take: the failure path interacts with the rollback logic or the Avro client in bad ways.
> For example, in the Berkeley DB case, the Avro client uses a SynchronousQueue to perform a rendezvous, and that queue uses thread interruption (InterruptedException) internally. However, Berkeley DB writes through the NIO FileChannel (an AbstractInterruptibleChannel), which fails and closes itself if its thread is interrupted.
> Some docs:
> http://jira.codehaus.org/browse/JETTY-80
> http://www.oracle.com/technetwork/products/berkeleydb/if-097768.html
> For the embedded Flume case I haven't been able to establish the cause. I think there's a double rollback going on, or possibly the reading and writing threads are being interfered with by other threads. (My initial theory was that Flume logging its own output got tied up in singleton-type issues, but the Jetty repro I just found challenges that.)
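> The interrupt interaction suspected above can be demonstrated in isolation. The sketch below is my own illustration, not code from Flume, log4j2, or Berkeley DB: if a thread's interrupt status is set when it enters an interruptible NIO FileChannel operation, the channel is closed and ClosedByInterruptException is raised — the same terminal cause as in the Berkeley DB trace above, where the interrupt lands in FileChannelImpl.force.
>
> import java.io.IOException;
> import java.nio.channels.ClosedByInterruptException;
> import java.nio.channels.FileChannel;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.StandardOpenOption;
>
> public class InterruptedChannelDemo {
>     /**
>      * Returns true when an interruptible FileChannel operation, attempted
>      * while the calling thread's interrupt status is set, throws
>      * ClosedByInterruptException and leaves the channel closed — the
>      * documented behaviour of AbstractInterruptibleChannel.
>      */
>     static boolean interruptClosesChannel() throws IOException {
>         Path tmp = Files.createTempFile("interrupt-demo", ".dat");
>         try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
>             Thread.currentThread().interrupt();   // simulate an interrupt arriving first
>             try {
>                 ch.force(true);                   // same kind of call as BDB's fsync path
>                 return false;                     // not expected to get here
>             } catch (ClosedByInterruptException e) {
>                 return !ch.isOpen();              // channel is now permanently unusable
>             }
>         } finally {
>             Thread.interrupted();                 // clear the flag we set
>             Files.deleteIfExists(tmp);
>         }
>     }
>
>     public static void main(String[] args) throws IOException {
>         System.out.println("interrupt closed the channel: " + interruptClosesChannel());
>     }
> }
>
> Once the channel underneath Berkeley DB's log file is closed this way, every later operation on the environment fails, which would explain why the environment reports itself invalid and must be closed.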

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira