[ 
https://issues.apache.org/jira/browse/STORM-556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14214308#comment-14214308
 ] 

Xavier Fu edited comment on STORM-556 at 11/17/14 6:02 AM:
-----------------------------------------------------------

The channelRef will not be null.
Although the Netty event loop closes the socket, the Client does not
register any callback to monitor the socket close event, so channelRef keeps
pointing to a closed channel, and the send() function goes ahead and adds
messages to the buffer.
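
To illustrate the point, here is a minimal, dependency-free sketch. It is not
the Storm or Netty code: FakeChannel, Client, onClose() and canFlush() are
hypothetical names made up for this example. It only models the idea that a
reference to a closed channel stays non-null but never becomes writable again
unless something reacts to the close event. In Netty 3 the analogous hook
would presumably be a listener on Channel.getCloseFuture().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class ReconnectSketch {

    /** Hypothetical stand-in for a Netty channel; this is NOT the Netty API. */
    static final class FakeChannel {
        private final List<Runnable> closeListeners = new ArrayList<>();
        private boolean open = true;

        synchronized boolean isWritable() {
            return open;
        }

        /** Registers a close callback; runs it immediately if already closed. */
        void onClose(Runnable listener) {
            boolean runNow;
            synchronized (this) {
                runNow = !open;
                if (!runNow) {
                    closeListeners.add(listener);
                }
            }
            if (runNow) {
                listener.run();
            }
        }

        /** Closes the channel once and notifies the registered listeners. */
        void close() {
            List<Runnable> toRun;
            synchronized (this) {
                if (!open) {
                    return;
                }
                open = false;
                toRun = new ArrayList<>(closeListeners);
                closeListeners.clear();
            }
            for (Runnable listener : toRun) {
                listener.run();
            }
        }
    }

    /** Hypothetical client that holds the channel in a reference. */
    static final class Client {
        final AtomicReference<FakeChannel> channelRef = new AtomicReference<>();
        int reconnects = 0;

        void connect() {
            FakeChannel ch = new FakeChannel();
            channelRef.set(ch);
            // React to the close event instead of keeping a dead reference.
            ch.onClose(() -> {
                if (channelRef.compareAndSet(ch, null)) {
                    reconnects++;
                    connect();
                }
            });
        }

        /** Same shape as the guard in the flush timer quoted in the issue. */
        boolean canFlush() {
            FakeChannel ch = channelRef.get();
            return ch != null && ch.isWritable();
        }
    }

    public static void main(String[] args) {
        // No close listener: the reference is non-null but the channel is dead.
        Client stale = new Client();
        FakeChannel dead = new FakeChannel();
        stale.channelRef.set(dead);
        dead.close();
        System.out.println("no listener:   canFlush=" + stale.canFlush());

        // With a close listener: the client replaces the channel when it closes.
        Client client = new Client();
        client.connect();
        client.channelRef.get().close();
        System.out.println("with listener: canFlush=" + client.canFlush()
                + " reconnects=" + client.reconnects);
    }
}
```

In the first case the guard stays false forever, which matches the hang
described in the issue. In the second case the close callback swaps in a
fresh channel, so the guard becomes true again.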

I have written code to test Netty's isWritable().
After connecting to the server, the test program sleeps, and I shut down the
server in the meantime.
When the test program wakes up, the result is:

connect OK
nowritable
noconnected



// Imports needed by the test (Netty 3.x, JUnit 4).
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.junit.Test;

@Test
public void testNetty() {
    ThreadFactory bossFactory = new NettyRenameThreadFactoryX("client" + "-boss");
    ThreadFactory workerFactory = new NettyRenameThreadFactoryX("client" + "-worker");
    NioClientSocketChannelFactory factory = new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool(bossFactory),
            Executors.newCachedThreadPool(workerFactory), 2);
    ClientBootstrap bootstrap = new ClientBootstrap(factory);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("sendBufferSize", 10000);
    bootstrap.setOption("keepAlive", true);

    // Set up the pipeline factory.
    bootstrap.setPipelineFactory(new StormClientPipelineFactoryX());

    // Start the connection attempt and wait for it to finish.
    InetSocketAddress remote_addr = new InetSocketAddress("xxx.xxx.xxx.xxx", 1234);
    ChannelFuture future = bootstrap.connect(remote_addr);
    future.awaitUninterruptibly();
    Channel current = future.getChannel();
    if (!future.isSuccess()) {
        // Connection failed: clean up and stop, there is nothing to check.
        if (null != current) {
            current.close();
        }
        return;
    }
    System.out.println("connect OK");

    // Sleep for 10 seconds; the server is shut down by hand during this window.
    try {
        Thread.sleep(1000 * 10);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }

    // After the server is gone, check what the channel reports.
    if (current.isWritable()) {
        System.out.println("writable");
    } else {
        System.out.println("nowritable");
    }

    if (current.isConnected()) {
        System.out.println("connected");
    } else {
        System.out.println("noconnected");
    }
}



> netty Client  reconnect bug
> ---------------------------
>
>                 Key: STORM-556
>                 URL: https://issues.apache.org/jira/browse/STORM-556
>             Project: Apache Storm
>          Issue Type: Bug
>    Affects Versions: 0.9.2-incubating
>            Reporter: Xavier Fu
>            Assignee: Xavier Fu
>              Labels: ClientID, hangup, netty
>
> If a Storm worker dies and is restarted, and Nimbus reassigns the task to
> the restarted worker on the same host:port, the upstream task will not
> reconnect to that host:port and the topology will hang.
> This happens because the connection is broken, so the condition in the
> Netty Client flush timer
> if (null != channel && channel.isWritable()) {
>                             flush(channel);
> }
> is always false.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
