Updated Branches:
  refs/heads/camel-2.10.x 225a69eba -> b405f5fd8
  refs/heads/camel-2.11.x c67a30b52 -> 1416ec7e8
  refs/heads/master e1be27def -> ceb28aea5
CAMEL-6954: camel-mina2 - UDP protocol should use worker pool as well. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ceb28aea Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ceb28aea Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ceb28aea Branch: refs/heads/master Commit: ceb28aea513b3fb0291700c545efd0b1d22fdf89 Parents: e1be27d Author: Claus Ibsen <[email protected]> Authored: Tue Nov 12 09:32:52 2013 +0100 Committer: Claus Ibsen <[email protected]> Committed: Tue Nov 12 09:32:52 2013 +0100 ---------------------------------------------------------------------- .../camel/component/mina2/Mina2Consumer.java | 7 +- .../component/mina2/Mina2UdpConcurrentTest.java | 75 ++++++++++++++++++++ .../camel/component/mina2/Mina2UdpTest.java | 1 - 3 files changed, 81 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ceb28aea/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java index 49c1dc5..2d6e76a 100644 --- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java +++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java @@ -193,7 +193,6 @@ public class Mina2Consumer extends DefaultConsumer { addCodecFactory(service, codecFactory); LOG.debug("{}: Using ObjectSerializationCodecFactory: {}", type, codecFactory); } - } protected void setupDatagramProtocol(String uri, Mina2Configuration configuration) { @@ -207,6 +206,12 @@ public class Mina2Consumer extends DefaultConsumer { configureDataGramCodecFactory("MinaConsumer", 
acceptor, configuration); acceptor.setCloseOnDeactivation(true); // reuse address is default true for datagram + if (configuration.isOrderedThreadPoolExecutor()) { + workerPool = new OrderedThreadPoolExecutor(configuration.getMaximumPoolSize()); + } else { + workerPool = new UnorderedThreadPoolExecutor(configuration.getMaximumPoolSize()); + } + acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool)); if (minaLogger) { acceptor.getFilterChain().addLast("logger", new LoggingFilter()); } http://git-wip-us.apache.org/repos/asf/camel/blob/ceb28aea/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpConcurrentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpConcurrentTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpConcurrentTest.java new file mode 100644 index 0000000..c1bdf63 --- /dev/null +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpConcurrentTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.mina2; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +/** + * @version + */ +public class Mina2UdpConcurrentTest extends BaseMina2Test { + + protected int messageCount = 3; + + public Mina2UdpConcurrentTest() { + } + + @Test + public void testMinaRoute() throws Exception { + MockEndpoint endpoint = getMockEndpoint("mock:result"); + endpoint.expectedBodiesReceivedInAnyOrder("Hello Message: 0", "Hello Message: 1", "Hello Message: 2"); + + sendUdpMessages(); + + assertMockEndpointsSatisfied(); + } + + protected void sendUdpMessages() throws Exception { + DatagramSocket socket = new DatagramSocket(); + try { + InetAddress address = InetAddress.getByName("127.0.0.1"); + for (int i = 0; i < messageCount; i++) { + String text = "Hello Message: " + Integer.toString(i); + byte[] data = text.getBytes(); + + //DatagramPacket packet = new DatagramPacket(data, data.length, address, getPort()); + DatagramPacket packet = new DatagramPacket(data, data.length, address, 10111); + socket.send(packet); + } + Thread.sleep(2000); + } finally { + socket.close(); + } + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + // we use un-ordered to allow processing the UDP messages in any order from same client + from("mina2:udp://127.0.0.1:10111?sync=false&minaLogger=true&orderedThreadPoolExecutor=false") + .delay(1000) + .to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ceb28aea/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpTest.java 
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpTest.java index 0aed0d6..519fb35 100644 --- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpTest.java +++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpTest.java @@ -64,7 +64,6 @@ public class Mina2UdpTest extends BaseMina2Test { protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { - public void configure() { from("mina2:udp://127.0.0.1:10111?sync=false&minaLogger=true").to("mock:result"); }
