Repository: camel Updated Branches: refs/heads/master 59939fddd -> 192a3b2bc
CAMEL-9667: Added unit test. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/df092872 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/df092872 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/df092872 Branch: refs/heads/master Commit: df092872f124a9682ac15e65d3920021035cd7d9 Parents: 59939fd Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Mar 4 15:38:01 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Mar 4 15:38:01 2016 +0100 ---------------------------------------------------------------------- .../apache/camel/processor/BatchProcessor.java | 11 +-- .../resequencer/ResequencerBatchOrderTest.java | 73 ++++++++++++++++++++ 2 files changed, 79 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/df092872/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java index f7056e0..f6059ba 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -335,7 +336,7 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na private Queue<Exchange> queue; private Lock queueLock = new ReentrantLock(); - private boolean exchangeEnqueued; + private final AtomicBoolean exchangeEnqueued = new AtomicBoolean(); private final Queue<String> completionPredicateMatched = new ConcurrentLinkedQueue<String>(); private Condition exchangeEnqueuedCondition = queueLock.newCondition(); @@ -372,7 +373,7 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na try { do { try { - if (!exchangeEnqueued) { + if (!exchangeEnqueued.get()) { LOG.trace("Waiting for new exchange to arrive or batchTimeout to occur after {} ms.", batchTimeout); exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS); } @@ -383,7 +384,7 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na id = completionPredicateMatched.poll(); } - if (id != null || !exchangeEnqueued) { + if (id != null || !exchangeEnqueued.get()) { if (id != null) { LOG.trace("Collecting exchanges to be aggregated triggered by completion predicate"); } else { @@ -391,7 +392,7 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na } drainQueueTo(collection, batchSize, id); } else { - exchangeEnqueued = false; + exchangeEnqueued.set(false); boolean drained = false; while (isInBatchCompleted(queue.size())) { drained = true; @@ -471,7 +472,7 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na } } queue.add(exchange); - exchangeEnqueued = true; + exchangeEnqueued.set(true); exchangeEnqueuedCondition.signal(); } finally { queueLock.unlock(); http://git-wip-us.apache.org/repos/asf/camel/blob/df092872/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerBatchOrderTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerBatchOrderTest.java b/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerBatchOrderTest.java new file mode 100644 index 0000000..0bfc82d --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerBatchOrderTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.resequencer; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ResequencerBatchOrderTest extends ContextTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(ResequencerBatchOrderTest.class); + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:start") + .resequence(body()).batch().size(2).timeout(3000) + .to("mock:result"); + } + }; + } + + public void testResequencerBatch() throws Exception { + for (int i = 0; i < 100; i++) { + testIteration(i); + } + } + + private void testIteration(int i) throws Exception { + MockEndpoint me = context.getEndpoint("mock:result", MockEndpoint.class); + me.reset(); + me.expectedMessageCount(4); + + LOG.info("Run #{}", i); + + template.sendBody("direct:start", "4"); + template.sendBody("direct:start", "1"); + + template.sendBody("direct:start", "3"); + template.sendBody("direct:start", "2"); + + assertMockEndpointsSatisfied(); + + // because the order can change a bit depending when the resequencer trigger cut-off + // then the order can be a bit different + + String a = me.getExchanges().get(0).getIn().getBody(String.class); + String b = me.getExchanges().get(1).getIn().getBody(String.class); + String c = me.getExchanges().get(2).getIn().getBody(String.class); + String d = me.getExchanges().get(3).getIn().getBody(String.class); + String line = a + b + c + d; + + LOG.info("Order: {}", line); + + assertTrue("Line was " + line, "1423".equals(line) || "1234".equals(line)); + } +} \ No newline at end of file