[ https://issues.apache.org/jira/browse/FLUME-3077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kai updated FLUME-3077: ----------------------- Description: I'm trying to setup a basic 2-tier Flume using the Avro source/sink to communicate between tiers. I set the same compression-type on both side as the following config: # on the sink side: agent.sinks.k1.type = avro agent.sinks.k1.channel = c1 agent.sinks.k1.batch-size = 100 agent.sinks.k1.connect-timeout = 60000 agent.sinks.k1.request-timeout = 60000 agent.sinks.k1.reset-connection-interval = 3 agent.sinks.k1.hostname = <ip_source_host> agent.sinks.k1.port = 4145 agent.sinks.k1.compression-type=deflate agent.sinks.k1.compression-level = 6 agent.sinks.k1.maxIoWorkers = 8 # And the same on the source side: collector.sources=r1 collector.sources.r1.bind=0.0.0.0 collector.sources.r1.channels=c1 collector.sources.r1.compression-level=6 collector.sources.r1.compression-type=deflate collector.sources.r1.port=4145 collector.sources.r1.type=avro # But I was getting this error: id: 0x09405401, /10.199.6.225:32780 => /10.199.6.232:4145] BOUND: /10.199.6.232:4145 24 Mar 2017 10:39:46,891 INFO [New I/O worker #6] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0x09405401, /10.199.6.225:32780 => /10.199.6.232:4145] CONNECTED: /10.199.6.225:32780 24 Mar 2017 10:39:46,913 INFO [New I/O worker #5] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] DISCONNECTED 24 Mar 2017 10:39:46,913 INFO [New I/O worker #5] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] UNBOUND 24 Mar 2017 10:39:46,913 INFO [New I/O worker #5] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] CLOSED 24 Mar 2017 10:39:46,913 INFO [New I/O worker #5] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed:209) - Connection to /10.199.6.225:60953 disconnected. 24 Mar 2017 10:39:46,913 WARN [New I/O worker #5] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught:201) - Unexpected exception from downstream. java.nio.channels.ClosedChannelException at org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:433) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:128) at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:99) at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:36) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779) at org.jboss.netty.channel.Channels.write(Channels.java:725) at org.jboss.netty.channel.Channels.write(Channels.java:686) at org.jboss.netty.handler.codec.compression.ZlibEncoder.finishEncode(ZlibEncoder.java:380) at org.jboss.netty.handler.codec.compression.ZlibEncoder.handleDownstream(ZlibEncoder.java:316) at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:784) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54) at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582) at org.jboss.netty.channel.Channels.close(Channels.java:812) at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197) at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:212) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88) at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:493) at org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:371) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:60) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:468) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:375) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:93) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) This warning appear after the channel closed the connection and occur continuously. It only appear when config compression with "deflate" Despite there are many continuously warnings, the data can decompress ok. Does this warning effect to my flow ? I'm using Flume v1.7 was: I'm trying to setup a basic 2-tier Flume using the Avro source/sink to communicate between tiers. I set the same compression-type on both side as the following config: # on the sink side: agent.sinks.k11.type = avro agent.sinks.k11.channel = c1 agent.sinks.k11.batch-size = 100 agent.sinks.k11.connect-timeout = 60000 agent.sinks.k11.request-timeout = 60000 agent.sinks.k11.reset-connection-interval = 3 agent.sinks.k11.hostname = <ip_source_host> agent.sinks.k11.port = 4145 agent.sinks.k11.compression-type=deflate agent.sinks.k11.compression-level = 6 agent.sinks.k11.maxIoWorkers = 8 # And the same on the source side: collector.sources=r1 collector.sources.r1.bind=0.0.0.0 collector.sources.r1.channels=c1 collector.sources.r1.compression-level=6 collector.sources.r1.compression-type=deflate collector.sources.r1.port=4145 collector.sources.r1.type=avro # But I was getting this error: id: 0x09405401, /10.199.6.225:32780 => /10.199.6.232:4145] BOUND: /10.199.6.232:4145 24 Mar 2017 10:39:46,891 INFO [New I/O worker #6] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0x09405401, /10.199.6.225:32780 => /10.199.6.232:4145] CONNECTED: /10.199.6.225:32780 24 Mar 2017 10:39:46,913 INFO [New I/O worker #5] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] DISCONNECTED 24 Mar 2017 10:39:46,913 INFO [New I/O worker #5] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] UNBOUND 24 Mar 2017 10:39:46,913 INFO [New I/O worker #5] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] CLOSED 24 Mar 2017 10:39:46,913 INFO [New I/O worker #5] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed:209) - Connection to /10.199.6.225:60953 disconnected. 24 Mar 2017 10:39:46,913 WARN [New I/O worker #5] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught:201) - Unexpected exception from downstream. java.nio.channels.ClosedChannelException at org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:433) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:128) at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:99) at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:36) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779) at org.jboss.netty.channel.Channels.write(Channels.java:725) at org.jboss.netty.channel.Channels.write(Channels.java:686) at org.jboss.netty.handler.codec.compression.ZlibEncoder.finishEncode(ZlibEncoder.java:380) at org.jboss.netty.handler.codec.compression.ZlibEncoder.handleDownstream(ZlibEncoder.java:316) at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:784) at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54) at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582) at org.jboss.netty.channel.Channels.close(Channels.java:812) at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197) at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:212) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88) at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:493) at org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:371) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:60) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:468) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:375) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:93) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) This warning appear after the channel closed the connection and occur continuously. It only appear when config compression with "deflate" Despite there are many continuously warnings, the data can decompress ok. Does this warning effect to my flow ? I'm using Flume v1.7 > Unexpected exception from downstream with Avro sink+source compression > (deflate) > -------------------------------------------------------------------------------- > > Key: FLUME-3077 > URL: https://issues.apache.org/jira/browse/FLUME-3077 > Project: Flume > Issue Type: Question > Components: Sinks+Sources > Affects Versions: 1.7.0 > Reporter: Kai > > I'm trying to setup a basic 2-tier Flume using the Avro source/sink to > communicate between tiers. I set the same compression-type on both side as > the following config: > # on the sink side: > agent.sinks.k1.type = avro > agent.sinks.k1.channel = c1 > agent.sinks.k1.batch-size = 100 > agent.sinks.k1.connect-timeout = 60000 > agent.sinks.k1.request-timeout = 60000 > agent.sinks.k1.reset-connection-interval = 3 > agent.sinks.k1.hostname = <ip_source_host> > agent.sinks.k1.port = 4145 > agent.sinks.k1.compression-type=deflate > agent.sinks.k1.compression-level = 6 > agent.sinks.k1.maxIoWorkers = 8 > # And the same on the source side: > collector.sources=r1 > collector.sources.r1.bind=0.0.0.0 > collector.sources.r1.channels=c1 > collector.sources.r1.compression-level=6 > collector.sources.r1.compression-type=deflate > collector.sources.r1.port=4145 > collector.sources.r1.type=avro > # But I was getting this error: > id: 0x09405401, /10.199.6.225:32780 => /10.199.6.232:4145] BOUND: > /10.199.6.232:4145 > 24 Mar 2017 10:39:46,891 INFO [New I/O worker #6] > (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) > - [id: 0x09405401, /10.199.6.225:32780 => /10.199.6.232:4145] CONNECTED: > /10.199.6.225:32780 > 24 Mar 2017 10:39:46,913 INFO [New I/O worker #5] > (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) > - [id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] DISCONNECTED > 24 Mar 2017 10:39:46,913 INFO [New I/O worker #5] > (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) > - [id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] UNBOUND > 24 Mar 2017 10:39:46,913 INFO [New I/O worker #5] > (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) > - [id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] CLOSED > 24 Mar 2017 10:39:46,913 INFO [New I/O worker #5] > (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed:209) - > Connection to /10.199.6.225:60953 disconnected. > 24 Mar 2017 10:39:46,913 WARN [New I/O worker #5] > (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught:201) > - Unexpected exception from downstream. > java.nio.channels.ClosedChannelException > at > org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:433) > at > org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:128) > at > org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:99) > at > org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:36) > at > org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779) > at org.jboss.netty.channel.Channels.write(Channels.java:725) > at org.jboss.netty.channel.Channels.write(Channels.java:686) > at > org.jboss.netty.handler.codec.compression.ZlibEncoder.finishEncode(ZlibEncoder.java:380) > at > org.jboss.netty.handler.codec.compression.ZlibEncoder.handleDownstream(ZlibEncoder.java:316) > at > org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) > at > org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:784) > at > org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54) > at > org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) > at > org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582) > at org.jboss.netty.channel.Channels.close(Channels.java:812) > at > org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197) > at > org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:212) > at > org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88) > at > org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173) > at > org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) > at > org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) > at > org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:493) > at > org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:371) > at > org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88) > at > org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) > at > org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) > at > org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:60) > at > org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) > at > org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) > at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:468) > at > org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:375) > at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:93) > at > org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) > at > org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) > at > org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) > at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) > at > org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) > at > org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > This warning appear after the channel closed the connection and occur > continuously. It only appear when config compression with "deflate" > Despite there are many continuously warnings, the data can decompress ok. > Does this warning effect to my flow ? I'm using Flume v1.7 -- This message was sent by Atlassian JIRA (v6.3.15#6346)