Repository: camel
Updated Branches:
  refs/heads/master 8e07c18c3 -> c08a60503


undertow async producer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d03012bb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d03012bb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d03012bb

Branch: refs/heads/master
Commit: d03012bbbfc87bf84ceb71cacf2d53efdee208f0
Parents: 8e07c18
Author: Claus Ibsen <[email protected]>
Authored: Wed Jul 15 14:31:51 2015 +0200
Committer: Claus Ibsen <[email protected]>
Committed: Wed Jul 15 14:31:51 2015 +0200

----------------------------------------------------------------------
 .../component/undertow/UndertowProducer.java    | 80 ++++++++++++--------
 .../undertow/UndertowComponentTest.java         | 15 ++--
 2 files changed, 54 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d03012bb/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
index 23e21a3..e1330d7 100644
--- 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
+++ 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
@@ -26,10 +26,11 @@ import io.undertow.client.ClientRequest;
 import io.undertow.client.UndertowClient;
 import io.undertow.util.Headers;
 import io.undertow.util.Protocols;
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.TypeConverter;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.util.ExchangeHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,12 +43,12 @@ import org.xnio.XnioWorker;
 
 /**
  * The Undertow producer.
- *
+ * <p/>
  * The implementation of Producer is considered as experimental. The Undertow 
client classes are not thread safe,
  * their purpose is for the reverse proxy usage inside Undertow itself. This 
may change in the future versions and
  * general purpose HTTP client wrapper will be added. Therefore this Producer 
may be changed too.
  */
-public class UndertowProducer extends DefaultProducer {
+public class UndertowProducer extends DefaultAsyncProducer {
     private static final Logger LOG = 
LoggerFactory.getLogger(UndertowProducer.class);
     private UndertowEndpoint endpoint;
 
@@ -61,31 +62,36 @@ public class UndertowProducer extends DefaultProducer {
         return endpoint;
     }
 
-    public void setEndpoint(UndertowEndpoint endpoint) {
-        this.endpoint = endpoint;
-    }
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            final UndertowClient client = UndertowClient.getInstance();
+            XnioWorker worker = 
Xnio.getInstance().createWorker(OptionMap.EMPTY);
 
-    // TODO: use async routing engine
+            IoFuture<ClientConnection> connect = 
client.connect(endpoint.getHttpURI(), worker, new 
ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8192, 8192 * 
8192), OptionMap.EMPTY);
 
-    @Override
-    public void process(Exchange exchange) throws Exception {
-        final UndertowClient client = UndertowClient.getInstance();
-        XnioWorker worker = Xnio.getInstance().createWorker(OptionMap.EMPTY);
-        IoFuture<ClientConnection> connect = 
client.connect(endpoint.getHttpURI(), worker, new 
ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8192, 8192 * 
8192), OptionMap.EMPTY);
+            ClientRequest request = new ClientRequest();
+            request.setProtocol(Protocols.HTTP_1_1);
 
-        ClientRequest request = new ClientRequest();
-        request.setProtocol(Protocols.HTTP_1_1);
+            Object body = getRequestBody(request, exchange);
 
-        Object body = getRequestBody(request, exchange);
+            TypeConverter tc = endpoint.getCamelContext().getTypeConverter();
+            ByteBuffer bodyAsByte = tc.convertTo(ByteBuffer.class, body);
 
-        TypeConverter tc = endpoint.getCamelContext().getTypeConverter();
-        ByteBuffer bodyAsByte = tc.convertTo(ByteBuffer.class, body);
+            if (body != null) {
+                request.getRequestHeaders().put(Headers.CONTENT_LENGTH, 
bodyAsByte.array().length);
+            }
 
-        if (body != null) {
-            request.getRequestHeaders().put(Headers.CONTENT_LENGTH, 
bodyAsByte.array().length);
+            connect.get().sendRequest(request, new 
UndertowProducerCallback(bodyAsByte, exchange, callback));
+
+        } catch (IOException e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
         }
 
-        connect.get().sendRequest(request, new 
UndertowProducerCallback(bodyAsByte, exchange));
+        // use async routing engine
+        return false;
     }
 
     private Object getRequestBody(ClientRequest request, Exchange 
camelExchange) {
@@ -99,38 +105,46 @@ public class UndertowProducer extends DefaultProducer {
      */
     private class UndertowProducerCallback implements 
ClientCallback<ClientExchange> {
 
-        private ByteBuffer body;
-        private Exchange camelExchange;
+        private final ByteBuffer body;
+        private final Exchange camelExchange;
+        private final AsyncCallback callback;
 
-        public UndertowProducerCallback(ByteBuffer body, Exchange 
camelExchange) {
+        public UndertowProducerCallback(ByteBuffer body, Exchange 
camelExchange, AsyncCallback callback) {
             this.body = body;
             this.camelExchange = camelExchange;
+            this.callback = callback;
         }
 
+        // TODO: Add some logging of those events at trace or debug level
+
         @Override
         public void completed(ClientExchange clientExchange) {
             clientExchange.setResponseListener(new 
ClientCallback<ClientExchange>() {
                 @Override
                 public void completed(ClientExchange clientExchange) {
-                    Message message = null;
                     try {
-                        message = 
endpoint.getUndertowHttpBinding().toCamelMessage(clientExchange, camelExchange);
+                        Message message = 
endpoint.getUndertowHttpBinding().toCamelMessage(clientExchange, camelExchange);
+                        if (ExchangeHelper.isOutCapable(camelExchange)) {
+                            camelExchange.setOut(message);
+                        } else {
+                            camelExchange.setIn(message);
+                        }
                     } catch (Exception e) {
                         camelExchange.setException(e);
+                    } finally {
+                        // make sure to call callback
+                        callback.done(false);
                     }
-                    if (ExchangeHelper.isOutCapable(camelExchange)) {
-                        camelExchange.setOut(message);
-                    } else {
-                        camelExchange.setIn(message);
-                    }
-
                 }
 
                 @Override
                 public void failed(IOException e) {
                     camelExchange.setException(e);
+                    // make sure to call callback
+                    callback.done(false);
                 }
             });
+
             try {
                 //send body if exists
                 if (body != null) {
@@ -138,12 +152,16 @@ public class UndertowProducer extends DefaultProducer {
                 }
             } catch (IOException e) {
                 camelExchange.setException(e);
+                // make sure to call callback
+                callback.done(false);
             }
         }
 
         @Override
         public void failed(IOException e) {
             camelExchange.setException(e);
+            // make sure to call callback
+            callback.done(false);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/d03012bb/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowComponentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowComponentTest.java
 
b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowComponentTest.java
index 7eac71e..91b526d 100644
--- 
a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowComponentTest.java
+++ 
b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowComponentTest.java
@@ -28,24 +28,19 @@ public class UndertowComponentTest extends BaseUndertowTest 
{
 
     @Test
     public void testUndertow() throws Exception {
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:myapp");
+        mockEndpoint.expectedHeaderReceived(Exchange.HTTP_METHOD, "GET");
+        LOG.debug("Number of exchanges in mock:myapp" + 
mockEndpoint.getExchanges().size());
 
-
-        String response = 
template.requestBody("undertow://http://localhost:{{port}}/myapp", "Hello 
Camel!", String.class);
-
+        String response = 
template.requestBody("undertow:http://localhost:{{port}}/myapp", "Hello 
Camel!", String.class);
         assertNotNull(response);
-
         assertEquals("Hello Camel!", response);
 
-        MockEndpoint mockEndpoint = getMockEndpoint("mock:myapp");
-        mockEndpoint.expectedHeaderReceived(Exchange.HTTP_METHOD, "GET");
-        LOG.debug("Number of exchanges in mock:myapp" + 
mockEndpoint.getExchanges().size());
+        mockEndpoint.assertIsSatisfied();
 
         for (Exchange exchange : mockEndpoint.getExchanges()) {
             assertEquals("Hello Camel! Bye Camel!", 
exchange.getIn().getBody(String.class));
         }
-
-        mockEndpoint.assertIsSatisfied();
-
     }
 
     @Override

Reply via email to