Author: davsclaus
Date: Fri Oct 16 10:53:31 2009
New Revision: 825839
URL: http://svn.apache.org/viewvc?rev=825839&view=rev
Log:
CAMEL-2069: Reworked patch a bit so camel-spring can pass unit testing.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAggregatorRejectedPutBackTest.java
- copied, changed from r825755,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAggregatorRejectedPutBackTest.java
(from r825755, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAggregatorRejectedPutBackTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAggregatorRejectedPutBackTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java&r1=825755&r2=825839&rev=825839&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAggregatorRejectedPutBackTest.java Fri Oct 16 10:53:31 2009
@@ -25,7 +25,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
@@ -34,43 +33,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-public class MulticastAnotherAggregatorTest extends ContextTestSupport {
+public class MulticastAggregatorRejectedPutBackTest extends MulticastAnotherAggregatorTest {
- private static final Log LOG = LogFactory.getLog(MulticastAnotherAggregatorTest.class);
-
- public void testMulticastReceivesItsOwnExchangeParallelly() throws Exception {
- sendingAMessageUsingMulticastReceivesItsOwnExchange(true);
- }
-
- public void testMulticastReceivesItsOwnExchangeSequentially() throws Exception {
- sendingAMessageUsingMulticastReceivesItsOwnExchange(false);
- }
-
- private void sendingAMessageUsingMulticastReceivesItsOwnExchange(boolean isParallel) throws Exception {
- MockEndpoint result = getMockEndpoint("mock:result");
- result.expectedBodiesReceived("inputx+inputy+inputz");
-
- String url;
- if (isParallel) {
- url = "direct:parallel";
- } else {
- url = "direct:sequential";
- }
-
- // use InOut
- Exchange exchange = template.request(url, new Processor() {
- public void process(Exchange exchange) {
- Message in = exchange.getIn();
- in.setBody("input");
- in.setHeader("foo", "bar");
- }
- });
-
- assertNotNull("We should get result here", exchange);
- assertEquals("Can't get the right result", "inputx+inputy+inputz", exchange.getOut().getBody(String.class));
-
- assertMockEndpointsSatisfied();
- }
+ private static final Log LOG = LogFactory.getLog(MulticastAggregatorRejectedPutBackTest.class);
public void testMulticastLoadParallelly() throws Exception {
sendLoad(true);
@@ -85,10 +50,10 @@
final int numThreads = 10;
final AtomicLong total = new AtomicLong(0);
final String url = isParallel ? "direct:parallel" : "direct:sequential";
-
+
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedMessageCount(numThreads * numMsgs);
-
+
Runnable runner = new Runnable() {
public void run() {
try {
@@ -111,11 +76,11 @@
}
};
ExecutorService executor = Executors.newCachedThreadPool();
-
+
for (int count = 0; count < numThreads; count++) {
executor.execute(runner);
}
-
+
executor.shutdown();
while (!executor.isTerminated()) {
executor.awaitTermination(10, TimeUnit.SECONDS);
@@ -123,7 +88,7 @@
assertMockEndpointsSatisfied();
}
-
+
private class WorkQueuePolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
try {
@@ -134,15 +99,14 @@
}
}
}
-
+
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
ThreadPoolExecutor tpExecutor = new ThreadPoolExecutor(1, 10, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
tpExecutor.setRejectedExecutionHandler(new WorkQueuePolicy());
-
-
+
// START SNIPPET: example
// The message will be sent parallelly to the endpoints
from("direct:parallel")
@@ -161,4 +125,4 @@
}
};
}
-}
+}
\ No newline at end of file
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java?rev=825839&r1=825838&r2=825839&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java Fri Oct 16 10:53:31 2009
@@ -17,13 +17,8 @@
package org.apache.camel.processor;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
@@ -72,77 +67,12 @@
assertMockEndpointsSatisfied();
}
- public void testMulticastLoadParallelly() throws Exception {
- sendLoad(true);
- }
-
- public void testMulticastLoadSequentially() throws Exception {
- sendLoad(false);
- }
-
- public void sendLoad(boolean isParallel) throws Exception {
- final int numMsgs = 10;
- final int numThreads = 10;
- final AtomicLong total = new AtomicLong(0);
- final String url = isParallel ? "direct:parallel" : "direct:sequential";
-
- MockEndpoint result = getMockEndpoint("mock:result");
- result.expectedMessageCount(numThreads * numMsgs);
-
- Runnable runner = new Runnable() {
- public void run() {
- try {
- Processor processor = new Processor() {
- public void process(Exchange exchange) {
- Message in = exchange.getIn();
- in.setBody("input");
- in.setHeader("foo", "bar");
- }
- };
-
- for (int count = 0; count < numMsgs; count++) {
- template.request(url, processor);
- }
-
- LOG.debug("Runner completed: " + total.incrementAndGet());
- } catch (Exception e) {
- fail(e.getMessage());
- }
- }
- };
- ExecutorService executor = Executors.newCachedThreadPool();
-
- for (int count = 0; count < numThreads; count++) {
- executor.execute(runner);
- }
-
- executor.shutdown();
- while (!executor.isTerminated()) {
- executor.awaitTermination(10, TimeUnit.SECONDS);
- }
-
- assertMockEndpointsSatisfied();
- }
-
- private class WorkQueuePolicy implements RejectedExecutionHandler {
- public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
- try {
- executor.getQueue().put(runnable);
- } catch (InterruptedException e) {
- // should not happen
- throw new RejectedExecutionException(e);
- }
- }
- }
-
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- ThreadPoolExecutor tpExecutor = new ThreadPoolExecutor(1, 10, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
- tpExecutor.setRejectedExecutionHandler(new WorkQueuePolicy());
-
-
+ ThreadPoolExecutor tpExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
+
// START SNIPPET: example
// The message will be sent parallelly to the endpoints
from("direct:parallel")