Revert "Fixed the producer which now should be handling the async and sync flows in an improved way."
This reverts commit d0bd0f7449f63893b0f33a3444d91a4a8e0f1acb. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/13e3e12a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/13e3e12a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/13e3e12a Branch: refs/heads/master Commit: 13e3e12acb1d4a0302620c0517d2f55a79cf60e0 Parents: 7530f41 Author: Andrea Cosentino <anco...@gmail.com> Authored: Fri Nov 25 12:50:02 2016 +0100 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Fri Nov 25 12:50:02 2016 +0100 ---------------------------------------------------------------------- .../component/firebase/FirebaseEndpoint.java | 14 ++++----- .../component/firebase/FirebaseProducer.java | 31 ++++++++++--------- .../firebase/FirebaseProducerTest.java | 32 +++++--------------- 3 files changed, 30 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/13e3e12a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java index 41d7a61..709785e 100644 --- a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java +++ b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java @@ -52,10 +52,10 @@ public class FirebaseEndpoint extends DefaultEndpoint { @Metadata(required = "false") private String keyName = "firebaseKey"; - @UriParam(defaultValue = "reply", description = "If true, the save or update request (set value in Firebase terms) " + @UriParam(defaultValue = "async", description = "If true, the save or update request (set value in Firebase terms) " + "is fired and the reply will be ignored, else the routing thread will wait and the reply will be saved in the exchange message") @Metadata(required = "false") - private boolean reply; + private boolean async; public FirebaseEndpoint(String uri, FirebaseComponent firebaseComponent, FirebaseConfig firebaseConfig) { super(uri, firebaseComponent); @@ -64,7 +64,7 @@ public class FirebaseEndpoint extends DefaultEndpoint { this.setServiceAccountFile(firebaseConfig.getServiceAccountFile()); this.databaseUrl = firebaseConfig.getDatabaseUrl(); final String keyName = firebaseConfig.getKeyName(); - this.setReply(firebaseConfig.isAsync()); + this.setAsync(firebaseConfig.isAsync()); if (keyName != null) { this.setKeyName(keyName); } @@ -110,12 +110,12 @@ public class FirebaseEndpoint extends DefaultEndpoint { this.keyName = keyName; } - public boolean isReply() { - return reply; + public boolean isAsync() { + return async; } - public void setReply(boolean reply) { - this.reply = reply; + public void setAsync(boolean async) { + this.async = async; } public FirebaseApp getFirebaseApp() { http://git-wip-us.apache.org/repos/asf/camel/blob/13e3e12a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java index 33ba39b..ef8f8a6 100644 --- a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java +++ b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java @@ -22,6 +22,7 @@ import com.google.firebase.database.FirebaseDatabase; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; +import org.apache.camel.Processor; import org.apache.camel.component.firebase.exception.DatabaseErrorException; import org.apache.camel.impl.DefaultAsyncProducer; import org.slf4j.Logger; @@ -42,35 +43,35 @@ public class FirebaseProducer extends DefaultAsyncProducer { rootReference = endpoint.getRootReference(); } + /** + * Processes the message exchange. + * Similar to {@link Processor#process}, but the caller supports having the exchange asynchronously processed. + * <p/> + * If there was a failure processing then the caused {@link Exception} would be set on the {@link Exchange}. + * + * @param exchange the message exchange + * @param callback the {@link AsyncCallback} will be invoked when the processing of the exchange is completed. + * If the exchange is completed synchronously, then the callback is also invoked synchronously. + * The callback should therefore be careful of starting recursive loop. + * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously + */ @Override public boolean process(Exchange exchange, AsyncCallback callback) { final Message in = exchange.getIn(); - final Message out = exchange.getOut(); String firebaseKey = (String) in.getHeader(endpoint.getKeyName()); Object value = in.getBody(); DatabaseReference ref = FirebaseDatabase .getInstance(endpoint.getFirebaseApp()) .getReference(rootReference).child(firebaseKey); - final boolean reply = endpoint.isReply(); - out.setHeaders(in.getHeaders()); - if (reply) { // Wait for reply - processReply(exchange, callback, value, ref); - } else { // Fire and forget - ref.setValue(value); - out.setBody(in.getBody()); - callback.done(true); - } - return !reply; - } - - private void processReply(Exchange exchange, AsyncCallback callback, Object value, DatabaseReference ref) { ref.setValue(value, (DatabaseError databaseError, DatabaseReference databaseReference) -> { if (databaseError != null) { exchange.setException(new DatabaseErrorException(databaseError)); + exchange.getOut().setFault(true); } else { exchange.getOut().setBody(databaseReference); } - callback.done(false); + callback.done(endpoint.isAsync()); }); + return endpoint.isAsync(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/13e3e12a/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java b/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java index 973b2ee..f0f2cc3 100644 --- a/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java +++ b/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java @@ -22,9 +22,6 @@ import java.nio.file.Paths; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import static java.util.stream.IntStream.range; -import static junit.framework.TestCase.fail; - import com.google.firebase.database.DatabaseReference; import org.apache.camel.CamelContext; import org.apache.camel.Message; @@ -55,28 +52,16 @@ public class FirebaseProducerTest { } @Test - public void whenFirebaseSetShouldReceiveMessageAsDBReference() throws Exception { - startRoute(true, DatabaseReference.class); + public void whenFirebaseSetShouldReceiveMessagesSync() throws Exception { + startRoute(false, DatabaseReference.class); } @Test - public void whenFirebaseSetShouldReceiveMessageAsDbString() throws Exception { - startRoute(false, String.class); + public void whenFirebaseSetShouldReceiveMessagesAsync() throws Exception { + startRoute(true, String.class); } - @Test - public void whenMultipleFirebaseSetShouldReceiveExpectedMessages() { - range(0, 10).forEach(i -> { - try { - startRoute(true, DatabaseReference.class); - startRoute(false, String.class); - } catch (Exception e) { - fail("Multiple test fails: " + e); - } - }); - } - - private void startRoute(final boolean reply, final Class<?> expectedBodyClass) throws Exception { + private void startRoute(final boolean async, final Class<?> expectedBodyClass) throws Exception { sampleInputProvider.copySampleFile(); CamelContext context = new DefaultCamelContext(); context.addRoutes(new RouteBuilder() { @@ -93,14 +78,11 @@ public class FirebaseProducerTest { out.setHeader("firebaseKey", keyValue[0]); out.setBody(keyValue[1].trim()); }) - .to(String.format("firebase://%s?rootReference=%s&serviceAccountFile=%s&reply=%b", - ConfigurationProvider.createDatabaseUrl(), rootReference, serviceAccountFile, reply)) + .to(String.format("firebase://%s?rootReference=%s&serviceAccountFile=%s&async=%b", + ConfigurationProvider.createDatabaseUrl(), rootReference, serviceAccountFile, async)) .to("log:whenFirebaseSet?level=WARN") .process(exchange1 -> { assertThat(exchange1.getIn().getBody().getClass()).isEqualTo(expectedBodyClass); - if (reply) { - assertThat(exchange1.getIn().getHeader("firebaseKey")).isNotNull(); - } try { reentrantLock.lock(); wake.signal();