Revert "Fixed the producer which now should be handling the async and sync 
flows in an improved way."

This reverts commit d0bd0f7449f63893b0f33a3444d91a4a8e0f1acb.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/13e3e12a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/13e3e12a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/13e3e12a

Branch: refs/heads/master
Commit: 13e3e12acb1d4a0302620c0517d2f55a79cf60e0
Parents: 7530f41
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Fri Nov 25 12:50:02 2016 +0100
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Fri Nov 25 12:50:02 2016 +0100

----------------------------------------------------------------------
 .../component/firebase/FirebaseEndpoint.java    | 14 ++++-----
 .../component/firebase/FirebaseProducer.java    | 31 ++++++++++---------
 .../firebase/FirebaseProducerTest.java          | 32 +++++---------------
 3 files changed, 30 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/13e3e12a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java
 
b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java
index 41d7a61..709785e 100644
--- 
a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java
+++ 
b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java
@@ -52,10 +52,10 @@ public class FirebaseEndpoint extends DefaultEndpoint {
     @Metadata(required = "false")
     private String keyName = "firebaseKey";
 
-    @UriParam(defaultValue = "reply", description = "If true, the save or 
update request (set value in Firebase terms) "
+    @UriParam(defaultValue = "async", description = "If true, the save or 
update request (set value in Firebase terms) "
             + "is fired and the reply will be ignored, else the routing thread 
will wait and the reply will be saved in the exchange message")
     @Metadata(required = "false")
-    private boolean reply;
+    private boolean async;
 
     public FirebaseEndpoint(String uri, FirebaseComponent firebaseComponent, 
FirebaseConfig firebaseConfig) {
         super(uri, firebaseComponent);
@@ -64,7 +64,7 @@ public class FirebaseEndpoint extends DefaultEndpoint {
         this.setServiceAccountFile(firebaseConfig.getServiceAccountFile());
         this.databaseUrl = firebaseConfig.getDatabaseUrl();
         final String keyName = firebaseConfig.getKeyName();
-        this.setReply(firebaseConfig.isAsync());
+        this.setAsync(firebaseConfig.isAsync());
         if (keyName != null) {
             this.setKeyName(keyName);
         }
@@ -110,12 +110,12 @@ public class FirebaseEndpoint extends DefaultEndpoint {
         this.keyName = keyName;
     }
 
-    public boolean isReply() {
-        return reply;
+    public boolean isAsync() {
+        return async;
     }
 
-    public void setReply(boolean reply) {
-        this.reply = reply;
+    public void setAsync(boolean async) {
+        this.async = async;
     }
 
     public FirebaseApp getFirebaseApp() {

http://git-wip-us.apache.org/repos/asf/camel/blob/13e3e12a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java
 
b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java
index 33ba39b..ef8f8a6 100644
--- 
a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java
+++ 
b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java
@@ -22,6 +22,7 @@ import com.google.firebase.database.FirebaseDatabase;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.Processor;
 import org.apache.camel.component.firebase.exception.DatabaseErrorException;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.slf4j.Logger;
@@ -42,35 +43,35 @@ public class FirebaseProducer extends DefaultAsyncProducer {
         rootReference = endpoint.getRootReference();
     }
 
+    /**
+     * Processes the message exchange.
+     * Similar to {@link Processor#process}, but the caller supports having 
the exchange asynchronously processed.
+     * <p/>
+     * If there was a failure processing then the caused {@link Exception} 
would be set on the {@link Exchange}.
+     *
+     * @param exchange the message exchange
+     * @param callback the {@link AsyncCallback} will be invoked when the 
processing of the exchange is completed.
+     *                 If the exchange is completed synchronously, then the 
callback is also invoked synchronously.
+     *                 The callback should therefore be careful of starting 
recursive loop.
+     * @return (doneSync) <tt>true</tt> to continue execute synchronously, 
<tt>false</tt> to continue being executed asynchronously
+     */
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         final Message in = exchange.getIn();
-        final Message out = exchange.getOut();
         String firebaseKey = (String) in.getHeader(endpoint.getKeyName());
         Object value = in.getBody();
         DatabaseReference ref = FirebaseDatabase
                 .getInstance(endpoint.getFirebaseApp())
                 .getReference(rootReference).child(firebaseKey);
-        final boolean reply = endpoint.isReply();
-        out.setHeaders(in.getHeaders());
-        if (reply) { // Wait for reply
-            processReply(exchange, callback, value, ref);
-        } else { // Fire and forget
-            ref.setValue(value);
-            out.setBody(in.getBody());
-            callback.done(true);
-        }
-        return !reply;
-    }
-
-    private void processReply(Exchange exchange, AsyncCallback callback, 
Object value, DatabaseReference ref) {
         ref.setValue(value, (DatabaseError databaseError, DatabaseReference 
databaseReference) -> {
             if (databaseError != null) {
                 exchange.setException(new 
DatabaseErrorException(databaseError));
+                exchange.getOut().setFault(true);
             } else {
                 exchange.getOut().setBody(databaseReference);
             }
-            callback.done(false);
+            callback.done(endpoint.isAsync());
         });
+        return endpoint.isAsync();
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/13e3e12a/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java
 
b/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java
index 973b2ee..f0f2cc3 100644
--- 
a/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java
+++ 
b/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java
@@ -22,9 +22,6 @@ import java.nio.file.Paths;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static java.util.stream.IntStream.range;
-import static junit.framework.TestCase.fail;
-
 import com.google.firebase.database.DatabaseReference;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Message;
@@ -55,28 +52,16 @@ public class FirebaseProducerTest {
     }
 
     @Test
-    public void whenFirebaseSetShouldReceiveMessageAsDBReference() throws 
Exception {
-        startRoute(true, DatabaseReference.class);
+    public void whenFirebaseSetShouldReceiveMessagesSync() throws Exception {
+        startRoute(false, DatabaseReference.class);
     }
 
     @Test
-    public void whenFirebaseSetShouldReceiveMessageAsDbString() throws 
Exception {
-        startRoute(false, String.class);
+    public void whenFirebaseSetShouldReceiveMessagesAsync() throws Exception {
+        startRoute(true, String.class);
     }
 
-    @Test
-    public void whenMultipleFirebaseSetShouldReceiveExpectedMessages() {
-        range(0, 10).forEach(i -> {
-            try {
-                startRoute(true, DatabaseReference.class);
-                startRoute(false, String.class);
-            } catch (Exception e) {
-                fail("Multiple test fails: " + e);
-            }
-        });
-    }
-
-    private void startRoute(final boolean reply, final Class<?> 
expectedBodyClass) throws Exception {
+    private void startRoute(final boolean async, final Class<?> 
expectedBodyClass) throws Exception {
         sampleInputProvider.copySampleFile();
         CamelContext context = new DefaultCamelContext();
         context.addRoutes(new RouteBuilder() {
@@ -93,14 +78,11 @@ public class FirebaseProducerTest {
                             out.setHeader("firebaseKey", keyValue[0]);
                             out.setBody(keyValue[1].trim());
                         })
-                        
.to(String.format("firebase://%s?rootReference=%s&serviceAccountFile=%s&reply=%b",
-                                ConfigurationProvider.createDatabaseUrl(), 
rootReference, serviceAccountFile, reply))
+                        
.to(String.format("firebase://%s?rootReference=%s&serviceAccountFile=%s&async=%b",
+                                ConfigurationProvider.createDatabaseUrl(), 
rootReference, serviceAccountFile, async))
                         .to("log:whenFirebaseSet?level=WARN")
                         .process(exchange1 -> {
                             
assertThat(exchange1.getIn().getBody().getClass()).isEqualTo(expectedBodyClass);
-                            if (reply) {
-                                
assertThat(exchange1.getIn().getHeader("firebaseKey")).isNotNull();
-                            }
                             try {
                                 reentrantLock.lock();
                                 wake.signal();

Reply via email to