[ https://issues.apache.org/jira/browse/CAMEL-4556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthew McMahon updated CAMEL-4556: ----------------------------------- Description: Using a NettyProducer without the disconnect=true configuration is causing the route to block after 10 messages on the to("netty://tcp....") call. It appears that a new socket connection is created for every message, and then after 10 connections no new connection is allowed (must be a default thread pool limit?). Using the disconnect=true option fixes the problem as a socket is connected, message sent, then disconnected. But this does not seem viable for implementations where that overhead is undesirable. -- This is a small Unit Test that shows the problem. package netty; import java.util.Arrays; import java.util.Collection; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import junit.framework.TestCase; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @RunWith(Parameterized.class) public class NettyTest extends TestCase { private final static Logger logger = LoggerFactory.getLogger(NettyTest.class); private final static CamelContext serverContext = new DefaultCamelContext(); private final CamelContext clientContext = new DefaultCamelContext(); private final AtomicInteger responseCounter = new AtomicInteger(0); private final AtomicBoolean passedTen = new AtomicBoolean(false); private Boolean disconnectClient; public NettyTest(Boolean disconnectClient) { this.disconnectClient = disconnectClient; } @Parameters public static Collection<Object[]> 
configs() { return Arrays.asList(new Object[][] { { true }, { false } }); } @BeforeClass public static void createServer() throws Exception { serverContext.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("netty:tcp://localhost:9000?sync=true&disconnectOnNoReply=false&allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false") .setExchangePattern(ExchangePattern.InOut) .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { Object body = exchange.getIn().getBody(); logger.info("Request received : Value = {}", body); } }) .transform(constant(3)).stop(); } }); serverContext.start(); } @Before public void createClient() throws Exception { clientContext.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { // Generate an Echo message and ensure a Response is sent from("timer://echoTimer?delay=1s&fixedRate=true&period=1s") .setExchangePattern(ExchangePattern.InOut) .transform() .constant(2) .to(ExchangePattern.InOut, "netty:tcp://localhost:9000?allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false&sync=true&disconnect=" + disconnectClient.toString()) .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { Object body = exchange.getIn().getBody(); logger.info("Response number {} : Value = {}", responseCounter.incrementAndGet(), body); if (responseCounter.get() > 10) { passedTen.set(true); } } }).stop(); } }); } @Test public void test() throws Exception { clientContext.getShutdownStrategy().setTimeout(1); clientContext.start(); logger.info("Disconnect = {}", this.disconnectClient); Thread.sleep(TimeUnit.SECONDS.toMillis(15)); clientContext.stop(); assertTrue("More than 10 responses have been received", passedTen.get()); } } was: Using a NettyProducer without the disconnect=true configuration is causing the route to block after 10 messages on the to("netty://tcp....") call. 
It appears that a new socket connection is created for every message, and then after 10 connections no new connection is allowed (must be a default thread pool limit?). Using the disconnect=true option fixes the problem as a socket is connected, message sent, then disconnected. But this does not seem viable for implementations where that overhead is undesirable. > NettyProducer creating new connection on every message > ------------------------------------------------------ > > Key: CAMEL-4556 > URL: https://issues.apache.org/jira/browse/CAMEL-4556 > Project: Camel > Issue Type: Bug > Components: camel-netty > Affects Versions: 2.8.1 > Reporter: Matthew McMahon > Priority: Minor > > Using a NettyProducer without the disconnect=true configuration is causing > the route to block after 10 messages on the to("netty:tcp://...") call. > It appears that a new socket connection is created for every message, and > then after 10 connections no new connection is allowed (must be a default > thread pool limit?). > Using the disconnect=true option fixes the problem as a socket is connected, > message sent, then disconnected. But this does not seem viable for > implementations where that overhead is undesirable. > -- > This is a small Unit Test that shows the problem. 
> package netty; > import java.util.Arrays; > import java.util.Collection; > import java.util.concurrent.TimeUnit; > import java.util.concurrent.atomic.AtomicBoolean; > import java.util.concurrent.atomic.AtomicInteger; > import junit.framework.TestCase; > import org.apache.camel.CamelContext; > import org.apache.camel.Exchange; > import org.apache.camel.ExchangePattern; > import org.apache.camel.Processor; > import org.apache.camel.builder.RouteBuilder; > import org.apache.camel.impl.DefaultCamelContext; > import org.junit.Before; > import org.junit.BeforeClass; > import org.junit.Test; > import org.junit.runner.RunWith; > import org.junit.runners.Parameterized; > import org.junit.runners.Parameterized.Parameters; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > @RunWith(Parameterized.class) > public class NettyTest extends TestCase > { > private final static Logger logger = > LoggerFactory.getLogger(NettyTest.class); > private final static CamelContext serverContext = new > DefaultCamelContext(); > private final CamelContext clientContext = new DefaultCamelContext(); > private final AtomicInteger responseCounter = new AtomicInteger(0); > private final AtomicBoolean passedTen = new AtomicBoolean(false); > private Boolean disconnectClient; > public NettyTest(Boolean disconnectClient) > { > this.disconnectClient = disconnectClient; > } > @Parameters > public static Collection<Object[]> configs() > { > return Arrays.asList(new Object[][] { { true }, { false } }); > } > @BeforeClass > public static void createServer() throws Exception > { > serverContext.addRoutes(new RouteBuilder() > { > @Override > public void configure() throws Exception > { > > from("netty:tcp://localhost:9000?sync=true&disconnectOnNoReply=false&allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false") > > .setExchangePattern(ExchangePattern.InOut) > .process(new Processor() { > @Override > public void process(Exchange exchange) throws > Exception > { > Object body = 
exchange.getIn().getBody(); > logger.info("Request received : Value = {}", > body); > } > > }) > .transform(constant(3)).stop(); > } > }); > serverContext.start(); > } > @Before > public void createClient() throws Exception > { > clientContext.addRoutes(new RouteBuilder() > { > @Override > public void configure() throws Exception > { > // Generate an Echo message and ensure a Response is sent > from("timer://echoTimer?delay=1s&fixedRate=true&period=1s") > .setExchangePattern(ExchangePattern.InOut) > .transform() > .constant(2) > .to(ExchangePattern.InOut, > "netty:tcp://localhost:9000?allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false&sync=true&disconnect=" > + disconnectClient.toString()) > .process(new Processor() > { > @Override > public void process(Exchange exchange) throws > Exception > { > Object body = exchange.getIn().getBody(); > logger.info("Response number {} : Value = > {}", > responseCounter.incrementAndGet(), > body); > if (responseCounter.get() > 10) { > passedTen.set(true); > } > } > }).stop(); > } > }); > } > @Test > public void test() throws Exception > { > clientContext.getShutdownStrategy().setTimeout(1); > clientContext.start(); > logger.info("Disconnect = {}", this.disconnectClient); > Thread.sleep(TimeUnit.SECONDS.toMillis(15)); > clientContext.stop(); > assertTrue("More than 10 responses have been received", > passedTen.get()); > } > } -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira