[ 
https://issues.apache.org/jira/browse/CAMEL-4556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew McMahon updated CAMEL-4556:
-----------------------------------

    Description: 
Using a NettyProducer without the disconnect=true option causes the 
route to block after 10 messages on the to("netty:tcp://...") call.

It appears that a new socket connection is created for every message, and then 
after 10 connections no new connection is allowed (must be a default thread 
pool limit?).

Using the disconnect=true option works around the problem, because each socket 
is connected, used to send one message, and then disconnected. However, this is 
not viable for implementations where that per-message overhead is undesirable.

--

This is a small Unit Test that shows the problem. 



package netty; 

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import junit.framework.TestCase;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reproduction test for CAMEL-4556: a Netty producer endpoint is driven by a
 * one-second timer against a local Netty consumer, once with
 * {@code disconnect=true} and once with {@code disconnect=false}. Each run
 * passes only if the client route receives more than ten responses within the
 * wait window.
 *
 * <p>The class is run by the JUnit 4 {@link Parameterized} runner; the
 * lifecycle is driven by the JUnit 4 annotations below. {@link TestCase} is
 * still extended so the class hierarchy stays unchanged and {@code assertTrue}
 * is inherited.
 */
@RunWith(Parameterized.class)
public class NettyTest extends TestCase
{
    private static final Logger logger = LoggerFactory.getLogger(NettyTest.class);

    /** Responses the client must receive for a run to pass, i.e. "more than 10". */
    private static final int REQUIRED_RESPONSES = 11;

    /** Upper bound, in seconds, on how long a run waits for the responses. */
    private static final long MAX_WAIT_SECONDS = 15;

    /** Shared by both parameterized runs; started once and stopped once. */
    private static final CamelContext serverContext = new DefaultCamelContext();

    private final CamelContext clientContext = new DefaultCamelContext();
    private final AtomicInteger responseCounter = new AtomicInteger(0);

    /** Reaches zero once {@link #REQUIRED_RESPONSES} responses have arrived. */
    private final CountDownLatch responsesReceived = new CountDownLatch(REQUIRED_RESPONSES);

    private final Boolean disconnectClient;

    /**
     * @param disconnectClient value appended to the producer URI as the
     *        {@code disconnect} option
     */
    public NettyTest(Boolean disconnectClient)
    {
        this.disconnectClient = disconnectClient;
    }

    /**
     * @return the two configurations to run: {@code disconnect=true} and
     *         {@code disconnect=false}
     */
    @Parameters
    public static Collection<Object[]> configs()
    {
        return Arrays.asList(new Object[][] { { true }, { false } });
    }

    /**
     * Registers and starts the server route: it logs every request body and
     * replies with the constant {@code 3}.
     *
     * @throws Exception if the route cannot be added or the context cannot start
     */
    @BeforeClass
    public static void createServer() throws Exception
    {
        serverContext.addRoutes(new RouteBuilder()
        {
            @Override
            public void configure() throws Exception
            {
                from("netty:tcp://localhost:9000?sync=true&disconnectOnNoReply=false&allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false")
                        .setExchangePattern(ExchangePattern.InOut)
                        .process(new Processor()
                        {
                            @Override
                            public void process(Exchange exchange) throws Exception
                            {
                                Object body = exchange.getIn().getBody();
                                logger.info("Request received : Value = {}", body);
                            }
                        })
                        .transform(constant(3)).stop();
            }
        });

        serverContext.start();
    }

    /**
     * Stops the shared server context so its listener does not outlive the
     * test class.
     *
     * @throws Exception if the context fails to stop
     */
    @AfterClass
    public static void stopServer() throws Exception
    {
        serverContext.stop();
    }

    /**
     * Registers the client route: a timer sends the constant {@code 2} to the
     * server every second and each reply is logged and counted. The context
     * itself is started by {@link #test()}.
     *
     * @throws Exception if the route cannot be added
     */
    @Before
    public void createClient() throws Exception
    {
        clientContext.addRoutes(new RouteBuilder()
        {
            @Override
            public void configure() throws Exception
            {
                // Generate an Echo message and ensure a Response is sent
                from("timer://echoTimer?delay=1s&fixedRate=true&period=1s")
                        .setExchangePattern(ExchangePattern.InOut)
                        .transform()
                        .constant(2)
                        .to(ExchangePattern.InOut,
                                "netty:tcp://localhost:9000?allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false&sync=true&disconnect="
                                        + disconnectClient)
                        .process(new Processor()
                        {
                            @Override
                            public void process(Exchange exchange) throws Exception
                            {
                                Object body = exchange.getIn().getBody();
                                // Use the value returned by the increment so the logged
                                // number is the one this exchange produced.
                                int received = responseCounter.incrementAndGet();
                                logger.info("Response number {} : Value = {}", received, body);
                                responsesReceived.countDown();
                            }
                        }).stop();
            }
        });
    }

    /**
     * Starts the client and waits, for at most {@link #MAX_WAIT_SECONDS}
     * seconds, until more than ten responses have been received. The client
     * context is always stopped, even when the wait is interrupted or the
     * start fails.
     *
     * @throws Exception if the client context fails to start or stop, or the
     *         wait is interrupted
     */
    @Test
    public void test() throws Exception
    {
        // Keep shutdown short so a blocked producer does not stall the run.
        // NOTE(review): the value is presumably in seconds - confirm against ShutdownStrategy.
        clientContext.getShutdownStrategy().setTimeout(1);

        boolean passedTen;
        try
        {
            clientContext.start();

            logger.info("Disconnect = {}", this.disconnectClient);

            // Returns as soon as enough responses arrive instead of always
            // sleeping for the full window.
            passedTen = responsesReceived.await(MAX_WAIT_SECONDS, TimeUnit.SECONDS);
        }
        finally
        {
            clientContext.stop();
        }

        assertTrue("Expected more than 10 responses within " + MAX_WAIT_SECONDS
                + " seconds but received " + responseCounter.get()
                + " (disconnect=" + disconnectClient + ")", passedTen);
    }
}

  was:
Using a NettyProducer without the disconnect=true configuration is causing the 
route to block after 10 messages on the to("netty://tcp....") call.

It appears that a new socket connection is created for every message, and then 
after 10 connections no new connection is allowed (must be a default thread 
pool limit?).

Using the disconnect=true option fixes the problem as a socket is connected, 
message sent, then disconnected. But this does not seem viable for 
implementations where that overhead is undesirable.

    
> NettyProducer creating new connection on every message
> ------------------------------------------------------
>
>                 Key: CAMEL-4556
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4556
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-netty
>    Affects Versions: 2.8.1
>            Reporter: Matthew McMahon
>            Priority: Minor
>
> Using a NettyProducer without the disconnect=true configuration is causing 
> the route to block after 10 messages on the to("netty://tcp....") call.
> It appears that a new socket connection is created for every message, and 
> then after 10 connections no new connection is allowed (must be a default 
> thread pool limit?).
> Using the disconnect=true option fixes the problem as a socket is connected, 
> message sent, then disconnected. But this does not seem viable for 
> implementations where that overhead is undesirable.
> --
> This is a small Unit Test that shows the problem. 
> package netty; 
> import java.util.Arrays; 
> import java.util.Collection; 
> import java.util.concurrent.TimeUnit; 
> import java.util.concurrent.atomic.AtomicBoolean; 
> import java.util.concurrent.atomic.AtomicInteger; 
> import junit.framework.TestCase; 
> import org.apache.camel.CamelContext; 
> import org.apache.camel.Exchange; 
> import org.apache.camel.ExchangePattern; 
> import org.apache.camel.Processor; 
> import org.apache.camel.builder.RouteBuilder; 
> import org.apache.camel.impl.DefaultCamelContext; 
> import org.junit.Before; 
> import org.junit.BeforeClass; 
> import org.junit.Test; 
> import org.junit.runner.RunWith; 
> import org.junit.runners.Parameterized; 
> import org.junit.runners.Parameterized.Parameters; 
> import org.slf4j.Logger; 
> import org.slf4j.LoggerFactory; 
> @RunWith(Parameterized.class) 
> public class NettyTest extends TestCase 
> { 
>     private final static Logger logger = 
> LoggerFactory.getLogger(NettyTest.class); 
>     private final static CamelContext serverContext = new 
> DefaultCamelContext(); 
>     private final CamelContext clientContext = new DefaultCamelContext(); 
>     private final AtomicInteger responseCounter = new AtomicInteger(0); 
>     private final AtomicBoolean passedTen = new AtomicBoolean(false); 
>     private Boolean disconnectClient; 
>     public NettyTest(Boolean disconnectClient) 
>     { 
>         this.disconnectClient = disconnectClient; 
>     } 
>     @Parameters 
>     public static Collection<Object[]> configs() 
>     { 
>         return Arrays.asList(new Object[][] { { true }, { false } }); 
>     } 
>     @BeforeClass 
>     public static void createServer() throws Exception 
>     { 
>         serverContext.addRoutes(new RouteBuilder() 
>         { 
>             @Override 
>             public void configure() throws Exception 
>             { 
>                 
> from("netty:tcp://localhost:9000?sync=true&disconnectOnNoReply=false&allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false")
>  
>                         .setExchangePattern(ExchangePattern.InOut) 
>                         .process(new Processor() { 
>                             @Override 
>                             public void process(Exchange exchange) throws 
> Exception 
>                             { 
>                                 Object body = exchange.getIn().getBody(); 
>                                 logger.info("Request received : Value = {}", 
> body); 
>                             } 
>                             
>                         }) 
>                         .transform(constant(3)).stop(); 
>             } 
>         }); 
>         serverContext.start(); 
>     } 
>     @Before 
>     public void createClient() throws Exception 
>     { 
>         clientContext.addRoutes(new RouteBuilder() 
>         { 
>             @Override 
>             public void configure() throws Exception 
>             { 
>                 // Generate an Echo message and ensure a Response is sent 
>                 from("timer://echoTimer?delay=1s&fixedRate=true&period=1s") 
>                         .setExchangePattern(ExchangePattern.InOut) 
>                         .transform() 
>                         .constant(2) 
>                         .to(ExchangePattern.InOut, 
> "netty:tcp://localhost:9000?allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false&sync=true&disconnect="
>  + disconnectClient.toString()) 
>                         .process(new Processor() 
>                         { 
>                             @Override 
>                             public void process(Exchange exchange) throws 
> Exception 
>                             { 
>                                 Object body = exchange.getIn().getBody(); 
>                                 logger.info("Response number {} : Value = 
> {}", 
>                                         responseCounter.incrementAndGet(), 
> body); 
>                                 if (responseCounter.get() > 10) { 
>                                     passedTen.set(true); 
>                                 } 
>                             } 
>                         }).stop(); 
>             } 
>         }); 
>     } 
>     @Test 
>     public void test() throws Exception 
>     { 
>         clientContext.getShutdownStrategy().setTimeout(1); 
>         clientContext.start(); 
>         logger.info("Disconnect = {}", this.disconnectClient); 
>         Thread.sleep(TimeUnit.SECONDS.toMillis(15)); 
>         clientContext.stop(); 
>         assertTrue("More than 10 responses have been received", 
> passedTen.get()); 
>     } 
> } 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to