[ https://issues.apache.org/jira/browse/FLUME-2752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14647088#comment-14647088 ]
yinghua_zh commented on FLUME-2752:
-----------------------------------
@Override
public void start() {
  logger.info("Starting {}...", this);
  Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
  NioServerSocketChannelFactory socketChannelFactory =
      initSocketChannelFactory();
  ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory();
  connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
  server.start();
  sourceCounter.start();
  super.start();
  final NettyServer srv = (NettyServer)server;
  connectionCountUpdater.scheduleWithFixedDelay(new Runnable(){
    @Override
    public void run() {
      sourceCounter.setOpenConnectionCount(
          Long.valueOf(srv.getNumActiveConnections()));
    }
  }, 0, 60, TimeUnit.SECONDS);
  logger.info("Avro source {} started.", getName());
}
I have modified the code as follows. Can you accept it?
@Override
public void start() {
  logger.info("Starting {}...", this);
  Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
  NioServerSocketChannelFactory socketChannelFactory =
      initSocketChannelFactory();
  ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory();
  try {
    server = new NettyServer(responder,
        new InetSocketAddress(bindAddress, port),
        socketChannelFactory, pipelineFactory, null);
  } catch (Exception ex) {
    // The server could not be created (for example, the bind failed):
    // release the resources held by the channel factory so that a
    // retried start() does not leak them.
    try {
      socketChannelFactory.releaseExternalResources();
    } catch (Exception e1) {
      logger.error("Exception while releasing external resources:", e1);
    }
    throw new RuntimeException(ex);
  }
  connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
  server.start();
  sourceCounter.start();
  super.start();
  final NettyServer srv = (NettyServer)server;
  connectionCountUpdater.scheduleWithFixedDelay(new Runnable(){
    @Override
    public void run() {
      sourceCounter.setOpenConnectionCount(
          Long.valueOf(srv.getNumActiveConnections()));
    }
  }, 0, 60, TimeUnit.SECONDS);
  logger.info("Avro source {} started.", getName());
}
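
The idea behind the modified start() above (release what was handed to a constructor that then failed) can be sketched with JDK classes only. This is a minimal illustration, not the Flume patch: FakeServer and start are hypothetical names, and an ExecutorService stands in for the socket channel factory.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReleaseOnFailureSketch {

    /** Hypothetical stand-in for a server whose constructor may fail while binding. */
    static final class FakeServer {
        final ExecutorService workers;

        FakeServer(ExecutorService workers, boolean bindFails) {
            if (bindFails) {
                throw new IllegalStateException("Failed to bind");
            }
            this.workers = workers;
        }
    }

    /**
     * Builds the server. If construction fails, the worker pool that was
     * passed in is shut down before the exception is rethrown, so the
     * caller is not left holding an orphaned pool.
     */
    static FakeServer start(ExecutorService workers, boolean bindFails) {
        try {
            return new FakeServer(workers, bindFails);
        } catch (RuntimeException ex) {
            workers.shutdownNow();
            throw ex;
        }
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newCachedThreadPool();
        try {
            start(workers, true);
        } catch (IllegalStateException expected) {
            System.out.println("pool shut down after failed start: " + workers.isShutdown());
        }
    }
}
```

The cleanup lives in the same method that created the server, so a caller that retries start() cannot forget it.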
> Flume AvroSource will leak memory and an OOM will occur.
> --------------------------------------------------------
>
> Key: FLUME-2752
> URL: https://issues.apache.org/jira/browse/FLUME-2752
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v1.6.0
> Reporter: yinghua_zh
>
> If the Flume agent is configured with a nonexistent IP address for the Avro
> source, the following exception occurs:
> 2015-07-21 19:57:47,054 | ERROR | [lifecycleSupervisor-1-2] | Unable to start EventDrivenSourceRunner: { source:Avro source avro_source_21155: { bindAddress: 51.196.27.32, port: 21155 } } - Exception follows. | org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)
> org.jboss.netty.channel.ChannelException: Failed to bind to: /51.196.27.32:21155
>     at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:297)
>     at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
>     at org.apache.flume.source.AvroSource.start(AvroSource.java:294)
>     at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
>     at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.BindException: Cannot assign requested address
>     at sun.nio.ch.Net.bind0(Native Method)
>     at sun.nio.ch.Net.bind(Net.java:437)
>     at sun.nio.ch.Net.bind(Net.java:429)
>     at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
>     at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
>     at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.bind(NioServerSocketPipelineSink.java:140)
>     at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleServerSocket(NioServerSocketPipelineSink.java:90)
>     at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:64)
>     at org.jboss.netty.channel.Channels.bind(Channels.java:569)
>     at org.jboss.netty.channel.AbstractChannel.bind(AbstractChannel.java:189)
>     at org.jboss.netty.bootstrap.ServerBootstrap$Binder.channelOpen(ServerBootstrap.java:342)
>     at org.jboss.netty.channel.Channels.fireChannelOpen(Channels.java:170)
>     at org.jboss.netty.channel.socket.nio.NioServerSocketChannel.<init>(NioServerSocketChannel.java:80)
>     at org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory.newChannel(NioServerSocketChannelFactory.java:158)
>     at org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory.newChannel(NioServerSocketChannelFactory.java:86)
>     at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:276)
> If the above exception keeps occurring for 2 hours and the agent JVM heap
> size (-Xmx) is 4G, an OutOfMemoryError occurs.
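
Why repeated failed starts add up can be illustrated with a JDK-only analogy. This sketch does not reproduce Netty's internal allocation; it only assumes that each start() attempt creates a fresh pool of worker threads, as start() creates a new channel factory on every call. RetryLeakSketch and liveThreadsAfter are hypothetical names introduced for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class RetryLeakSketch {

    /**
     * Simulates `attempts` failed starts. Each attempt creates its own
     * single-thread pool (the stand-in for a per-start channel factory).
     * Returns how many worker threads are still alive afterwards.
     */
    static int liveThreadsAfter(int attempts, boolean releaseOnFailure) {
        List<Thread> created = new ArrayList<>();
        ThreadFactory tracking = r -> {
            Thread t = new Thread(r);
            created.add(t);          // remember every worker thread we create
            return t;
        };
        List<ExecutorService> pools = new ArrayList<>();
        try {
            for (int i = 0; i < attempts; i++) {
                ExecutorService pool = Executors.newFixedThreadPool(1, tracking);
                pools.add(pool);
                pool.submit(() -> { }).get();   // force the worker thread to start
                // ... the simulated bind fails at this point ...
                if (releaseOnFailure) {
                    pool.shutdown();            // the cleanup step
                }
            }
            if (releaseOnFailure) {
                for (Thread t : created) {
                    t.join(5000);               // wait for released workers to exit
                }
            }
            int alive = 0;
            for (Thread t : created) {
                if (t.isAlive()) {
                    alive++;
                }
            }
            return alive;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            // Clean up the demo itself so the JVM can exit.
            for (ExecutorService p : pools) {
                p.shutdownNow();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("without release: " + liveThreadsAfter(5, false) + " live threads");
        System.out.println("with release:    " + liveThreadsAfter(5, true) + " live threads");
    }
}
```

Without the cleanup step, the number of live threads grows with the number of attempts, which is the shape of the growth described in the report when the supervisor keeps retrying.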