Author: wtam
Date: Mon Dec 8 23:18:06 2008
New Revision: 724629
URL: http://svn.apache.org/viewvc?rev=724629&view=rev
Log:
Merged revisions 724619 via svnmerge from
https://svn.apache.org/repos/asf/activemq/camel/trunk
........
r724619 | wtam | 2008-12-09 01:29:43 -0500 (Tue, 09 Dec 2008) | 1 line
Applied patch from Martin Krasser with thanks! [CAMEL-1037] Messages in
Resequencer between 2 JMS queues get stuck
........
Added:
activemq/camel/branches/camel-1.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
- copied unchanged from r724619,
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
activemq/camel/branches/camel-1.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
- copied, changed from r724619,
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
activemq/camel/branches/camel-1.x/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
- copied unchanged from r724619,
activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
Modified:
activemq/camel/branches/camel-1.x/ (props changed)
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 8 23:18:06 2008
@@ -1 +1 @@
-/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726,722845,722878,723264,723314,723325-723327,723409,723835,723966,724122
+/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726,722845,722878,723264,723314,723325-723327,723409,723835,723966,724122,724619
Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified:
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
---
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
(original)
+++
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
Mon Dec 8 23:18:06 2008
@@ -16,7 +16,6 @@
*/
package org.apache.camel.model;
-import java.util.Collection;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -25,12 +24,9 @@
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
-import org.apache.camel.Route;
import org.apache.camel.builder.ExpressionClause;
import org.apache.camel.model.language.ExpressionType;
import org.apache.camel.processor.Aggregator;
@@ -89,37 +85,12 @@
public String getShortName() {
return "aggregator";
}
-
- @SuppressWarnings("unchecked")
- @Override
- public void addRoutes(RouteContext routeContext, Collection<Route> routes)
throws Exception {
- final Aggregator aggregator = createAggregator(routeContext);
- doAddRoute(routeContext, routes, aggregator);
- }
-
- private void doAddRoute(RouteContext routeContext, Collection<Route>
routes, final Aggregator aggregator)
- throws Exception {
- Route route = new Route<Exchange>(aggregator.getEndpoint(),
aggregator) {
- @Override
- public String toString() {
- return "AggregatorRoute[" + getEndpoint() + " -> " +
aggregator.getProcessor() + "]";
- }
- };
-
- routes.add(route);
- }
-
@Override
public Processor createProcessor(RouteContext routeContext) throws
Exception {
- final Aggregator aggregator = createAggregator(routeContext);
-
- doAddRoute(routeContext, routeContext.getCamelContext().getRoutes(),
aggregator);
- routeContext.setIsRouteAdded(true);
- return aggregator;
+ return createAggregator(routeContext);
}
protected Aggregator createAggregator(RouteContext routeContext) throws
Exception {
- Endpoint from = routeContext.getEndpoint();
final Processor processor = routeContext.createProcessor(this);
final Aggregator aggregator;
@@ -138,7 +109,7 @@
AggregationStrategy strategy =
createAggregationStrategy(routeContext);
aggregationCollection.setAggregationStrategy(strategy);
}
- aggregator = new Aggregator(from, processor,
aggregationCollection);
+ aggregator = new Aggregator(processor, aggregationCollection);
} else {
// create the aggregator using a default collection
AggregationStrategy strategy =
createAggregationStrategy(routeContext);
@@ -150,9 +121,9 @@
predicate =
getCompletedPredicate().createPredicate(routeContext);
}
if (predicate != null) {
- aggregator = new Aggregator(from, processor,
aggregateExpression, strategy, predicate);
+ aggregator = new Aggregator(processor, aggregateExpression,
strategy, predicate);
} else {
- aggregator = new Aggregator(from, processor,
aggregateExpression, strategy);
+ aggregator = new Aggregator(processor, aggregateExpression,
strategy);
}
}
Modified:
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
---
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
(original)
+++
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
Mon Dec 8 23:18:06 2008
@@ -26,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
+
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -62,7 +63,6 @@
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.Policy;
import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.CollectionHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -755,9 +755,6 @@
* together into a single invoice message.
*/
public ExpressionClause<AggregatorType> aggregator() {
- if (CollectionHelper.filterList(getOutputs(),
ExceptionType.class).isEmpty()) {
- LOG.warn("Aggregator must be the only output added to the route: "
+ this);
- }
AggregatorType answer = new AggregatorType();
addOutput(answer);
return ExpressionClause.createAndSetExpression(answer);
@@ -781,9 +778,6 @@
* @param aggregationStrategy the strategy used for the aggregation
*/
public ExpressionClause<AggregatorType> aggregator(AggregationStrategy
aggregationStrategy) {
- if (CollectionHelper.filterList(getOutputs(),
ExceptionType.class).isEmpty()) {
- LOG.warn("Aggregator must be the only output added to the route: "
+ this);
- }
AggregatorType answer = new AggregatorType();
answer.setAggregationStrategy(aggregationStrategy);
addOutput(answer);
@@ -800,9 +794,6 @@
* @param aggregationCollection the collection used to perform the
aggregation
*/
public AggregatorType aggregator(AggregationCollection
aggregationCollection) {
- if (CollectionHelper.filterList(getOutputs(),
ExceptionType.class).isEmpty()) {
- LOG.warn("Aggregator must be the only output added to the route: "
+ this);
- }
AggregatorType answer = new AggregatorType();
answer.setAggregationCollection(aggregationCollection);
addOutput(answer);
@@ -830,9 +821,6 @@
* <code>header("JMSCorrelationID")</code>
*/
public AggregatorType aggregator(Expression correlationExpression) {
- if (CollectionHelper.filterList(getOutputs(),
ExceptionType.class).isEmpty()) {
- LOG.warn("Aggregator must be the only output added to the route: "
+ this);
- }
AggregatorType answer = new AggregatorType(correlationExpression);
addOutput(answer);
return answer;
@@ -859,9 +847,6 @@
* <code>header("JMSCorrelationID")</code>
*/
public AggregatorType aggregator(Expression correlationExpression,
AggregationStrategy aggregationStrategy) {
- if (CollectionHelper.filterList(getOutputs(),
ExceptionType.class).isEmpty()) {
- LOG.warn("Aggregator must be the only output added to the route: "
+ this);
- }
AggregatorType answer = new AggregatorType(correlationExpression,
aggregationStrategy);
addOutput(answer);
return answer;
Modified:
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
---
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
(original)
+++
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
Mon Dec 8 23:18:06 2008
@@ -28,7 +28,6 @@
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
-import org.apache.camel.Route;
import org.apache.camel.model.config.BatchResequencerConfig;
import org.apache.camel.model.config.StreamResequencerConfig;
import org.apache.camel.model.language.ExpressionType;
@@ -223,8 +222,7 @@
protected Resequencer createBatchResequencer(RouteContext routeContext,
BatchResequencerConfig config) throws Exception {
Processor processor = routeContext.createProcessor(this);
- Resequencer resequencer = new Resequencer(routeContext.getEndpoint(),
- processor, resolveExpressionList(routeContext));
+ Resequencer resequencer = new Resequencer(processor,
resolveExpressionList(routeContext));
resequencer.setBatchSize(config.getBatchSize());
resequencer.setBatchTimeout(config.getBatchTimeout());
return resequencer;
@@ -245,34 +243,13 @@
StreamResequencerConfig config) throws Exception {
config.getComparator().setExpressions(resolveExpressionList(routeContext));
Processor processor = routeContext.createProcessor(this);
- StreamResequencer resequencer = new
StreamResequencer(routeContext.getEndpoint(),
- processor, config.getComparator());
+ StreamResequencer resequencer = new StreamResequencer(processor,
config.getComparator());
resequencer.setTimeout(config.getTimeout());
resequencer.setCapacity(config.getCapacity());
return resequencer;
}
- private Route<? extends Exchange> createBatchResequencerRoute(RouteContext
routeContext) throws Exception {
- final Resequencer resequencer = createBatchResequencer(routeContext,
batchConfig);
- return new Route(routeContext.getEndpoint(), resequencer) {
- @Override
- public String toString() {
- return "BatchResequencerRoute[" + getEndpoint() + " -> " +
resequencer.getProcessor() + "]";
- }
- };
- }
-
- private Route<? extends Exchange>
createStreamResequencerRoute(RouteContext routeContext) throws Exception {
- final StreamResequencer resequencer =
createStreamResequencer(routeContext, streamConfig);
- return new Route(routeContext.getEndpoint(), resequencer) {
- @Override
- public String toString() {
- return "StreamResequencerRoute[" + getEndpoint() + " -> " +
resequencer.getProcessor() + "]";
- }
- };
- }
-
private List<Expression> resolveExpressionList(RouteContext routeContext) {
if (expressionList == null) {
expressionList = new ArrayList<Expression>();
Modified:
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
---
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
(original)
+++
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
Mon Dec 8 23:18:06 2008
@@ -16,7 +16,6 @@
*/
package org.apache.camel.processor;
-import org.apache.camel.Endpoint;
import org.apache.camel.Expression;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
@@ -45,19 +44,19 @@
public class Aggregator extends BatchProcessor {
private Predicate aggregationCompletedPredicate;
- public Aggregator(Endpoint endpoint, Processor processor, Expression
correlationExpression,
+ public Aggregator(Processor processor, Expression correlationExpression,
AggregationStrategy aggregationStrategy) {
- this(endpoint, processor, new
DefaultAggregationCollection(correlationExpression, aggregationStrategy));
+ this(processor, new
DefaultAggregationCollection(correlationExpression, aggregationStrategy));
}
- public Aggregator(Endpoint endpoint, Processor processor, Expression
correlationExpression,
+ public Aggregator(Processor processor, Expression correlationExpression,
AggregationStrategy aggregationStrategy, Predicate
aggregationCompletedPredicate) {
- this(endpoint, processor, new
PredicateAggregationCollection(correlationExpression, aggregationStrategy,
aggregationCompletedPredicate));
+ this(processor, new
PredicateAggregationCollection(correlationExpression, aggregationStrategy,
aggregationCompletedPredicate));
this.aggregationCompletedPredicate = aggregationCompletedPredicate;
}
- public Aggregator(Endpoint endpoint, Processor processor,
AggregationCollection collection) {
- super(endpoint, processor, collection);
+ public Aggregator(Processor processor, AggregationCollection collection) {
+ super(processor, collection);
}
@Override
@@ -68,7 +67,7 @@
@Override
protected boolean isBatchCompleted(int index) {
if (aggregationCompletedPredicate != null) {
- // TODO: (davsclaus) What is the point with this code? I think its
wrong
+ // TODO: (davsclaus) CAMEL-1159 What is the point with this code?
I think its wrong
if (getCollection().size() > 0) {
return true;
}
Modified:
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
---
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
(original)
+++
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
Mon Dec 8 23:18:06 2008
@@ -18,17 +18,14 @@
import java.util.Collection;
import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ServiceHelper;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* A base class for any kind of {@link Processor} which implements
some kind of
@@ -36,24 +33,25 @@
*
* @version $Revision$
*/
-public class BatchProcessor extends ServiceSupport implements Runnable,
Processor {
+public class BatchProcessor extends ServiceSupport implements Processor {
+
public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
public static final int DEFAULT_BATCH_SIZE = 100;
- private static final transient Log LOG =
LogFactory.getLog(BatchProcessor.class);
- private Endpoint endpoint;
- private Processor processor;
- private Collection<Exchange> collection;
private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
private int batchSize = DEFAULT_BATCH_SIZE;
private int outBatchSize;
- private PollingConsumer consumer;
+
+ private Processor processor;
+ private Collection<Exchange> collection;
private ExceptionHandler exceptionHandler;
- public BatchProcessor(Endpoint endpoint, Processor processor,
Collection<Exchange> collection) {
- this.endpoint = endpoint;
+ private BatchSender sender;
+
+ public BatchProcessor(Processor processor, Collection<Exchange>
collection) {
this.processor = processor;
this.collection = collection;
+ this.sender = new BatchSender();
}
@Override
@@ -61,18 +59,6 @@
return "BatchProcessor[to: " + processor + "]";
}
- public void run() {
- LOG.debug("Starting thread for " + this);
- while (isRunAllowed()) {
- try {
- processBatch();
- } catch (Exception e) {
- getExceptionHandler().handleException(e);
- }
- }
- collection.clear();
- }
-
// Properties
//
-------------------------------------------------------------------------
public ExceptionHandler getExceptionHandler() {
@@ -123,62 +109,20 @@
this.batchTimeout = batchTimeout;
}
- public Endpoint getEndpoint() {
- return endpoint;
- }
-
public Processor getProcessor() {
return processor;
}
/**
- * A transactional method to process a batch of messages up to a timeout
- * period or number of messages reached.
- */
- protected synchronized void processBatch() throws Exception {
- long start = System.currentTimeMillis();
- long end = start + batchTimeout;
- for (int i = 0; !isBatchCompleted(i); i++) {
- long timeout = end - System.currentTimeMillis();
- if (timeout < 0L) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("batch timeout expired at batch index: " + i);
- }
- break;
- }
- Exchange exchange = consumer.receive(timeout);
- if (exchange == null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("receive with timeout: " + timeout + " expired
at batch index: " + i);
- }
- break;
- }
- collection.add(exchange);
- }
-
- // we should NOT log the collection directly as it will invoke a
toString() on collection
- // and it will call collection.iterator() where end-users might do
stuff that would break
- // calling the iterator a 2nd time as below
-
- // lets send the batch
- Iterator<Exchange> iter = collection.iterator();
- while (iter.hasNext()) {
- Exchange exchange = iter.next();
- iter.remove();
- processExchange(exchange);
- }
- }
-
- /**
* A strategy method to decide if the batch is completed the resulting
exchanges should be sent
*/
- protected boolean isBatchCompleted(int index) {
+ protected boolean isBatchCompleted(int num) {
// out batch size is optional and we should only check it if its
enabled (= >0)
if (outBatchSize > 0 && collection.size() >= outBatchSize) {
return true;
}
// fallback to regular batch size check
- return index >= batchSize;
+ return num >= batchSize;
}
/**
@@ -191,16 +135,13 @@
}
protected void doStart() throws Exception {
- consumer = endpoint.createPollingConsumer();
-
- ServiceHelper.startServices(processor, consumer);
-
- Thread thread = new Thread(this, this + " Polling Thread");
- thread.start();
+ ServiceHelper.startServices(processor);
+ sender.start();
}
protected void doStop() throws Exception {
- ServiceHelper.stopServices(consumer, processor);
+ sender.cancel();
+ ServiceHelper.stopServices(processor);
collection.clear();
}
@@ -208,7 +149,71 @@
return collection;
}
+ /**
+ * Enqueues an exchange for later batch processing.
+ */
public void process(Exchange exchange) throws Exception {
- // empty since exchanges come from endpoint's polling consumer
+ sender.enqueueExchange(exchange);
+ }
+
+ /**
+ * Sender thread for queued-up exchanges.
+ */
+ private class BatchSender extends Thread {
+
+ private volatile boolean cancelRequested;
+
+ private LinkedBlockingQueue<Exchange> queue;
+
+ public BatchSender() {
+ super("Batch Sender");
+ this.queue = new LinkedBlockingQueue<Exchange>();
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(batchTimeout);
+ } catch (InterruptedException e) {
+ if (cancelRequested) {
+ return;
+ }
+ }
+ try {
+ sendExchanges();
+ } catch (Exception e) {
+ getExceptionHandler().handleException(e);
+ }
+ }
+ }
+
+ public void cancel() {
+ cancelRequested = true;
+ interrupt();
+ }
+
+ public void sendBatch() {
+ interrupt();
+ }
+
+ public void enqueueExchange(Exchange exchange) {
+ queue.add(exchange);
+ if (isBatchCompleted(queue.size())) {
+ sendBatch();
+ }
+ }
+
+ private void sendExchanges() throws Exception {
+ queue.drainTo(collection, batchSize);
+ Iterator<Exchange> iter = collection.iterator();
+ while (iter.hasNext()) {
+ Exchange exchange = iter.next();
+ iter.remove();
+ processExchange(exchange);
+ }
+ }
+
}
+
}
Modified:
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
---
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
(original)
+++
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
Mon Dec 8 23:18:06 2008
@@ -21,7 +21,6 @@
import java.util.Set;
import java.util.TreeSet;
-import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
@@ -35,16 +34,16 @@
* @version $Revision$
*/
public class Resequencer extends BatchProcessor {
- public Resequencer(Endpoint endpoint, Processor processor,
Expression<Exchange> expression) {
- this(endpoint, processor, createSet(expression));
+ public Resequencer(Processor processor, Expression expression) {
+ this(processor, createSet(expression));
}
- public Resequencer(Endpoint endpoint, Processor processor,
List<Expression> expressions) {
- this(endpoint, processor, createSet(expressions));
+ public Resequencer(Processor processor, List<Expression> expressions) {
+ this(processor, createSet(expressions));
}
- public Resequencer(Endpoint endpoint, Processor processor, Set<Exchange>
collection) {
- super(endpoint, processor, collection);
+ public Resequencer(Processor processor, Set<Exchange> collection) {
+ super(processor, collection);
}
@Override
Modified:
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
---
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
(original)
+++
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
Mon Dec 8 23:18:06 2008
@@ -16,9 +16,7 @@
*/
package org.apache.camel.processor;
-import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
@@ -52,14 +50,14 @@
*
* @see ResequencerEngine
*/
-public class StreamResequencer extends ServiceSupport implements
SequenceSender<Exchange>, Runnable, Processor {
+public class StreamResequencer extends ServiceSupport implements
SequenceSender<Exchange>, Processor {
+ private static final long DELIVERY_ATTEMPT_INTERVAL = 1000L;
+
private ExceptionHandler exceptionHandler;
private ResequencerEngine<Exchange> engine;
- private PollingConsumer<? extends Exchange> consumer;
- private Endpoint<? extends Exchange> endpoint;
private Processor processor;
- private Thread worker;
+ private Delivery delivery;
private int capacity;
/**
@@ -72,11 +70,10 @@
* @param comparator
* a sequence element comparator for exchanges.
*/
- public StreamResequencer(Endpoint<? extends Exchange> endpoint, Processor
processor, SequenceElementComparator<Exchange> comparator) {
+ public StreamResequencer(Processor processor,
SequenceElementComparator<Exchange> comparator) {
this.exceptionHandler = new LoggingExceptionHandler(getClass());
this.engine = new ResequencerEngine<Exchange>(comparator);
this.engine.setSequenceSender(this);
- this.endpoint = endpoint;
this.processor = processor;
}
@@ -139,11 +136,10 @@
@Override
protected void doStart() throws Exception {
- consumer = endpoint.createPollingConsumer();
- ServiceHelper.startServices(processor, consumer);
- worker = new Thread(this, this + " Polling Thread");
+ ServiceHelper.startServices(processor);
+ delivery = new Delivery();
engine.start();
- worker.start();
+ delivery.start();
}
@Override
@@ -151,7 +147,7 @@
// let's stop everything in the reverse order
// no need to stop the worker thread -- it will stop automatically
when this service is stopped
engine.stop();
- ServiceHelper.stopServices(consumer, processor);
+ ServiceHelper.stopServices(processor);
}
/**
@@ -164,43 +160,49 @@
processor.process(o);
}
- /**
- * Loops over {@link #processExchange()}.
- */
- public void run() {
- while (!isStopped() && !isStopping()) {
- try {
- processExchange();
- } catch (Exception e) {
- exceptionHandler.handleException(e);
- }
+ public void process(Exchange exchange) throws Exception {
+ while (engine.size() >= capacity) {
+ Thread.sleep(getTimeout());
}
+ engine.insert(exchange);
+ delivery.request();
}
- /**
- * Processes an exchange received from the this resequencer's
- * <code>endpoint</code>. Received exchanges are processed via
- * {@link ResequencerEngine#insert(Object)}.
- * {@link ResequencerEngine#deliver()} is then called in any
case regardless
- * whether a message was received or receiving timed out.
- *
- * @throws Exception
- * if exchange delivery fails.
- */
- protected void processExchange() throws Exception {
- if (engine.size() >= capacity) {
- Thread.sleep(getTimeout());
- } else {
- Exchange exchange = consumer.receive(getTimeout());
- if (exchange != null) {
- engine.insert(exchange);
+ private class Delivery extends Thread {
+
+ private volatile boolean cancelRequested;
+
+ public Delivery() {
+ super("Delivery Thread");
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(DELIVERY_ATTEMPT_INTERVAL);
+ } catch (InterruptedException e) {
+ if (cancelRequested) {
+ return;
+ }
+ }
+ try {
+ engine.deliver();
+ } catch (Exception e) {
+ exceptionHandler.handleException(e);
+ }
}
}
- engine.deliver();
- }
- public void process(Exchange exchange) throws Exception {
- engine.insert(exchange);
+ public void cancel() {
+ cancelRequested = true;
+ interrupt();
+ }
+
+ public void request() {
+ interrupt();
+ }
+
}
-
+
}
\ No newline at end of file
Modified:
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
---
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
(original)
+++
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
Mon Dec 8 23:18:06 2008
@@ -17,8 +17,6 @@
package org.apache.camel.processor;
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -84,19 +82,13 @@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- // disabled due CAMEL-393
- //from("seda:header").setHeader("visited",
constant(true)).aggregator(header("cheese")).to("mock:result");
// START SNIPPET: ex
// in this route we aggregate all from direct:state based on
the header id cheese
from("direct:start").aggregator(header("cheese")).to("mock:result");
- // because of a bug in Camel (CAMEL-393) we can not have other
types between from and aggregator
- // so we must do it as here with two routes. In the fist line
we set the header visited to true
- // and link it to the 2nd route by sending it to direct:temp...
- from("seda:header").setHeader("visited",
constant(true)).to("direct:temp");
- // and here we consume from direct:temp to continue from above
and aggregate
-
from("direct:temp").aggregator(header("cheese")).to("mock:result");
+ // CAMEL-393 now fixed
+ from("seda:header").setHeader("visited",
constant(true)).aggregator(header("cheese")).to("mock:result");
// in this sample we aggreagte using our own startegy with a
completion predicate
// stating that the aggregated header is equal to 5.
Copied:
activemq/camel/branches/camel-1.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
(from r724619,
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java?p2=activemq/camel/branches/camel-1.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java&p1=activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java&r1=724619&r2=724629&rev=724629&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
(original)
+++
activemq/camel/branches/camel-1.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
Mon Dec 8 23:18:06 2008
@@ -25,8 +25,8 @@
@Override
public void configure() throws Exception {
-
from("activemq:queue:in1").resequence().body().batch().size(100).timeout(10000L).to("mock:result");
-
from("activemq:queue:in2").resequence().header("num").stream().timeout(2000L).to("mock:result");
+
from("activemq:queue:in1").resequencer().body().batch().size(100).timeout(10000L).to("mock:result");
+
from("activemq:queue:in2").resequencer().header("num").stream().timeout(2000L).to("mock:result");
}
}