[ https://issues.apache.org/jira/browse/FLUME-2067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Edward Sargisson updated FLUME-2067:
------------------------------------

    Description: 
Attempting to embed a Flume agent in another app does not work very well. I 
have found a repro: a very simple Jetty app that uses the log4j2 FlumeAppender 
to connect to a subsequent Flume agent whose channel is full.

The impact is that I don't believe the BerkeleyDB agent can be used safely.

Steps:
1. Set up an additional Flume server (the subsequent server) with an Avro 
source and make its channel fill up (in my environment the subsequent server 
gets an OutOfMemoryError and then starts queueing events).
2. Extract the enclosed project. Edit 
flume-embedded-hot-deploy/src/main/resource/log4j2.xml and configure the Agent 
for the FlumeAppender with the details of the subsequent server.
3. mvn clean install
4. Change to flume-embedded-hot-deploy
5. mvn clean package -P debug (note that you can make it suspend until a 
debugger is attached with mvn clean package -P debug,suspend)
6. Wait for Jetty to start up, and then wait a few more seconds.

Expected results:
Some complaints about the subsequent server being full, but an otherwise happy 
server.

Actual results:
When using the log4j2 Persistent agent (which uses Berkeley DB as a store):
2013-06-03 14:01:14,804 INFO  [main] server.AbstractConnector 
(AbstractConnector.java:265) - Started 
ServerConnector@75a213c0{HTTP/1.1}{0.0.0.0:8080}
2013-06-03 14:01:22,779 DEBUG [Thread-3] ipc.NettyTransceiver 
(NettyTransceiver.java:314) - Disconnecting from 
collector1-sal-flex-van.dev-globalrelay.net/10.21.30.20:36892
2013-06-03 14:01:22,789 ERROR An exception occurred processing Appender 
FlumeAppender org.apache.logging.log4j.LoggingException: Exception occurred 
writing log event
        at 
org.apache.logging.log4j.flume.appender.FlumePersistentManager.send(FlumePersistentManager.java:176)
        at 
org.apache.logging.log4j.flume.appender.FlumeAppender.append(FlumeAppender.java:86)
        at 
org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:102)
        at 
org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:424)
        at 
org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:405)
        at 
org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:366)
        at org.apache.logging.log4j.core.Logger.log(Logger.java:110)
        at 
org.apache.logging.log4j.spi.AbstractLoggerWrapper.log(AbstractLoggerWrapper.java:55)
        at org.slf4j.impl.SLF4JLogger.debug(SLF4JLogger.java:139)
        at 
org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:491)
        at 
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at 
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
        at 
org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:348)
        at 
org.jboss.netty.handler.codec.frame.FrameDecoder.channelDisconnected(FrameDecoder.java:230)
        at 
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:107)
        at 
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at 
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at 
org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:399)
        at org.jboss.netty.channel.Channels$4.run(Channels.java:389)
        at 
org.jboss.netty.channel.socket.ChannelRunnableWrapper.run(ChannelRunnableWrapper.java:41)
        at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.processEventQueue(AbstractNioWorker.java:352)
        at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:236)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
        at 
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)
Caused by: com.sleepycat.je.ThreadInterruptedException: (JE 5.0.73) Environment 
must be closed, caused by: com.sleepycat.je.ThreadInterruptedException: 
Environment invalid because of previous exception: (JE 5.0.73) 
/var/local/flume/castellan-reader-berkeley-db Channel closed, may be due to 
thread interrupt THREAD_INTERRUPTED: InterruptedException may cause incorrect 
internal state, unable to continue. Environment is invalid and must be closed.
        at 
com.sleepycat.je.ThreadInterruptedException.wrapSelf(ThreadInterruptedException.java:99)
        at 
com.sleepycat.je.dbi.EnvironmentImpl.checkIfInvalid(EnvironmentImpl.java:1512)
        at com.sleepycat.je.Transaction.checkEnv(Transaction.java:850)
        at com.sleepycat.je.Transaction.abort(Transaction.java:204)
        at 
org.apache.logging.log4j.flume.appender.FlumePersistentManager.send(FlumePersistentManager.java:171)
        ... 26 more
Caused by: com.sleepycat.je.ThreadInterruptedException: Environment invalid 
because of previous exception: (JE 5.0.73) 
/var/local/flume/castellan-reader-berkeley-db Channel closed, may be due to 
thread interrupt THREAD_INTERRUPTED: InterruptedException may cause incorrect 
internal state, unable to continue. Environment is invalid and must be closed.
        at 
com.sleepycat.je.log.FileManager$LogEndFileDescriptor.force(FileManager.java:3054)
        at 
com.sleepycat.je.log.FileManager$LogEndFileDescriptor.access$500(FileManager.java:2710)
        at com.sleepycat.je.log.FileManager.syncLogEnd(FileManager.java:2022)
        at com.sleepycat.je.log.FSyncManager.executeFSync(FSyncManager.java:282)
        at com.sleepycat.je.log.FSyncManager.fsync(FSyncManager.java:233)
        at com.sleepycat.je.log.FileManager.groupSync(FileManager.java:2070)
        at com.sleepycat.je.log.LogManager.multiLog(LogManager.java:403)
        at com.sleepycat.je.log.LogManager.log(LogManager.java:335)
        at com.sleepycat.je.txn.Txn.logCommitEntry(Txn.java:957)
        at com.sleepycat.je.txn.Txn.commit(Txn.java:719)
        at com.sleepycat.je.txn.Txn.commit(Txn.java:584)
        at com.sleepycat.je.Transaction.commit(Transaction.java:317)
        at 
org.apache.logging.log4j.flume.appender.FlumePersistentManager.send(FlumePersistentManager.java:167)
        ... 26 more
Caused by: java.nio.channels.ClosedByInterruptException
        at 
java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:367)
        at 
com.sleepycat.je.log.FileManager$LogEndFileDescriptor.force(FileManager.java:3043)
        ... 38 more

Suppositions:
I believe this is an issue with the handling of a failed put or take: the 
failure path interacts badly with rollback or with the Avro client.
For example, the Avro client uses a SynchronousQueue for its rendezvous, and 
that queue uses InterruptedException internally. However, the file store writes 
through the NIO FileChannel (an AbstractInterruptibleChannel), which fails if 
its thread gets interrupted. My debugging sessions show that 
java.util.concurrent.CountDownLatch is a common source of 
InterruptedExceptions, and CountDownLatch is also used by the Avro client.
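
The interaction claimed above can be demonstrated outside Flume. The following 
is a minimal sketch using only the JDK (no Flume or JE classes; InterruptDemo 
and interruptDuringWrite are made-up names for illustration): a thread looping 
write() and force() on an NIO FileChannel is interrupted, the channel is 
closed, and the thread receives ClosedByInterruptException, which is the root 
cause at the bottom of the trace above.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class InterruptDemo {

    // Interrupts a thread that is looping write() + force() on an NIO
    // FileChannel and reports which exception the writer saw. NIO channels
    // extend AbstractInterruptibleChannel: interrupting a thread blocked in
    // (or entering) channel I/O closes the channel, and the call fails with
    // ClosedByInterruptException.
    static String interruptDuringWrite() throws Exception {
        Path tmp = Files.createTempFile("interrupt-demo", ".log");
        final String[] result = new String[1];
        Thread writer = new Thread(() -> {
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
                ByteBuffer buf = ByteBuffer.wrap(new byte[8192]);
                while (true) {
                    buf.rewind();
                    ch.write(buf);
                    ch.force(true); // analogous to the fsync in JE's FileManager
                }
            } catch (ClosedByInterruptException e) {
                result[0] = "ClosedByInterruptException";
            } catch (IOException e) {
                result[0] = e.getClass().getSimpleName();
            }
        });
        writer.start();
        Thread.sleep(100);  // let some writes and fsyncs happen
        writer.interrupt(); // stands in for the interrupt from the Avro client side
        writer.join();
        Files.deleteIfExists(tmp);
        return result[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println(interruptDuringWrite()); // prints ClosedByInterruptException
    }
}
```

The point of the sketch is that the interrupting thread does not have to know 
anything about the file: any interrupt delivered while the writer is inside 
channel I/O invalidates the channel, which is exactly what leaves the JE 
environment unusable.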

Some docs:
http://jira.codehaus.org/browse/JETTY-80
http://www.oracle.com/technetwork/products/berkeleydb/if-097768.html

  was:
Attempting to embed a Flume agent in another app does not work very well. I 
have found a repro of a very simple Jetty app using the log4j2 FlumeAppender to 
connect to a subsequent Flume agent with a full channel.

The signature of this problem also occurs when I have a custom version of Flume 
with log4j2 trying to log its own logs via Flume. In that case, if I have two 
servers configured to log to each other then neither will start correctly 
because of issues pushing events to the subsequent agent.

Note that this is documented against Flume 1.3.1 because that's what the log4j2 
Flume Appender uses. Flume 1.4.0 changes the classes sufficiently that it is 
non-trivial to attempt a repro.

Note that I include some suppositions at the end of this work item. I haven't 
been able to properly determine what is causing the failure so you may not want 
to read them until you've done your own research.

[Steps and expected results as in the current description above.]

Actual results:
When using the log4j2 Persistent agent (which uses Berkeley DB as a store):
[BerkeleyDB stack trace identical to the one in the current description above.]

When using log4j2 embedded agent (which is a Flume node running inside the 
app): 
2013-06-03 14:14:28,207 DEBUG Calling createLoggers on class 
org.apache.logging.log4j.core.config.plugins.LoggersPlugin for element loggers 
with params(loggers={org.apache.log4j.xml, org.springframework, 
org.eclipse.jetty, org.elasticsearch, human.com.globalrelay, 
machine.com.globalrelay, root})
2013-06-03 14:14:28,212 DEBUG Reconfiguration completed
2013-06-03 14:14:28,244 ERROR An exception occurred processing Appender 
FlumeAppender java.lang.IllegalStateException: Channel closed [channel=primary]
        at 
org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:355)
        at 
org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
        at 
org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:260)
        at 
org.apache.logging.log4j.flume.appender.Log4jEventSource.send(Log4jEventSource.java:59)
        at 
org.apache.logging.log4j.flume.appender.FlumeEmbeddedManager.send(FlumeEmbeddedManager.java:123)
        at 
org.apache.logging.log4j.flume.appender.FlumeAppender.append(FlumeAppender.java:86)
        at 
org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:102)
        at 
org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:424)
        at 
org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:405)
        at 
org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:366)
        at org.apache.logging.log4j.core.Logger.log(Logger.java:110)
        at 
org.apache.logging.log4j.spi.AbstractLoggerWrapper.log(AbstractLoggerWrapper.java:55)
        at org.slf4j.impl.SLF4JLogger.debug(SLF4JLogger.java:139)
        at 
org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:195)
        at 
org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
        at 
org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:106)
        at 
org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:93)
        at 
org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:507)
        at 
org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
        at org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:182)
        at org.apache.flume.sink.AvroSink.start(AvroSink.java:242)
        at 
org.apache.flume.sink.AbstractSinkProcessor.start(AbstractSinkProcessor.java:41)
        at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
        at 
org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:236)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at 
java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)

Suppositions:
I believe this is an issue with the handling of a failed put or take. The 
failure path interacts with rollback or the Avro client in bad ways.
For example, for the Berkeley DB case, the Avro client uses a SynchronousQueue 
to do rendezvous. That queue uses InterruptedException internally. However, the 
FileChannel uses the NIO FileChannel (and AbstractInterruptibleChannel) which 
fails if its thread gets interrupted. 

Some docs:
http://jira.codehaus.org/browse/JETTY-80
http://www.oracle.com/technetwork/products/berkeleydb/if-097768.html

For the embedded Flume case I haven't been able to establish the cause. I think 
there's a double rollback going on, or possibly the reading thread and the 
writing thread are being interfered with by other threads. (My initial theory 
was that Flume logging itself got tied up with singleton-type issues, but the 
Jetty repro I just found challenges that.)

    
> Logging from log4j2 FlumeAppender with BerkeleyDB agent from Jetty webapp to 
> Avro source with full queue raises ClosedByInterruptException
> ------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-2067
>                 URL: https://issues.apache.org/jira/browse/FLUME-2067
>             Project: Flume
>          Issue Type: Bug
>          Components: File Channel, Sinks+Sources
>    Affects Versions: v1.3.1
>            Reporter: Edward Sargisson
>         Attachments: flume-embedded-web-flume-2067.tar.gz
>
>
> [Description identical to the current description above.]

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
