Author: davsclaus
Date: Fri Feb 15 16:48:27 2013
New Revision: 1446682
URL: http://svn.apache.org/r1446682
Log:
CAMEL-6042: Added OptimisticLockingAggregationRepository. Thanks to Aaron
Whiteside for the patch.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java
(with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java
(with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java
(with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java
(with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java
(with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedTimeoutTest.java
(with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1446682&r1=1446681&r2=1446682&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
Fri Feb 15 16:48:27 2013
@@ -73,6 +73,8 @@ public class AggregateDefinition extends
@XmlAttribute
private Boolean parallelProcessing;
@XmlAttribute
+ private Boolean optimisticLocking;
+ @XmlAttribute
private String executorServiceRef;
@XmlAttribute
private String timeoutCheckerExecutorServiceRef;
@@ -195,6 +197,7 @@ public class AggregateDefinition extends
// set other options
answer.setParallelProcessing(isParallelProcessing());
+ answer.setOptimisticLocking(isOptimisticLocking());
if (getCompletionPredicate() != null) {
Predicate predicate =
getCompletionPredicate().createPredicate(routeContext);
answer.setCompletionPredicate(predicate);
@@ -385,6 +388,18 @@ public class AggregateDefinition extends
this.executorService = executorService;
}
+ public Boolean getOptimisticLocking() {
+ return optimisticLocking;
+ }
+
+ public void setOptimisticLocking(boolean optimisticLocking) {
+ this.optimisticLocking = optimisticLocking;
+ }
+
+ public boolean isOptimisticLocking() {
+ return optimisticLocking != null && optimisticLocking;
+ }
+
public Boolean getParallelProcessing() {
return parallelProcessing;
}
@@ -718,6 +733,11 @@ public class AggregateDefinition extends
setParallelProcessing(true);
return this;
}
+
+ public AggregateDefinition optimisticLocking() {
+ setOptimisticLocking(true);
+ return this;
+ }
public AggregateDefinition executorService(ExecutorService
executorService) {
setExecutorService(executorService);
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1446682&r1=1446681&r2=1446682&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Fri Feb 15 16:48:27 2013
@@ -17,13 +17,12 @@
package org.apache.camel.processor.aggregate;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -46,6 +45,7 @@ import org.apache.camel.Traceable;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.spi.ShutdownPrepared;
import org.apache.camel.spi.Synchronization;
@@ -94,10 +94,10 @@ public class AggregateProcessor extends
// store correlation key -> exchange id in timeout map
private TimeoutMap<String, String> timeoutMap;
private ExceptionHandler exceptionHandler = new
LoggingExceptionHandler(getClass());
- private AggregationRepository aggregationRepository = new
MemoryAggregationRepository();
- private Map<Object, Object> closedCorrelationKeys;
- private Set<String> batchConsumerCorrelationKeys = new
LinkedHashSet<String>();
- private final Set<String> inProgressCompleteExchanges = new
HashSet<String>();
+ private AggregationRepository aggregationRepository;
+ private Map<String, String> closedCorrelationKeys;
+ private final Set<String> batchConsumerCorrelationKeys = new
ConcurrentSkipListSet<String>();
+ private final Set<String> inProgressCompleteExchanges =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private final Map<String, RedeliveryData> redeliveryState = new
ConcurrentHashMap<String, RedeliveryData>();
// keep booking about redelivery
@@ -109,6 +109,7 @@ public class AggregateProcessor extends
private boolean ignoreInvalidCorrelationKeys;
private Integer closeCorrelationKeyOnCompletion;
private boolean parallelProcessing;
+ private boolean optimisticLocking;
// different ways to have completion triggered
private boolean eagerCheckCompletion;
@@ -189,18 +190,38 @@ public class AggregateProcessor extends
throw new ClosedCorrelationKeyException(key, exchange);
}
- // copy exchange, and do not share the unit of work
- // the aggregated output runs in another unit of work
- Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
-
- // when memory based then its fast using synchronized, but if the
aggregation repository is IO
- // bound such as JPA etc then concurrent aggregation per correlation
key could
- // improve performance as we can run aggregation repository get/add in
parallel
- lock.lock();
- try {
- doAggregation(key, copy);
- } finally {
- lock.unlock();
+ //
+ // todo: explain optimistic lock handling
+ if (optimisticLocking) {
+ boolean done = false;
+ int attempt = 0;
+ while (!done) {
+ attempt++;
+ // copy exchange, and do not share the unit of work
+ // the aggregated output runs in another unit of work
+ Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange,
false);
+ try {
+ doAggregation(key, copy);
+ done = true;
+ } catch
(OptimisticLockingAggregationRepository.OptimisticLockingException e) {
+ LOG.trace("On attempt {}
OptimisticLockingAggregationRepository: {} threw OptimisticLockingException
while trying to add() key: {} and exchange: {}",
+ new Object[]{attempt, aggregationRepository,
key, copy, e});
+ }
+ }
+ } else {
+ // copy exchange, and do not share the unit of work
+ // the aggregated output runs in another unit of work
+ Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange,
false);
+
+ // when memory based then its fast using synchronized, but if the
aggregation repository is IO
+ // bound such as JPA etc then concurrent aggregation per
correlation key could
+ // improve performance as we can run aggregation repository
get/add in parallel
+ lock.lock();
+ try {
+ doAggregation(key, copy);
+ } finally {
+ lock.unlock();
+ }
}
}
@@ -211,19 +232,24 @@ public class AggregateProcessor extends
* in parallel.
*
* @param key the correlation key
- * @param exchange the exchange
+ * @param newExchange the exchange
* @return the aggregated exchange
* @throws org.apache.camel.CamelExchangeException is thrown if error
aggregating
*/
- private Exchange doAggregation(String key, Exchange exchange) throws
CamelExchangeException {
+ private Exchange doAggregation(String key, Exchange newExchange) throws
CamelExchangeException {
LOG.trace("onAggregation +++ start +++ with correlation key: {}", key);
Exchange answer;
- Exchange oldExchange =
aggregationRepository.get(exchange.getContext(), key);
- Exchange newExchange = exchange;
+ Exchange originalExchange =
aggregationRepository.get(newExchange.getContext(), key);
+ Exchange oldExchange = originalExchange;
Integer size = 1;
if (oldExchange != null) {
+ // hack to support legacy AggregationStrategy's that modify and
return the oldExchange, these will not
+ // working when using an identify based approach for optimistic
locking like the MemoryAggregationRepository.
+ if (optimisticLocking && aggregationRepository instanceof
MemoryAggregationRepository) {
+ oldExchange = originalExchange.copy();
+ }
size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, 0,
Integer.class);
size++;
}
@@ -238,16 +264,16 @@ public class AggregateProcessor extends
newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
}
- // prepare the exchanges for aggregation and aggregate it
+ // prepare the exchanges for aggregation and then aggregate them
ExchangeHelper.prepareAggregation(oldExchange, newExchange);
// must catch any exception from aggregation
try {
- answer = onAggregation(oldExchange, exchange);
+ answer = onAggregation(oldExchange, newExchange);
} catch (Throwable e) {
- throw new CamelExchangeException("Error occurred during
aggregation", exchange, e);
+ throw new CamelExchangeException("Error occurred during
aggregation", newExchange, e);
}
if (answer == null) {
- throw new CamelExchangeException("AggregationStrategy " +
aggregationStrategy + " returned null which is not allowed", exchange);
+ throw new CamelExchangeException("AggregationStrategy " +
aggregationStrategy + " returned null which is not allowed", newExchange);
}
// update the aggregated size
@@ -260,8 +286,7 @@ public class AggregateProcessor extends
// only need to update aggregation repository if we are not complete
if (complete == null) {
- LOG.trace("In progress aggregated exchange: {} with correlation
key: {}", answer, key);
- aggregationRepository.add(exchange.getContext(), key, answer);
+ doAggregationRepositoryAdd(newExchange.getContext(), key,
originalExchange, answer);
} else {
// if batch consumer completion is enabled then we need to
complete the group
if ("consumer".equals(complete)) {
@@ -276,14 +301,14 @@ public class AggregateProcessor extends
if (batchAnswer != null) {
batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
- onCompletion(batchKey, batchAnswer, false);
+ onCompletion(batchKey, originalExchange, batchAnswer,
false);
}
}
batchConsumerCorrelationKeys.clear();
} else {
// we are complete for this exchange
answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
- onCompletion(key, answer, false);
+ onCompletion(key, originalExchange, answer, false);
}
}
@@ -292,6 +317,28 @@ public class AggregateProcessor extends
return answer;
}
+ protected void doAggregationRepositoryAdd(CamelContext camelContext,
String key, Exchange oldExchange, Exchange newExchange) {
+ LOG.trace("In progress aggregated oldExchange: {}, newExchange: {}
with correlation key: {}", new Object[]{oldExchange, newExchange, key});
+ if (optimisticLocking) {
+ try {
+
((OptimisticLockingAggregationRepository)aggregationRepository).add(camelContext,
key, oldExchange, newExchange);
+ } catch
(OptimisticLockingAggregationRepository.OptimisticLockingException e) {
+ onOptimisticLockingFailure(oldExchange, newExchange);
+ throw e;
+ }
+ } else {
+ aggregationRepository.add(camelContext, key, newExchange);
+ }
+ }
+
+ protected void onOptimisticLockingFailure(Exchange oldExchange, Exchange
newExchange) {
+ if (aggregationStrategy instanceof
OptimisticLockingAwareAggregationStrategy) {
+ LOG.trace("onOptimisticLockFailure with AggregationStrategy: {},
oldExchange: {}, newExchange: {}",
+ new Object[]{aggregationStrategy, oldExchange,
newExchange});
+
((OptimisticLockingAwareAggregationStrategy)aggregationStrategy).onOptimisticLockFailure(oldExchange,
newExchange);
+ }
+ }
+
/**
* Tests whether the given exchange is complete or not
*
@@ -368,11 +415,13 @@ public class AggregateProcessor extends
return aggregationStrategy.aggregate(oldExchange, newExchange);
}
- protected void onCompletion(final String key, final Exchange exchange,
boolean fromTimeout) {
+ protected void onCompletion(final String key, final Exchange original,
final Exchange aggregated, boolean fromTimeout) {
+ // remove from repository as its completed, we do this first as to
trigger any OptimisticLockingException's
+ aggregationRepository.remove(aggregated.getContext(), key, original);
+
// store the correlation key as property
- exchange.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
- // remove from repository as its completed
- aggregationRepository.remove(exchange.getContext(), key, exchange);
+ aggregated.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
+
if (!fromTimeout && timeoutMap != null) {
// cleanup timeout map if it was a incoming exchange which
triggered the timeout (and not the timeout checker)
timeoutMap.remove(key);
@@ -388,24 +437,24 @@ public class AggregateProcessor extends
// to allow any custom processing before discarding the exchange
if (aggregationStrategy instanceof
TimeoutAwareAggregationStrategy) {
long timeout = getCompletionTimeout() > 0 ?
getCompletionTimeout() : -1;
- ((TimeoutAwareAggregationStrategy)
aggregationStrategy).timeout(exchange, -1, -1, timeout);
+ ((TimeoutAwareAggregationStrategy)
aggregationStrategy).timeout(aggregated, -1, -1, timeout);
}
}
if (fromTimeout && isDiscardOnCompletionTimeout()) {
// discard due timeout
- LOG.debug("Aggregation for correlation key {} discarding
aggregated exchange: ()", key, exchange);
+ LOG.debug("Aggregation for correlation key {} discarding
aggregated exchange: {}", key, aggregated);
// must confirm the discarded exchange
- aggregationRepository.confirm(exchange.getContext(),
exchange.getExchangeId());
+ aggregationRepository.confirm(aggregated.getContext(),
aggregated.getExchangeId());
// and remove redelivery state as well
- redeliveryState.remove(exchange.getExchangeId());
+ redeliveryState.remove(aggregated.getExchangeId());
} else {
// the aggregated exchange should be published (sent out)
- onSubmitCompletion(key, exchange);
+ onSubmitCompletion(key, aggregated);
}
}
- private void onSubmitCompletion(final Object key, final Exchange exchange)
{
+ private void onSubmitCompletion(final String key, final Exchange exchange)
{
LOG.debug("Aggregation complete for correlation key {} sending
aggregated exchange: {}", key, exchange);
// add this as in progress before we submit the task
@@ -445,7 +494,7 @@ public class AggregateProcessor extends
* Restores the timeout map with timeout values from the aggregation
repository.
* <p/>
* This is needed in case the aggregator has been stopped and started
again (for example a server restart).
- * Then the existing exchanges from the {@link AggregationRepository} must
have its timeout conditions restored.
+ * Then the existing exchanges from the {@link AggregationRepository} must
have their timeout conditions restored.
*/
protected void restoreTimeoutMapFromAggregationRepository() throws
Exception {
// grab the timeout value for each partly aggregated exchange
@@ -581,6 +630,14 @@ public class AggregateProcessor extends
this.parallelProcessing = parallelProcessing;
}
+ public boolean isOptimisticLocking() {
+ return optimisticLocking;
+ }
+
+ public void setOptimisticLocking(boolean optimisticLocking) {
+ this.optimisticLocking = optimisticLocking;
+ }
+
public AggregationRepository getAggregationRepository() {
return aggregationRepository;
}
@@ -663,17 +720,17 @@ public class AggregateProcessor extends
private AggregationTimeoutMap(ScheduledExecutorService executor, long
requestMapPollTimeMillis) {
// do NOT use locking on the timeout map as this aggregator has
its own shared lock we will use instead
- super(executor, requestMapPollTimeMillis, false);
+ super(executor, requestMapPollTimeMillis, optimisticLocking);
}
@Override
public void purge() {
// must acquire the shared aggregation lock to be able to purge
- lock.lock();
+ if (!optimisticLocking) { lock.lock(); }
try {
super.purge();
} finally {
- lock.unlock();
+ if (!optimisticLocking) { lock.unlock(); }
}
}
@@ -688,11 +745,23 @@ public class AggregateProcessor extends
}
// get the aggregated exchange
+ boolean evictionStolen = false;
Exchange answer = aggregationRepository.get(camelContext, key);
- if (answer != null) {
+ if (answer == null) {
+ evictionStolen = true;
+ } else {
// indicate it was completed by timeout
answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY,
"timeout");
- onCompletion(key, answer, true);
+ try {
+ onCompletion(key, answer, answer, true);
+ } catch
(OptimisticLockingAggregationRepository.OptimisticLockingException e) {
+ evictionStolen = true;
+ }
+ }
+
+ if (optimisticLocking && evictionStolen) {
+ LOG.debug("Another Camel instance has already successfully
correlated or processed this timeout eviction "
+ + "for exchange with id: {} and correlation id: {}",
exchangeId, key);
}
return true;
}
@@ -717,19 +786,29 @@ public class AggregateProcessor extends
if (keys != null && !keys.isEmpty()) {
// must acquire the shared aggregation lock to be able to
trigger interval completion
- lock.lock();
+ if (!optimisticLocking) { lock.lock(); }
try {
for (String key : keys) {
+ boolean stolenInterval = false;
Exchange exchange =
aggregationRepository.get(camelContext, key);
- if (exchange != null) {
+ if (exchange == null) {
+ stolenInterval = true;
+ } else {
LOG.trace("Completion interval triggered for
correlation key: {}", key);
// indicate it was completed by interval
exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
- onCompletion(key, exchange, false);
+ try {
+ onCompletion(key, exchange, exchange, false);
+ } catch
(OptimisticLockingAggregationRepository.OptimisticLockingException e) {
+ stolenInterval = true;
+ }
+ }
+ if (optimisticLocking && stolenInterval) {
+ LOG.debug("Another Camel instance has already
processed this interval aggregation for exchange with correlation id: {}", key);
}
}
} finally {
- lock.unlock();
+ if (!optimisticLocking) { lock.unlock(); }
}
}
@@ -842,13 +921,25 @@ public class AggregateProcessor extends
if (getCloseCorrelationKeyOnCompletion() != null) {
if (getCloseCorrelationKeyOnCompletion() > 0) {
LOG.info("Using ClosedCorrelationKeys with a LRUCache with a
capacity of " + getCloseCorrelationKeyOnCompletion());
- closedCorrelationKeys = new LRUCache<Object,
Object>(getCloseCorrelationKeyOnCompletion());
+ closedCorrelationKeys = new LRUCache<String,
String>(getCloseCorrelationKeyOnCompletion());
} else {
LOG.info("Using ClosedCorrelationKeys with unbounded
capacity");
- closedCorrelationKeys = new HashMap<Object, Object>();
+ closedCorrelationKeys = new ConcurrentHashMap<String,
String>();
}
}
+ if (aggregationRepository == null) {
+ aggregationRepository = new
MemoryAggregationRepository(optimisticLocking);
+ LOG.info("Defaulting to MemoryAggregationRepository");
+ }
+
+ if (optimisticLocking) {
+ if (!(aggregationRepository instanceof
OptimisticLockingAggregationRepository)) {
+ throw new IllegalArgumentException("Optimistic locking cannot
be enabled without using an AggregationRepository that implements
OptimisticLockingAggregationRepository");
+ }
+ LOG.info("Optimistic locking is enabled");
+ }
+
ServiceHelper.startServices(aggregationStrategy, processor,
aggregationRepository);
// should we use recover checker
@@ -1000,7 +1091,7 @@ public class AggregateProcessor extends
int total = 0;
if (keys != null && !keys.isEmpty()) {
// must acquire the shared aggregation lock to be able to trigger
force completion
- lock.lock();
+ if (!optimisticLocking) { lock.lock(); }
total = keys.size();
try {
for (String key : keys) {
@@ -1009,11 +1100,11 @@ public class AggregateProcessor extends
LOG.trace("Force completion triggered for correlation
key: {}", key);
// indicate it was completed by a force completion
request
exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY,
"forceCompletion");
- onCompletion(key, exchange, false);
+ onCompletion(key, exchange, exchange, false);
}
}
} finally {
- lock.unlock();
+ if (!optimisticLocking) { lock.unlock(); }
}
}
LOG.trace("Completed force completion of all groups task");
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java?rev=1446682&r1=1446681&r2=1446682&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
Fri Feb 15 16:48:27 2013
@@ -17,24 +17,50 @@
package org.apache.camel.processor.aggregate;
import java.util.Collections;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
-import org.apache.camel.spi.AggregationRepository;
+import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.support.ServiceSupport;
/**
- * A memory based {@link org.apache.camel.spi.AggregationRepository} which
stores in memory only.
+ * A memory based {@link org.apache.camel.spi.AggregationRepository} which
stores {@link Exchange}s in memory only.
+ *
+ * Supports both optimistic locking and non-optimistic locking modes. Defaults
to non-optimistic locking mode.
*
* @version
*/
-public class MemoryAggregationRepository extends ServiceSupport implements
AggregationRepository {
- private final Map<String, Exchange> cache = new ConcurrentHashMap<String,
Exchange>();
+public class MemoryAggregationRepository extends ServiceSupport implements
OptimisticLockingAggregationRepository {
+ private final ConcurrentMap<String, Exchange> cache = new
ConcurrentHashMap<String, Exchange>();
+ private final boolean optimisticLocking;
+
+ public MemoryAggregationRepository() {
+ this(false);
+ }
+
+ public MemoryAggregationRepository(boolean optimisticLocking) {
+ this.optimisticLocking = optimisticLocking;
+ }
+
+ public Exchange add(CamelContext camelContext, String key, Exchange
oldExchange, Exchange newExchange) {
+ if (!optimisticLocking) { throw new UnsupportedOperationException(); }
+ if (oldExchange == null) {
+ if (cache.putIfAbsent(key, newExchange) != null) {
+ throw new OptimisticLockingException();
+ }
+ } else {
+ if (!cache.replace(key, oldExchange, newExchange)) {
+ throw new OptimisticLockingException();
+ }
+ }
+ return oldExchange;
+ }
public Exchange add(CamelContext camelContext, String key, Exchange
exchange) {
+ if (optimisticLocking) { throw new UnsupportedOperationException(); }
return cache.put(key, exchange);
}
@@ -43,7 +69,13 @@ public class MemoryAggregationRepository
}
public void remove(CamelContext camelContext, String key, Exchange
exchange) {
- cache.remove(key);
+ if (optimisticLocking) {
+ if (!cache.remove(key, exchange)) {
+ throw new OptimisticLockingException();
+ }
+ } else {
+ cache.remove(key);
+ }
}
public void confirm(CamelContext camelContext, String exchangeId) {
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java?rev=1446682&view=auto
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java
(added)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java
Fri Feb 15 16:48:27 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.Exchange;
+
+/**
+ * A specialized {@link AggregationStrategy} which gets a callback when the
aggregated {@link Exchange} fails to add
+ * in the {@link org.apache.camel.spi.OptimisticLockingAggregationRepository}
because of
+ * an {@link
org.apache.camel.spi.OptimisticLockingAggregationRepository.OptimisticLockingException}.
+ *
+ * Please note that when aggregating {@link Exchange}'s to be careful not to
modify and return the {@code oldExchange}
+ * from the {@code aggregate()} method. If you are using the default
MemoryAggregationRepository this will mean you have
+ * modified the value of an object already referenced/stored by the
MemoryAggregationRepository. This makes it impossible
+ * for optimistic locking to work correctly with the
MemoryAggregationRepository.
+ *
+ * You should instead return either the new {@code newExchange} or a
completely new instance of {@link Exchange}. This
+ * is due to the nature of how the underlying {@link
java.util.concurrent.ConcurrentHashMap} performs CAS operations on the value
identity.
+ *
+ * @version
+ */
+public interface OptimisticLockingAwareAggregationStrategy extends
AggregationStrategy {
+
+ // TODO: In Camel 3.0 we should move this to org.apache.camel package
+
+ void onOptimisticLockFailure(Exchange oldExchange, Exchange newExchange);
+}
Propchange:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/OptimisticLockingAwareAggregationStrategy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java?rev=1446682&view=auto
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java
(added)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java
Fri Feb 15 16:48:27 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+
+/**
+ * A specialized {@link org.apache.camel.spi.AggregationRepository} which also
supports
+ * optimistic locking.
+ *
+ * If the underlying implementation cannot perform optimistic locking, it
should
+ * not implement this interface.
+ *
+ * @version
+ */
+public interface OptimisticLockingAggregationRepository extends
AggregationRepository {
+
+ /**
+ * {@link Exception} used by an {@code AggregationRepository} to indicate
that an optimistic
+ * update error has occurred and that the operation should be retried by
the caller.
+ * <p/>
+ */
+ public static class OptimisticLockingException extends RuntimeException { }
+
+ /**
+ * Add the given {@link org.apache.camel.Exchange} under the correlation
key.
+ * <p/>
+ * Will perform optimistic locking to replace expected existing exchange
with
+ * the new supplied exchange.
+ *
+ * @param camelContext the current CamelContext
+ * @param key the correlation key
+ * @param oldExchange the old exchange that is expected to exist when
replacing with the new exchange
+ * @param newExchange the new aggregated exchange, to replace old
exchange
+ * @return the old exchange if any existed
+ * @throws OptimisticLockingException This should be thrown when the
currently stored exchange differs from the supplied oldExchange.
+ */
+ Exchange add(CamelContext camelContext, String key, Exchange oldExchange,
Exchange newExchange) throws OptimisticLockingException;
+
+ /**
+ *
+ * @param camelContext the current CamelContext
+ * @param key the correlation key
+ * @param exchange the exchange to remove
+ * @throws OptimisticLockingException This should be thrown when the
exchange has already been deleted, or otherwise modified.
+ */
+ void remove(CamelContext camelContext, String key, Exchange exchange)
throws OptimisticLockingException;
+}
Propchange:
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/OptimisticLockingAggregationRepository.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java?rev=1446682&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java
Fri Feb 15 16:48:27 2013
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * @version
+ */
+public abstract class AbstractDistributedTest extends ContextTestSupport {
+
+ protected CamelContext context2;
+ protected ProducerTemplate template2;
+
+ public void setUp() throws Exception {
+ super.setUp();
+
+ context2 = new DefaultCamelContext();
+ template2 = context2.createProducerTemplate();
+ ServiceHelper.startServices(template2, context2);
+
+ // add routes after CamelContext has been started
+ context2.addRoutes(createRouteBuilder());
+ }
+
+ public void tearDown() throws Exception {
+ ServiceHelper.stopAndShutdownServices(context2, template2);
+
+ super.tearDown();
+ }
+
+ protected MockEndpoint getMockEndpoint2(String uri) {
+ return context2.getEndpoint(uri, MockEndpoint.class);
+ }
+}
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AbstractDistributedTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java?rev=1446682&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java
Fri Feb 15 16:48:27 2013
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+
+/**
+ * Unit test to verify that aggregate by interval only also works.
+ *
+ * @version
+ */
+public class DistributedCompletionIntervalTest extends AbstractDistributedTest
{
+
+ public void testAggregateInterval() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ MockEndpoint mock2 = getMockEndpoint2("mock:result");
+ // by default the use latest aggregation strategy is used so we get
message 18 and message 19
+ mock.expectedBodiesReceived("Message 18");
+ mock2.expectedBodiesReceived("Message 19");
+
+ // ensure messages are send after the 1s
+ Thread.sleep(2000);
+
+ for (int i = 0; i < 20; i++) {
+ int choice = i % 2;
+ if (choice == 0) {
+ template.sendBodyAndHeader("direct:start", "Message " + i,
"id", "1");
+ } else {
+ template2.sendBodyAndHeader("direct:start", "Message " + i,
"id", "1");
+ }
+ }
+
+ mock.assertIsSatisfied();
+ mock2.assertIsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // START SNIPPET: e1
+ from("direct:start")
+ .aggregate(header("id"), new
UseLatestAggregationStrategy())
+ // trigger completion every 5th second
+ .completionInterval(5000)
+ .to("mock:result");
+ // END SNIPPET: e1
+ }
+ };
+ }
+}
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedCompletionIntervalTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java?rev=1446682&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java
Fri Feb 15 16:48:27 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class DistributedConcurrentPerCorrelationKeyTest extends
AbstractDistributedTest {
+
+ private MemoryAggregationRepository sharedAggregationRepository = new
MemoryAggregationRepository(true);
+
+ private int size = 200;
+ private final String uri = "direct:start";
+
+ @Test
+ public void testAggregateConcurrentPerCorrelationKey() throws Exception {
+ ExecutorService service = Executors.newFixedThreadPool(50);
+ List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
+ for (int i = 0; i < size; i++) {
+ final int id = i % 25;
+ final int choice = i % 2;
+ final int count = i;
+ tasks.add(new Callable<Object>() {
+ public Object call() throws Exception {
+ if (choice == 0) {
+ template.sendBodyAndHeader(uri, "" + count, "id", id);
+ } else {
+ template2.sendBodyAndHeader(uri, "" + count, "id", id);
+ }
+ return null;
+ }
+ });
+ }
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ MockEndpoint mock2 = getMockEndpoint2("mock:result");
+
+ // submit all tasks
+ service.invokeAll(tasks);
+ service.shutdown();
+ service.awaitTermination(10, TimeUnit.SECONDS);
+
+ int contextCount = mock.getReceivedCounter();
+ int context2Count = mock2.getReceivedCounter();
+
+ assertEquals(25, contextCount + context2Count);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new
BodyInAggregatingStrategy())
+ .aggregationRepository(sharedAggregationRepository)
+ .optimisticLocking()
+ .completionSize(8)
+ .to("mock:result");
+ }
+ };
+ }
+}
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedConcurrentPerCorrelationKeyTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedTimeoutTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedTimeoutTest.java?rev=1446682&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedTimeoutTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedTimeoutTest.java
Fri Feb 15 16:48:27 2013
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
+import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
+
+/**
+ * @version
+ */
+public class DistributedTimeoutTest extends AbstractDistributedTest {
+
+ private MemoryAggregationRepository sharedAggregationRepository = new
MemoryAggregationRepository(true);
+
+ private final AtomicInteger invoked = new AtomicInteger();
+ private volatile Exchange receivedExchange;
+ private volatile int receivedIndex;
+ private volatile int receivedTotal;
+ private volatile long receivedTimeout;
+
+ public void testAggregateTimeout() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:aggregated");
+ MockEndpoint mock2 = getMockEndpoint2("mock:aggregated");
+ mock.expectedMessageCount(0);
+ mock2.expectedMessageCount(0);
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template2.sendBodyAndHeader("direct:start", "B", "id", 123);
+
+ // wait 3 seconds so that the timeout kicks in
+ Thread.sleep(3000);
+
+ mock.assertIsSatisfied();
+ mock2.assertIsSatisfied();
+
+ // should invoke the timeout method
+ assertEquals(1, invoked.get());
+
+ assertNotNull(receivedExchange);
+ assertEquals("AB", receivedExchange.getIn().getBody());
+ assertEquals(-1, receivedIndex);
+ assertEquals(-1, receivedTotal);
+ assertEquals(2000, receivedTimeout);
+
+ mock.reset();
+ mock.expectedMessageCount(0);
+ mock2.reset();
+ mock2.expectedBodiesReceived("ABC");
+
+ // now send 3 exchanges which shouldn't trigger the timeout anymore
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template2.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template2.sendBodyAndHeader("direct:start", "C", "id", 123);
+
+ // should complete before timeout
+ mock2.assertIsSatisfied(2000);
+ mock.assertIsSatisfied(5000);
+
+ // should have not invoked the timeout method anymore
+ assertEquals(1, invoked.get());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .aggregationRepository(sharedAggregationRepository)
+ .optimisticLocking()
+ .discardOnCompletionTimeout()
+ .completionSize(3)
+ .completionTimeout(2000) // use a 2 second timeout
+ .to("mock:aggregated");
+ }
+ };
+ }
+
+ private class MyAggregationStrategy implements
TimeoutAwareAggregationStrategy {
+
+ public void timeout(Exchange oldExchange, int index, int total, long
timeout) {
+ invoked.incrementAndGet();
+
+ // we can't assert on the expected values here as the contract of
this method doesn't
+ // allow to throw any Throwable (including AssertionFailedError)
so that we assert
+ // about the expected values directly inside the test method
itself. other than that
+ // asserting inside a thread other than the main thread dosen't
make much sense as
+ // junit would not realize the failed assertion!
+ receivedExchange = oldExchange;
+ receivedIndex = index;
+ receivedTotal = total;
+ receivedTimeout = timeout;
+ }
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ String body = oldExchange.getIn().getBody(String.class);
+ oldExchange.getIn().setBody(body +
newExchange.getIn().getBody(String.class));
+ return oldExchange;
+ }
+ }
+}
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedTimeoutTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DistributedTimeoutTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date