[
https://issues.apache.org/jira/browse/FLUME-2067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Edward Sargisson updated FLUME-2067:
------------------------------------
Attachment: flume-embedded-web-flume-2067.tar.gz
> Logging from log4j2 FlumeAppender from Jetty webapp to Avro source with full
> queue raises ClosedByInterruptException or IllegalStateException: Channel
> closed
> --------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLUME-2067
> URL: https://issues.apache.org/jira/browse/FLUME-2067
> Project: Flume
> Issue Type: Bug
> Components: File Channel, Sinks+Sources
> Affects Versions: v1.3.1
> Reporter: Edward Sargisson
> Attachments: flume-embedded-web-flume-2067.tar.gz
>
>
> Embedding a Flume agent in another app does not work reliably. I have a
> repro: a very simple Jetty app that uses the log4j2 FlumeAppender to connect
> to a subsequent Flume agent whose channel is full.
> The signature of this problem also occurs when I have a custom version of
> Flume with log4j2 trying to log its own logs via Flume. In that case, if I
> have two servers configured to log to each other then neither will start
> correctly because of issues pushing events to the subsequent agent.
> Note that this is documented against Flume 1.3.1 because that's what the
> log4j2 Flume Appender uses. Flume 1.4.0 changes the classes sufficiently that
> it is non-trivial to attempt a repro.
> Note that I include some suppositions at the end of this work item. I haven't
> been able to properly determine what is causing the failure so you may not
> want to read them until you've done your own research.
> Steps:
> 1. Set up an additional Flume server (the subsequent server) with an Avro
> source and make its channel fill up.
> 2. Extract the enclosed project. Edit the
> flume-embedded-hot-deploy/src/main/resource/log4j2.xml and configure the
> Agent for the FlumeAppender with the details of the subsequent server.
> 3. mvn clean install
> 4. Change to flume-embedded-hot-deploy
> 5. mvn clean package -P debug (note that you can set it to suspend until a
> debugger is attached with mvn clean package -P debug,suspend)
> 6. Wait for Jetty to start up, then wait a few more seconds.
> Expected results:
> Some complaints about the subsequent server being full but an otherwise happy
> server.
> Actual results:
> When using the log4j2 Persistent agent (which uses Berkeley DB as a store):
> 2013-06-03 14:01:14,804 INFO [main] server.AbstractConnector
> (AbstractConnector.java:265) - Started
> ServerConnector@75a213c0{HTTP/1.1}{0.0.0.0:8080}
> 2013-06-03 14:01:22,779 DEBUG [Thread-3] ipc.NettyTransceiver
> (NettyTransceiver.java:314) - Disconnecting from
> collector1-sal-flex-van.dev-globalrelay.net/10.21.30.20:36892
> 2013-06-03 14:01:22,789 ERROR An exception occurred processing Appender
> FlumeAppender org.apache.logging.log4j.LoggingException: Exception occurred
> writing log event
> at org.apache.logging.log4j.flume.appender.FlumePersistentManager.send(FlumePersistentManager.java:176)
> at org.apache.logging.log4j.flume.appender.FlumeAppender.append(FlumeAppender.java:86)
> at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:102)
> at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:424)
> at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:405)
> at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:366)
> at org.apache.logging.log4j.core.Logger.log(Logger.java:110)
> at org.apache.logging.log4j.spi.AbstractLoggerWrapper.log(AbstractLoggerWrapper.java:55)
> at org.slf4j.impl.SLF4JLogger.debug(SLF4JLogger.java:139)
> at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:491)
> at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
> at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:348)
> at org.jboss.netty.handler.codec.frame.FrameDecoder.channelDisconnected(FrameDecoder.java:230)
> at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:107)
> at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
> at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:399)
> at org.jboss.netty.channel.Channels$4.run(Channels.java:389)
> at org.jboss.netty.channel.socket.ChannelRunnableWrapper.run(ChannelRunnableWrapper.java:41)
> at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processEventQueue(AbstractNioWorker.java:352)
> at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:236)
> at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
> at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:722)
> Caused by: com.sleepycat.je.ThreadInterruptedException: (JE 5.0.73)
> Environment must be closed, caused by:
> com.sleepycat.je.ThreadInterruptedException: Environment invalid because of
> previous exception: (JE 5.0.73) /var/local/flume/castellan-reader-berkeley-db
> Channel closed, may be due to thread interrupt THREAD_INTERRUPTED:
> InterruptedException may cause incorrect internal state, unable to continue.
> Environment is invalid and must be closed.
> at com.sleepycat.je.ThreadInterruptedException.wrapSelf(ThreadInterruptedException.java:99)
> at com.sleepycat.je.dbi.EnvironmentImpl.checkIfInvalid(EnvironmentImpl.java:1512)
> at com.sleepycat.je.Transaction.checkEnv(Transaction.java:850)
> at com.sleepycat.je.Transaction.abort(Transaction.java:204)
> at org.apache.logging.log4j.flume.appender.FlumePersistentManager.send(FlumePersistentManager.java:171)
> ... 26 more
> Caused by: com.sleepycat.je.ThreadInterruptedException: Environment invalid
> because of previous exception: (JE 5.0.73)
> /var/local/flume/castellan-reader-berkeley-db Channel closed, may be due to
> thread interrupt THREAD_INTERRUPTED: InterruptedException may cause incorrect
> internal state, unable to continue. Environment is invalid and must be closed.
> at com.sleepycat.je.log.FileManager$LogEndFileDescriptor.force(FileManager.java:3054)
> at com.sleepycat.je.log.FileManager$LogEndFileDescriptor.access$500(FileManager.java:2710)
> at com.sleepycat.je.log.FileManager.syncLogEnd(FileManager.java:2022)
> at com.sleepycat.je.log.FSyncManager.executeFSync(FSyncManager.java:282)
> at com.sleepycat.je.log.FSyncManager.fsync(FSyncManager.java:233)
> at com.sleepycat.je.log.FileManager.groupSync(FileManager.java:2070)
> at com.sleepycat.je.log.LogManager.multiLog(LogManager.java:403)
> at com.sleepycat.je.log.LogManager.log(LogManager.java:335)
> at com.sleepycat.je.txn.Txn.logCommitEntry(Txn.java:957)
> at com.sleepycat.je.txn.Txn.commit(Txn.java:719)
> at com.sleepycat.je.txn.Txn.commit(Txn.java:584)
> at com.sleepycat.je.Transaction.commit(Transaction.java:317)
> at org.apache.logging.log4j.flume.appender.FlumePersistentManager.send(FlumePersistentManager.java:167)
> ... 26 more
> Caused by: java.nio.channels.ClosedByInterruptException
> at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
> at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:367)
> at com.sleepycat.je.log.FileManager$LogEndFileDescriptor.force(FileManager.java:3043)
> ... 38 more
> When using the log4j2 embedded agent (which is a Flume node running inside the
> app):
> 2013-06-03 14:14:28,207 DEBUG Calling createLoggers on class
> org.apache.logging.log4j.core.config.plugins.LoggersPlugin for element
> loggers with params(loggers={org.apache.log4j.xml, org.springframework,
> org.eclipse.jetty, org.elasticsearch, human.com.globalrelay,
> machine.com.globalrelay, root})
> 2013-06-03 14:14:28,212 DEBUG Reconfiguration completed
> 2013-06-03 14:14:28,244 ERROR An exception occurred processing Appender
> FlumeAppender java.lang.IllegalStateException: Channel closed
> [channel=primary]
> at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:355)
> at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
> at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:260)
> at org.apache.logging.log4j.flume.appender.Log4jEventSource.send(Log4jEventSource.java:59)
> at org.apache.logging.log4j.flume.appender.FlumeEmbeddedManager.send(FlumeEmbeddedManager.java:123)
> at org.apache.logging.log4j.flume.appender.FlumeAppender.append(FlumeAppender.java:86)
> at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:102)
> at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:424)
> at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:405)
> at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:366)
> at org.apache.logging.log4j.core.Logger.log(Logger.java:110)
> at org.apache.logging.log4j.spi.AbstractLoggerWrapper.log(AbstractLoggerWrapper.java:55)
> at org.slf4j.impl.SLF4JLogger.debug(SLF4JLogger.java:139)
> at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:195)
> at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
> at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:106)
> at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:93)
> at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:507)
> at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
> at org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:182)
> at org.apache.flume.sink.AvroSink.start(AvroSink.java:242)
> at org.apache.flume.sink.AbstractSinkProcessor.start(AbstractSinkProcessor.java:41)
> at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
> at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:722)
> Suppositions:
> I believe this is an issue with the handling of a failed put or take. The
> failure path interacts badly with rollback or with the Avro client.
> For example, for the Berkeley DB case, the Avro client uses a
> SynchronousQueue to do rendezvous. That queue relies on thread interruption
> (InterruptedException) internally. However, the Berkeley DB file manager
> uses the NIO FileChannel (and AbstractInterruptibleChannel), which closes the
> channel and fails if its thread gets interrupted.
> Some docs:
> http://jira.codehaus.org/browse/JETTY-80
> http://www.oracle.com/technetwork/products/berkeleydb/if-097768.html
> For the embedded Flume case I haven't been able to establish the cause. I
> think there's a double rollback going on, or possibly other threads are
> interfering with the reading thread and the writing thread. (My initial
> theory was that Flume logging itself got tied up with singleton-type issues,
> but the Jetty repro I just found challenges that.)
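
The interrupt behaviour suspected in the Berkeley DB case can be reproduced in
isolation. The following is only a minimal sketch under stated assumptions, not
code from Flume, log4j2 or Berkeley DB: the class InterruptDemo, its Result
type and the temp-file prefix are hypothetical names chosen for illustration.
It sets the current thread's interrupt flag and then calls FileChannel.force(),
the same NIO call that appears at the bottom of the persistent-agent trace.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class; not part of any library named in this issue.
public class InterruptDemo {

    /** Outcome of one attempt. Field names are illustrative only. */
    public static final class Result {
        public final boolean closedByInterrupt;
        public final boolean channelOpenAfter;

        Result(boolean closedByInterrupt, boolean channelOpenAfter) {
            this.closedByInterrupt = closedByInterrupt;
            this.channelOpenAfter = channelOpenAfter;
        }
    }

    /** Calls force() on a FileChannel while the calling thread is interrupted. */
    public static Result forceWhileInterrupted() {
        Path tmp = null;
        try {
            tmp = Files.createTempFile("interrupt-demo", ".log");
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
                // A normal write succeeds while the thread is not interrupted.
                ch.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));

                // Simulate some other component interrupting this thread.
                Thread.currentThread().interrupt();

                boolean closedByInterrupt = false;
                try {
                    ch.force(true);
                } catch (ClosedByInterruptException e) {
                    closedByInterrupt = true;
                } finally {
                    // Clear the flag so the cleanup below is not affected.
                    Thread.interrupted();
                }
                return new Result(closedByInterrupt, ch.isOpen());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (tmp != null) {
                try {
                    Files.deleteIfExists(tmp);
                } catch (IOException ignored) {
                    // Best-effort cleanup of the temp file.
                }
            }
        }
    }

    public static void main(String[] args) {
        Result r = forceWhileInterrupted();
        System.out.println("closedByInterrupt=" + r.closedByInterrupt
                + " channelOpenAfter=" + r.channelOpenAfter);
    }
}
```

On a JVM that follows the documented InterruptibleChannel contract, the channel
is closed as a side effect of the interrupt, so every later operation on it
fails as well. That would be consistent with the "Environment is invalid and
must be closed" message above, although this sketch does not show where the
interrupt originates in the real system.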
--
This message is automatically generated by JIRA.