DuLe created FLUME-3077:
---------------------------

             Summary: Unexpected exception from downstream with Avro 
sink+source compression (deflate)
                 Key: FLUME-3077
                 URL: https://issues.apache.org/jira/browse/FLUME-3077
             Project: Flume
          Issue Type: Question
          Components: Sinks+Sources
    Affects Versions: 1.7.0
            Reporter: DuLe


I'm trying to setup a basic 2-tier Flume using the Avro source/sink to 
communicate between tiers. I set the same compression-type on both side as the 
following config:

# on the sink side:
agent.sinks.k11.type = avro
agent.sinks.k11.channel = c1
agent.sinks.k11.batch-size = 100
agent.sinks.k11.connect-timeout = 60000
agent.sinks.k11.request-timeout = 60000
agent.sinks.k11.reset-connection-interval = 3
agent.sinks.k11.hostname = <ip_source_host>
agent.sinks.k11.port = 4145
agent.sinks.k11.compression-type=deflate
agent.sinks.k11.compression-level = 6
agent.sinks.k11.maxIoWorkers = 8

# And the same on the source side:
collector.sources=r1
collector.sources.r1.bind=0.0.0.0
collector.sources.r1.channels=c1
collector.sources.r1.compression-level=6
collector.sources.r1.compression-type=deflate
collector.sources.r1.port=4145
collector.sources.r1.type=avro

# But I was getting this error:
id: 0x09405401, /10.199.6.225:32780 => /10.199.6.232:4145] BOUND: 
/10.199.6.232:4145
24 Mar 2017 10:39:46,891 INFO  [New I/O worker #6] 
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171)  - 
[id: 0x09405401, /10.199.6.225:32780 => /10.199.6.232:4145] CONNECTED: 
/10.199.6.225:32780
24 Mar 2017 10:39:46,913 INFO  [New I/O worker #5] 
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171)  - 
[id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] DISCONNECTED
24 Mar 2017 10:39:46,913 INFO  [New I/O worker #5] 
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171)  - 
[id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] UNBOUND
24 Mar 2017 10:39:46,913 INFO  [New I/O worker #5] 
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171)  - 
[id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] CLOSED
24 Mar 2017 10:39:46,913 INFO  [New I/O worker #5] 
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed:209)  - 
Connection to /10.199.6.225:60953 disconnected.
24 Mar 2017 10:39:46,913 WARN  [New I/O worker #5] 
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught:201)  - 
Unexpected exception from downstream.
java.nio.channels.ClosedChannelException
        at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:433)
        at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:128)
        at 
org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:99)
        at 
org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:36)
        at 
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779)
        at org.jboss.netty.channel.Channels.write(Channels.java:725)
        at org.jboss.netty.channel.Channels.write(Channels.java:686)
        at 
org.jboss.netty.handler.codec.compression.ZlibEncoder.finishEncode(ZlibEncoder.java:380)
        at 
org.jboss.netty.handler.codec.compression.ZlibEncoder.handleDownstream(ZlibEncoder.java:316)
        at 
org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
        at 
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:784)
        at 
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
        at 
org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
        at 
org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
        at org.jboss.netty.channel.Channels.close(Channels.java:812)
        at 
org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197)
        at 
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:212)
        at 
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88)
        at 
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
        at 
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at 
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at 
org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:493)
        at 
org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:371)
        at 
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88)
        at 
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at 
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at 
org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:60)
        at 
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at 
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:468)
        at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:375)
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:93)
        at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
        at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
        at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at 
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


This warning appear after the channel closed the connection and occur 
continuously. It only appear when config compression with "deflate"
Despite there are many continuously warnings, the data can decompress ok. Does 
this warning effect to my flow ? I'm using Flume v1.7



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to