Author: ningjiang
Date: Mon Sep 22 03:05:21 2008
New Revision: 697770
URL: http://svn.apache.org/viewvc?rev=697770&view=rev
Log:
CAMEL-928 added property Exchange.AGGREGATED_COUNT
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
Mon Sep 22 03:05:21 2008
@@ -32,6 +32,8 @@
String CHARSET_NAME = "org.apache.camel.Exchange.CharsetName";
+ String AGGREGATED_COUNT = "org.apache.camel.Exchange.AggregatedCount";
+
/**
* Returns the {@link ExchangePattern} (MEP) of this exchange.
*
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
Mon Sep 22 03:05:21 2008
@@ -56,12 +56,21 @@
Exchange oldExchange = map.get(correlationKey);
Exchange newExchange = exchange;
if (oldExchange != null) {
+ Integer count = oldExchange.getProperty(Exchange.AGGREGATED_COUNT,
Integer.class);
+ if (count == null) {
+ count = 1;
+ }
+ count++;
newExchange = aggregationStrategy.aggregate(oldExchange,
newExchange);
+ newExchange.setProperty(Exchange.AGGREGATED_COUNT, count);
}
// the strategy may just update the old exchange and return it
if (newExchange != oldExchange) {
LOG.debug("put exchange:" + newExchange + " for key:" +
correlationKey);
+ if (oldExchange == null) {
+ newExchange.setProperty(Exchange.AGGREGATED_COUNT, new
Integer(1));
+ }
map.put(correlationKey, newExchange);
}
onAggregation(correlationKey, newExchange);
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java
Mon Sep 22 03:05:21 2008
@@ -29,18 +29,13 @@
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newIn.getBody(String.class);
newIn.setBody(oldBody + "+" + newBody);
- Integer old = (Integer) oldExchange.getProperty("aggregated");
- if (old == null) {
- old = 1;
- }
- copy.setProperty("aggregated", old + 1);
return copy;
}
/**
* An expression used to determine if the aggregation is complete
*/
- public boolean isCompleted(@Header(name = "aggregated")
+ public boolean isCompleted(@Header(name = Exchange.AGGREGATED_COUNT)
Integer aggregated) {
if (aggregated == null) {
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
Mon Sep 22 03:05:21 2008
@@ -82,7 +82,7 @@
from("direct:z").process(new
AppendingProcessor("z")).to("direct:aggregator");
from("direct:aggregator").aggregator(header("cheese"), new
BodyInAggregatingStrategy()).
- completedPredicate(header("aggregated").isEqualTo(3)).to("mock:result");
+ completedPredicate(header(Exchange.AGGREGATED_COUNT).isEqualTo(3)).to("mock:result");
// END SNIPPET: example
}
};
Modified:
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
---
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
(original)
+++
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
Mon Sep 22 03:05:21 2008
@@ -31,17 +31,14 @@
Double oldRate =
oldExchange.getIn().getHeader(Constants.PROPERTY_RATE, Double.class);
Double newRate =
newExchange.getIn().getHeader(Constants.PROPERTY_RATE, Double.class);
Exchange result = null;
- if (old == null) {
- old = 1;
- }
+
if (newRate >= oldRate) {
result = oldExchange;
} else {
result = newExchange;
}
LOG.debug("Get the lower rate exchange " + result);
- // Set the property for the completeness condition
- result.setProperty("aggregated", old + 1);
+
return result;
}
Modified:
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
---
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
(original)
+++
activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
Mon Sep 22 03:05:21 2008
@@ -93,7 +93,7 @@
// The aggregation will completed when all the three bank responses
are received
from("jms:queue:bankReplyQueue")
.aggregator(header(Constants.PROPERTY_SSN), new
BankResponseAggregationStrategy())
- .completedPredicate(header("aggregated").isEqualTo(3))
+ .completedPredicate(header(Exchange.AGGREGATED_COUNT).isEqualTo(3))
// Here we do some translation and put the message back to
loanReplyQueue
.process(new Translator()).to("jms:queue:loanReplyQueue");