Author: davsclaus
Date: Wed Nov 4 17:17:40 2009
New Revision: 832801
URL: http://svn.apache.org/viewvc?rev=832801&view=rev
Log:
CAMEL-2134: Applied patch with thanks to David Valeri.
Added:
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/MulticastExecutorTest.java
(with props)
Modified:
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Modified:
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=832801&r1=832800&r2=832801&view=diff
==============================================================================
---
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
(original)
+++
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Wed Nov 4 17:17:40 2009
@@ -20,7 +20,7 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
@@ -48,6 +48,8 @@
* @version $Revision$
*/
public class MulticastProcessor extends ServiceSupport implements Processor {
+ private static final int DEFAULT_THREADPOOL_SIZE = 10;
+
static class ProcessorExchangePair {
private final Processor processor;
private final Exchange exchange;
@@ -95,7 +97,10 @@
this.executor = executor;
} else {
// setup default Executor
- this.executor = new ThreadPoolExecutor(processors.size(),
processors.size(), 0, TimeUnit.MILLISECONDS, new
ArrayBlockingQueue<Runnable>(processors.size()));
+ this.executor = new ThreadPoolExecutor(
+ DEFAULT_THREADPOOL_SIZE, DEFAULT_THREADPOOL_SIZE,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>());
}
}
this.streaming = streaming;
Added:
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/MulticastExecutorTest.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/MulticastExecutorTest.java?rev=832801&view=auto
==============================================================================
---
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/MulticastExecutorTest.java
(added)
+++
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/MulticastExecutorTest.java
Wed Nov 4 17:17:40 2009
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class MulticastExecutorTest extends ContextTestSupport {
+
+ protected static final String DEFAULT_EXECUTOR_ENDPOINT =
"seda:inputDefaultExecutor";
+
+ protected Endpoint<Exchange> startEndpoint;
+ protected MockEndpoint x;
+ protected MockEndpoint y;
+ protected MockEndpoint z;
+
+ public void testSendingAMessageUsingMulticastAdequateExecutorPool() throws
Exception {
+ this.x.expectedBodiesReceived("input");
+ this.x.expectedMessageCount(40);
+ this.y.expectedBodiesReceived("input");
+ this.y.expectedMessageCount(40);
+ this.z.expectedBodiesReceived("input");
+ this.z.expectedMessageCount(40);
+
+ (new Thread(new Runnable() {
+ public void run() {
+ for (int i = 0; i < 40; i++) {
+ template.sendBody(DEFAULT_EXECUTOR_ENDPOINT, "input");
+ }
+ }
+ })).start();
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ this.x = getMockEndpoint("mock:x");
+ this.y = getMockEndpoint("mock:y");
+ this.z = getMockEndpoint("mock:z");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ // START SNIPPET: example
+ from(DEFAULT_EXECUTOR_ENDPOINT + "?concurrentConsumers=5").
+ multicast().
+ parallelProcessing().
+ to("mock:x", "mock:y").
+ to("mock:z");
+ // END SNIPPET: example
+ }
+ };
+ }
+}
Propchange:
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/MulticastExecutorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/MulticastExecutorTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date