Author: wtam
Date: Mon Dec 8 22:29:43 2008
New Revision: 724619
URL: http://svn.apache.org/viewvc?rev=724619&view=rev
Log:
Applied patch from Martin Krasser with thanks! [CAMEL-1037] Messages in
Resequencer between 2 JMS queues get stuck
Added:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
(with props)
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
(with props)
activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
(with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
Mon Dec 8 22:29:43 2008
@@ -17,7 +17,6 @@
package org.apache.camel.model;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import javax.xml.bind.annotation.XmlAccessType;
@@ -28,12 +27,9 @@
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
-import org.apache.camel.Route;
import org.apache.camel.builder.ExpressionClause;
import org.apache.camel.model.language.ExpressionType;
import org.apache.camel.processor.Aggregator;
@@ -107,34 +103,10 @@
public String getShortName() {
return "aggregator";
}
-
- @SuppressWarnings("unchecked")
- @Override
- public void addRoutes(RouteContext routeContext, Collection<Route> routes)
throws Exception {
- final Aggregator aggregator = createAggregator(routeContext);
- doAddRoute(routeContext, routes, aggregator);
- }
-
- private void doAddRoute(RouteContext routeContext, Collection<Route>
routes, final Aggregator aggregator)
- throws Exception {
- Route route = new Route<Exchange>(aggregator.getEndpoint(),
aggregator) {
- @Override
- public String toString() {
- return "AggregatorRoute[" + getEndpoint() + " -> " +
aggregator.getProcessor() + "]";
- }
- };
-
- routes.add(route);
- }
-
@Override
public Processor createProcessor(RouteContext routeContext) throws
Exception {
- final Aggregator aggregator = createAggregator(routeContext);
-
- doAddRoute(routeContext, routeContext.getCamelContext().getRoutes(),
aggregator);
- routeContext.setIsRouteAdded(true);
- return aggregator;
+ return createAggregator(routeContext);
}
public ExpressionClause<AggregatorType> createAndSetExpression() {
@@ -144,7 +116,6 @@
}
protected Aggregator createAggregator(RouteContext routeContext) throws
Exception {
- Endpoint from = routeContext.getEndpoint();
final Processor processor = routeContext.createProcessor(this);
final Aggregator aggregator;
@@ -163,7 +134,7 @@
AggregationStrategy strategy =
createAggregationStrategy(routeContext);
aggregationCollection.setAggregationStrategy(strategy);
}
- aggregator = new Aggregator(from, processor,
aggregationCollection);
+ aggregator = new Aggregator(processor, aggregationCollection);
} else {
// create the aggregator using a default collection
AggregationStrategy strategy =
createAggregationStrategy(routeContext);
@@ -180,9 +151,9 @@
predicate =
getCompletedPredicate().createPredicate(routeContext);
}
if (predicate != null) {
- aggregator = new Aggregator(from, processor,
aggregateExpression, strategy, predicate);
+ aggregator = new Aggregator(processor, aggregateExpression,
strategy, predicate);
} else {
- aggregator = new Aggregator(from, processor,
aggregateExpression, strategy);
+ aggregator = new Aggregator(processor, aggregateExpression,
strategy);
}
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
Mon Dec 8 22:29:43 2008
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
+
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -59,7 +60,6 @@
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.Policy;
import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.CollectionHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -780,9 +780,6 @@
* @return the expression clause to be used as builder to configure the
correlation expression
*/
public ExpressionClause<AggregatorType> aggregate() {
- if (CollectionHelper.filterList(getOutputs(),
ExceptionType.class).isEmpty()) {
- LOG.warn("Aggregator must be the only output added to the route: "
+ this);
- }
AggregatorType answer = new AggregatorType();
addOutput(answer);
return answer.createAndSetExpression();
@@ -796,9 +793,6 @@
* @return the expression clause to be used as builder to configure the
correlation expression
*/
public ExpressionClause<AggregatorType> aggregate(AggregationStrategy
aggregationStrategy) {
- if (CollectionHelper.filterList(getOutputs(),
ExceptionType.class).isEmpty()) {
- LOG.warn("Aggregator must be the only output added to the route: "
+ this);
- }
AggregatorType answer = new AggregatorType();
answer.setAggregationStrategy(aggregationStrategy);
addOutput(answer);
@@ -813,9 +807,6 @@
* @return the builder
*/
public AggregatorType aggregate(AggregationCollection
aggregationCollection) {
- if (CollectionHelper.filterList(getOutputs(),
ExceptionType.class).isEmpty()) {
- LOG.warn("Aggregator must be the only output added to the route: "
+ this);
- }
AggregatorType answer = new AggregatorType();
answer.setAggregationCollection(aggregationCollection);
addOutput(answer);
@@ -833,9 +824,6 @@
* @return the builder
*/
public AggregatorType aggregate(Expression correlationExpression) {
- if (CollectionHelper.filterList(getOutputs(),
ExceptionType.class).isEmpty()) {
- LOG.warn("Aggregator must be the only output added to the route: "
+ this);
- }
AggregatorType answer = new AggregatorType(correlationExpression);
addOutput(answer);
return answer;
@@ -853,9 +841,6 @@
* @return the builder
*/
public AggregatorType aggregate(Expression correlationExpression,
AggregationStrategy aggregationStrategy) {
- if (CollectionHelper.filterList(getOutputs(),
ExceptionType.class).isEmpty()) {
- LOG.warn("Aggregator must be the only output added to the route: "
+ this);
- }
AggregatorType answer = new AggregatorType(correlationExpression,
aggregationStrategy);
addOutput(answer);
return answer;
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
Mon Dec 8 22:29:43 2008
@@ -24,10 +24,8 @@
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
-import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
-import org.apache.camel.Route;
import org.apache.camel.model.config.BatchResequencerConfig;
import org.apache.camel.model.config.StreamResequencerConfig;
import org.apache.camel.model.language.ExpressionType;
@@ -251,8 +249,7 @@
protected Resequencer createBatchResequencer(RouteContext routeContext,
BatchResequencerConfig config) throws Exception {
Processor processor = routeContext.createProcessor(this);
- Resequencer resequencer = new Resequencer(routeContext.getEndpoint(),
- processor, resolveExpressionList(routeContext));
+ Resequencer resequencer = new Resequencer(processor,
resolveExpressionList(routeContext));
resequencer.setBatchSize(config.getBatchSize());
resequencer.setBatchTimeout(config.getBatchTimeout());
return resequencer;
@@ -273,36 +270,13 @@
StreamResequencerConfig config) throws Exception {
config.getComparator().setExpressions(resolveExpressionList(routeContext));
Processor processor = routeContext.createProcessor(this);
- StreamResequencer resequencer = new
StreamResequencer(routeContext.getEndpoint(),
- processor, config.getComparator());
+ StreamResequencer resequencer = new StreamResequencer(processor,
config.getComparator());
resequencer.setTimeout(config.getTimeout());
resequencer.setCapacity(config.getCapacity());
return resequencer;
}
- private Route<? extends Exchange> createBatchResequencerRoute(RouteContext
routeContext) throws Exception {
- // TODO: Not used should it be removed?
- final Resequencer resequencer = createBatchResequencer(routeContext,
batchConfig);
- return new Route(routeContext.getEndpoint(), resequencer) {
- @Override
- public String toString() {
- return "BatchResequencerRoute[" + getEndpoint() + " -> " +
resequencer.getProcessor() + "]";
- }
- };
- }
-
- private Route<? extends Exchange>
createStreamResequencerRoute(RouteContext routeContext) throws Exception {
- // TODO: Not used should it be removed?
- final StreamResequencer resequencer =
createStreamResequencer(routeContext, streamConfig);
- return new Route(routeContext.getEndpoint(), resequencer) {
- @Override
- public String toString() {
- return "StreamResequencerRoute[" + getEndpoint() + " -> " +
resequencer.getProcessor() + "]";
- }
- };
- }
-
private List<Expression> resolveExpressionList(RouteContext routeContext) {
if (expressionList == null) {
expressionList = new ArrayList<Expression>();
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
Mon Dec 8 22:29:43 2008
@@ -16,7 +16,6 @@
*/
package org.apache.camel.processor;
-import org.apache.camel.Endpoint;
import org.apache.camel.Expression;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
@@ -45,19 +44,19 @@
public class Aggregator extends BatchProcessor {
private Predicate aggregationCompletedPredicate;
- public Aggregator(Endpoint endpoint, Processor processor, Expression
correlationExpression,
+ public Aggregator(Processor processor, Expression correlationExpression,
AggregationStrategy aggregationStrategy) {
- this(endpoint, processor, new
DefaultAggregationCollection(correlationExpression, aggregationStrategy));
+ this(processor, new
DefaultAggregationCollection(correlationExpression, aggregationStrategy));
}
- public Aggregator(Endpoint endpoint, Processor processor, Expression
correlationExpression,
+ public Aggregator(Processor processor, Expression correlationExpression,
AggregationStrategy aggregationStrategy, Predicate
aggregationCompletedPredicate) {
- this(endpoint, processor, new
PredicateAggregationCollection(correlationExpression, aggregationStrategy,
aggregationCompletedPredicate));
+ this(processor, new
PredicateAggregationCollection(correlationExpression, aggregationStrategy,
aggregationCompletedPredicate));
this.aggregationCompletedPredicate = aggregationCompletedPredicate;
}
- public Aggregator(Endpoint endpoint, Processor processor,
AggregationCollection collection) {
- super(endpoint, processor, collection);
+ public Aggregator(Processor processor, AggregationCollection collection) {
+ super(processor, collection);
}
@Override
@@ -68,7 +67,7 @@
@Override
protected boolean isBatchCompleted(int index) {
if (aggregationCompletedPredicate != null) {
- // TODO: (davsclaus) What is the point with this code? I think its
wrong
+ // TODO: (davsclaus) CAMEL-1159 What is the point with this code?
I think its wrong
if (getCollection().size() > 0) {
return true;
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
Mon Dec 8 22:29:43 2008
@@ -18,17 +18,14 @@
import java.util.Collection;
import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ServiceHelper;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* A base class for any kind of [EMAIL PROTECTED] Processor} which implements
some kind of
@@ -36,24 +33,25 @@
*
* @version $Revision$
*/
-public class BatchProcessor extends ServiceSupport implements Runnable,
Processor {
+public class BatchProcessor extends ServiceSupport implements Processor {
+
public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
public static final int DEFAULT_BATCH_SIZE = 100;
- private static final transient Log LOG =
LogFactory.getLog(BatchProcessor.class);
- private Endpoint endpoint;
- private Processor processor;
- private Collection<Exchange> collection;
private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
private int batchSize = DEFAULT_BATCH_SIZE;
private int outBatchSize;
- private PollingConsumer consumer;
+
+ private Processor processor;
+ private Collection<Exchange> collection;
private ExceptionHandler exceptionHandler;
- public BatchProcessor(Endpoint endpoint, Processor processor,
Collection<Exchange> collection) {
- this.endpoint = endpoint;
+ private BatchSender sender;
+
+ public BatchProcessor(Processor processor, Collection<Exchange>
collection) {
this.processor = processor;
this.collection = collection;
+ this.sender = new BatchSender();
}
@Override
@@ -61,18 +59,6 @@
return "BatchProcessor[to: " + processor + "]";
}
- public void run() {
- LOG.debug("Starting thread for " + this);
- while (isRunAllowed()) {
- try {
- processBatch();
- } catch (Exception e) {
- getExceptionHandler().handleException(e);
- }
- }
- collection.clear();
- }
-
// Properties
//
-------------------------------------------------------------------------
public ExceptionHandler getExceptionHandler() {
@@ -123,62 +109,20 @@
this.batchTimeout = batchTimeout;
}
- public Endpoint getEndpoint() {
- return endpoint;
- }
-
public Processor getProcessor() {
return processor;
}
/**
- * A transactional method to process a batch of messages up to a timeout
- * period or number of messages reached.
- */
- protected synchronized void processBatch() throws Exception {
- long start = System.currentTimeMillis();
- long end = start + batchTimeout;
- for (int i = 0; !isBatchCompleted(i); i++) {
- long timeout = end - System.currentTimeMillis();
- if (timeout < 0L) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("batch timeout expired at batch index: " + i);
- }
- break;
- }
- Exchange exchange = consumer.receive(timeout);
- if (exchange == null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("receive with timeout: " + timeout + " expired
at batch index: " + i);
- }
- break;
- }
- collection.add(exchange);
- }
-
- // we should NOT log the collection directly as it will invoke a
toString() on collection
- // and it will call collection.iterator() where end-users might do
stuff that would break
- // calling the iterator a 2nd time as below
-
- // lets send the batch
- Iterator<Exchange> iter = collection.iterator();
- while (iter.hasNext()) {
- Exchange exchange = iter.next();
- iter.remove();
- processExchange(exchange);
- }
- }
-
- /**
* A strategy method to decide if the batch is completed the resulting
exchanges should be sent
*/
- protected boolean isBatchCompleted(int index) {
+ protected boolean isBatchCompleted(int num) {
// out batch size is optional and we should only check it if its
enabled (= >0)
if (outBatchSize > 0 && collection.size() >= outBatchSize) {
return true;
}
// fallback to regular batch size check
- return index >= batchSize;
+ return num >= batchSize;
}
/**
@@ -191,16 +135,13 @@
}
protected void doStart() throws Exception {
- consumer = endpoint.createPollingConsumer();
-
- ServiceHelper.startServices(processor, consumer);
-
- Thread thread = new Thread(this, this + " Polling Thread");
- thread.start();
+ ServiceHelper.startServices(processor);
+ sender.start();
}
protected void doStop() throws Exception {
- ServiceHelper.stopServices(consumer, processor);
+ sender.cancel();
+ ServiceHelper.stopServices(processor);
collection.clear();
}
@@ -208,7 +149,71 @@
return collection;
}
+ /**
+ * Enqueues an exchange for later batch processing.
+ */
public void process(Exchange exchange) throws Exception {
- // empty since exchanges come from endpoint's polling consumer
+ sender.enqueueExchange(exchange);
+ }
+
+ /**
+ * Sender thread for queued-up exchanges.
+ */
+ private class BatchSender extends Thread {
+
+ private volatile boolean cancelRequested;
+
+ private LinkedBlockingQueue<Exchange> queue;
+
+ public BatchSender() {
+ super("Batch Sender");
+ this.queue = new LinkedBlockingQueue<Exchange>();
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(batchTimeout);
+ } catch (InterruptedException e) {
+ if (cancelRequested) {
+ return;
+ }
+ }
+ try {
+ sendExchanges();
+ } catch (Exception e) {
+ getExceptionHandler().handleException(e);
+ }
+ }
+ }
+
+ public void cancel() {
+ cancelRequested = true;
+ interrupt();
+ }
+
+ public void sendBatch() {
+ interrupt();
+ }
+
+ public void enqueueExchange(Exchange exchange) {
+ queue.add(exchange);
+ if (isBatchCompleted(queue.size())) {
+ sendBatch();
+ }
+ }
+
+ private void sendExchanges() throws Exception {
+ queue.drainTo(collection, batchSize);
+ Iterator<Exchange> iter = collection.iterator();
+ while (iter.hasNext()) {
+ Exchange exchange = iter.next();
+ iter.remove();
+ processExchange(exchange);
+ }
+ }
+
}
+
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
Mon Dec 8 22:29:43 2008
@@ -21,7 +21,6 @@
import java.util.Set;
import java.util.TreeSet;
-import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
@@ -35,16 +34,16 @@
* @version $Revision$
*/
public class Resequencer extends BatchProcessor {
- public Resequencer(Endpoint endpoint, Processor processor, Expression
expression) {
- this(endpoint, processor, createSet(expression));
+ public Resequencer(Processor processor, Expression expression) {
+ this(processor, createSet(expression));
}
- public Resequencer(Endpoint endpoint, Processor processor,
List<Expression> expressions) {
- this(endpoint, processor, createSet(expressions));
+ public Resequencer(Processor processor, List<Expression> expressions) {
+ this(processor, createSet(expressions));
}
- public Resequencer(Endpoint endpoint, Processor processor, Set<Exchange>
collection) {
- super(endpoint, processor, collection);
+ public Resequencer(Processor processor, Set<Exchange> collection) {
+ super(processor, collection);
}
@Override
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
Mon Dec 8 22:29:43 2008
@@ -16,9 +16,7 @@
*/
package org.apache.camel.processor;
-import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
@@ -52,14 +50,14 @@
*
* @see ResequencerEngine
*/
-public class StreamResequencer extends ServiceSupport implements
SequenceSender<Exchange>, Runnable, Processor {
+public class StreamResequencer extends ServiceSupport implements
SequenceSender<Exchange>, Processor {
+ private static final long DELIVERY_ATTEMPT_INTERVAL = 1000L;
+
private ExceptionHandler exceptionHandler;
private ResequencerEngine<Exchange> engine;
- private PollingConsumer consumer;
- private Endpoint endpoint;
private Processor processor;
- private Thread worker;
+ private Delivery delivery;
private int capacity;
/**
@@ -72,11 +70,10 @@
* @param comparator
* a sequence element comparator for exchanges.
*/
- public StreamResequencer(Endpoint endpoint, Processor processor,
SequenceElementComparator<Exchange> comparator) {
+ public StreamResequencer(Processor processor,
SequenceElementComparator<Exchange> comparator) {
this.exceptionHandler = new LoggingExceptionHandler(getClass());
this.engine = new ResequencerEngine<Exchange>(comparator);
this.engine.setSequenceSender(this);
- this.endpoint = endpoint;
this.processor = processor;
}
@@ -139,11 +136,10 @@
@Override
protected void doStart() throws Exception {
- consumer = endpoint.createPollingConsumer();
- ServiceHelper.startServices(processor, consumer);
- worker = new Thread(this, this + " Polling Thread");
+ ServiceHelper.startServices(processor);
+ delivery = new Delivery();
engine.start();
- worker.start();
+ delivery.start();
}
@Override
@@ -151,7 +147,7 @@
// let's stop everything in the reverse order
// no need to stop the worker thread -- it will stop automatically
when this service is stopped
engine.stop();
- ServiceHelper.stopServices(consumer, processor);
+ ServiceHelper.stopServices(processor);
}
/**
@@ -164,43 +160,49 @@
processor.process(o);
}
- /**
- * Loops over [EMAIL PROTECTED] #processExchange()}.
- */
- public void run() {
- while (!isStopped() && !isStopping()) {
- try {
- processExchange();
- } catch (Exception e) {
- exceptionHandler.handleException(e);
- }
+ public void process(Exchange exchange) throws Exception {
+ while (engine.size() >= capacity) {
+ Thread.sleep(getTimeout());
}
+ engine.insert(exchange);
+ delivery.request();
}
- /**
- * Processes an exchange received from the this resequencer's
- * <code>endpoint</code>. Received exchanges are processed via
- * [EMAIL PROTECTED] ResequencerEngine#insert(Object)}.
- * [EMAIL PROTECTED] ResequencerEngine#deliver()} is then called in any
case regardless
- * whether a message was received or receiving timed out.
- *
- * @throws Exception
- * if exchange delivery fails.
- */
- protected void processExchange() throws Exception {
- if (engine.size() >= capacity) {
- Thread.sleep(getTimeout());
- } else {
- Exchange exchange = consumer.receive(getTimeout());
- if (exchange != null) {
- engine.insert(exchange);
+ private class Delivery extends Thread {
+
+ private volatile boolean cancelRequested;
+
+ public Delivery() {
+ super("Delivery Thread");
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(DELIVERY_ATTEMPT_INTERVAL);
+ } catch (InterruptedException e) {
+ if (cancelRequested) {
+ return;
+ }
+ }
+ try {
+ engine.deliver();
+ } catch (Exception e) {
+ exceptionHandler.handleException(e);
+ }
}
}
- engine.deliver();
- }
- public void process(Exchange exchange) throws Exception {
- engine.insert(exchange);
+ public void cancel() {
+ cancelRequested = true;
+ interrupt();
+ }
+
+ public void request() {
+ interrupt();
+ }
+
}
-
+
}
\ No newline at end of file
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
Mon Dec 8 22:29:43 2008
@@ -17,8 +17,6 @@
package org.apache.camel.processor;
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -84,19 +82,13 @@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- // disabled due CAMEL-393
- //from("seda:header").setHeader("visited",
constant(true)).aggregator(header("cheese")).to("mock:result");
// START SNIPPET: ex
// in this route we aggregate all from direct:state based on
the header id cheese
from("direct:start").aggregate(header("cheese")).to("mock:result");
- // because of a bug in Camel (CAMEL-393) we can not have other
types between from and aggregator
- // so we must do it as here with two routes. In the fist line
we set the header visited to true
- // and link it to the 2nd route by sending it to direct:temp...
- from("seda:header").setHeader("visited",
constant(true)).to("direct:temp");
- // and here we consume from direct:temp to continue from above
and aggregate
-
from("direct:temp").aggregate(header("cheese")).to("mock:result");
+ // CAMEL-393 now fixed
+ from("seda:header").setHeader("visited",
constant(true)).aggregate(header("cheese")).to("mock:result");
// in this sample we aggregate using our own startegy with a
completion predicate
// stating that the aggregated header is equal to 5.
Added:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java?rev=724619&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
(added)
+++
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
Mon Dec 8 22:29:43 2008
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.issues;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import
org.springframework.test.context.junit38.AbstractJUnit38SpringContextTests;
+
+/**
+ * Unit test for issues CAMEL-1034 and CAMEL-1037
+ */
[EMAIL PROTECTED]
+public class JmsResequencerTest extends AbstractJUnit38SpringContextTests {
+
+ @Autowired
+ protected CamelContext context;
+
+ @EndpointInject(uri = "mock:result")
+ protected MockEndpoint result;
+
+ protected ProducerTemplate template;
+
+ public void setUp() {
+ template = context.createProducerTemplate();
+ }
+
+ public void tearDown() {
+ result.reset();
+ }
+
+ public void testBatchResequencer() throws Exception {
+ testResequencer("activemq:queue:in1");
+ }
+
+ public void testStreamResequencer() throws Exception {
+ testResequencer("activemq:queue:in2");
+ }
+
+ private void testResequencer(String endpoint) throws Exception {
+ result.expectedMessageCount(100);
+ for (int i = 0; i < 100; i++) {
+ result.message(i).body().isEqualTo(Integer.valueOf(i + 1));
+ }
+ for (int i = 100; i > 0; i--) {
+ template.sendBodyAndHeader(endpoint, Integer.valueOf(i), "num",
Long.valueOf(i));
+ }
+ result.assertIsSatisfied();
+ }
+
+}
Propchange:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java?rev=724619&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
(added)
+++
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
Mon Dec 8 22:29:43 2008
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.issues;
+
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * RouteBuilder for [EMAIL PROTECTED] JmsResequencerTest}.
+ */
+public class JmsResequencerTestRouteBuilder extends RouteBuilder {
+
+ @Override
+ public void configure() throws Exception {
+
from("activemq:queue:in1").resequence().body().batch().size(100).timeout(10000L).to("mock:result");
+
from("activemq:queue:in2").resequence().header("num").stream().timeout(2000L).to("mock:result");
+ }
+
+}
Propchange:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml?rev=724619&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
(added)
+++
activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
Mon Dec 8 22:29:43 2008
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+http://activemq.apache.org/camel/schema/spring
http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
+
+ <camelContext id="camel"
xmlns="http://activemq.apache.org/camel/schema/spring">
+ <jmxAgent id="agent" disabled="true" />
+ </camelContext>
+
+ <bean id="routeBuilder"
+
class="org.apache.camel.component.jms.issues.JmsResequencerTestRouteBuilder">
+ </bean>
+
+ <bean id="jmsConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
+ <property name="brokerURL"
value="vm://localhost?broker.persistent=false&broker.useJmx=false"/>
+ </bean>
+
+ <bean id="jmsConfig"
class="org.apache.camel.component.jms.JmsConfiguration">
+ <property name="connectionFactory" ref="jmsConnectionFactory"/>
+ <property name="transacted" value="false"/>
+ <property name="concurrentConsumers" value="10"/>
+ </bean>
+
+ <bean id="activemq" class="org.apache.camel.component.jms.JmsComponent">
+ <property name="configuration" ref="jmsConfig"/>
+ </bean>
+
+</beans>
Propchange:
activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange:
activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml