Author: davsclaus
Date: Wed Jun 13 14:49:36 2012
New Revision: 1349891
URL: http://svn.apache.org/viewvc?rev=1349891&view=rev
Log:
CAMEL-5366: Add a way of checking if an Exchange has a given onCompletion
already
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/RichExchange.scala
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1349891&r1=1349890&r2=1349891&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Wed Jun
13 14:49:36 2012
@@ -486,6 +486,15 @@ public interface Exchange {
void addOnCompletion(Synchronization onCompletion);
/**
+ * Checks if the passed {@link org.apache.camel.spi.Synchronization}
instance is
+ * already contained on this exchange.
+ *
+ * @param onCompletion the callback instance that is being checked for
+ * @return <tt>true</tt>, if callback instance is already contained on
this exchange, else <tt>false</tt>
+ */
+ boolean containsOnCompletion(Synchronization onCompletion);
+
+ /**
* Handover all the on completions from this exchange to the target
exchange.
*
* @param target the target exchange
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=1349891&r1=1349890&r2=1349891&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
Wed Jun 13 14:49:36 2012
@@ -394,6 +394,16 @@ public final class DefaultExchange imple
}
}
+ public boolean containsOnCompletion(Synchronization onCompletion) {
+ if (unitOfWork != null) {
+ // if there is an unit of work then the completions is moved there
+ return unitOfWork.containsSynchronization(onCompletion);
+ } else {
+ // check temporary completions if no unit of work yet
+ return onCompletions != null &&
onCompletions.contains(onCompletion);
+ }
+ }
+
public void handoverCompletions(Exchange target) {
if (onCompletions != null) {
for (Synchronization onCompletion : onCompletions) {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=1349891&r1=1349890&r2=1349891&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
Wed Jun 13 14:49:36 2012
@@ -187,6 +187,10 @@ public class DefaultUnitOfWork implement
}
}
+ public synchronized boolean containsSynchronization(Synchronization
synchronization) {
+ return synchronizations != null &&
synchronizations.contains(synchronization);
+ }
+
public void handoverSynchronization(Exchange target) {
if (synchronizations == null || synchronizations.isEmpty()) {
return;
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java?rev=1349891&r1=1349890&r2=1349891&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java
Wed Jun 13 14:49:36 2012
@@ -44,6 +44,15 @@ public interface UnitOfWork extends Serv
void removeSynchronization(Synchronization synchronization);
/**
+ * Checks if the passed synchronization hook is already part of this unit
of work.
+ *
+ * @param synchronization the hook
+ * @return <tt>true</tt>, if the passed synchronization is part of this
unit of work, else <tt>false</tt>
+ */
+ boolean containsSynchronization(Synchronization synchronization);
+
+ /**
+ /**
* Handover all the registered synchronizations to the target {@link
org.apache.camel.Exchange}.
* <p/>
* This is used when a route turns into asynchronous and the {@link
org.apache.camel.Exchange} that
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java?rev=1349891&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java
Wed Jun 13 14:49:36 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.SynchronizationAdapter;
+
+/**
+ * @version
+ */
+public class OnCompletionContainsTest extends ContextTestSupport {
+
+ class SimpleSynchronizationAdapter extends SynchronizationAdapter {
+ private String endPoint;
+ private String body;
+
+ SimpleSynchronizationAdapter(String endPoint, String body) {
+ super();
+ this.endPoint = endPoint;
+ this.body = body;
+ }
+
+ @Override
+ public void onDone(Exchange exchange) {
+ template.sendBody(endPoint, body);
+ }
+
+ @Override
+ public String toString() {
+ return body;
+ }
+ }
+
+ public void testOnCompletionContainsTest() throws Exception {
+ getMockEndpoint("mock:sync").expectedBodiesReceived("C", "B", "B",
"A", "Hello World");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ onCompletion().to("mock:sync");
+
+ from("direct:start")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ SynchronizationAdapter
adapter = new SimpleSynchronizationAdapter("mock:sync", "A");
+ exchange.addOnCompletion(adapter);
+
+ // should not add the
adapter again as we already have it
+ if
(!exchange.containsOnCompletion(adapter)) {
+
exchange.addOnCompletion(adapter);
+ }
+
+ adapter = new
SimpleSynchronizationAdapter("mock:sync", "B");
+ exchange.addOnCompletion(adapter);
+
+ // now add the B again
as we want to test that this also work
+ if
(exchange.containsOnCompletion(adapter)) {
+
exchange.addOnCompletion(adapter);
+ }
+
+ // add a C that is no a
SimpleSynchronizationAdapter class
+ exchange.addOnCompletion(new
SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ template.sendBody("mock:sync", "C");
+ }
+
+ @Override
+ public String toString() {
+ return "C";
+ }
+ });
+ }
+ })
+ .to("mock:result");
+ }
+ };
+ }
+
+}
Modified:
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/RichExchange.scala
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/RichExchange.scala?rev=1349891&r1=1349890&r2=1349891&view=diff
==============================================================================
---
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/RichExchange.scala
(original)
+++
camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/RichExchange.scala
Wed Jun 13 14:49:36 2012
@@ -114,6 +114,8 @@ class RichExchange(val exchange : Exchan
def addOnCompletion(onCompletion: Synchronization) {
exchange.addOnCompletion(onCompletion) }
+ def containsOnCompletion(onCompletion: Synchronization) =
exchange.containsOnCompletion(onCompletion)
+
def handoverCompletions(exchange : Exchange) {
exchange.handoverCompletions(exchange) }
def handoverCompletions = exchange.handoverCompletions