Author: davsclaus
Date: Sat Oct  1 15:51:45 2011
New Revision: 1178034

URL: http://svn.apache.org/viewvc?rev=1178034&view=rev
Log:
CAMEL-3824: Headers is now included in the ServerMessage. Thanks to Joshua for 
the patch.

Added:
    
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java
   (with props)
Modified:
    
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java
    
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
    
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdProducer.java
    
camel/trunk/components/camel-cometd/src/test/java/org/apache/camel/component/cometd/CometdProducerConsumerTest.java

Added: 
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java?rev=1178034&view=auto
==============================================================================
--- 
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java
 (added)
+++ 
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java
 Sat Oct  1 15:51:45 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cometd;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultMessage;
+import org.cometd.bayeux.server.ServerChannel;
+import org.cometd.bayeux.server.ServerMessage;
+import org.cometd.bayeux.server.ServerSession;
+import org.cometd.server.BayeuxServerImpl;
+
+/**
+ * A Strategy used to convert between a Camel {@link Exchange} and
+ * to and from a Cometd messages
+ */
+public class CometdBinding {
+    public static final String HEADERS_FIELD = "CamelHeaders";
+    public static final String COMETD_CLIENT_ID_HEADER_NAME = "CometdClientId";
+
+    private final BayeuxServerImpl bayeux;
+
+    public CometdBinding(BayeuxServerImpl bayeux) {
+        this.bayeux = bayeux;
+    }
+
+    public ServerMessage.Mutable createCometdMessage(ServerChannel channel, 
ServerSession serverSession, Message camelMessage) {
+        ServerMessage.Mutable mutable = bayeux.newMessage();
+        mutable.setChannel(channel.getId());
+
+        if (serverSession != null) {
+            mutable.setClientId(serverSession.getId());
+        }
+        addHeadersToMessage(mutable, camelMessage);
+        mutable.setData(camelMessage.getBody());
+        return mutable;
+    }
+
+    public Message createCamelMessage(ServerSession remote, ServerMessage 
cometdMessage, Object data) {
+        if (cometdMessage != null) {
+            data = cometdMessage.getData();
+        }
+
+        Message message = new DefaultMessage();
+        message.setBody(data);
+        message.setHeaders(getHeadersFromMessage(cometdMessage));
+        message.setHeader(COMETD_CLIENT_ID_HEADER_NAME, remote.getId());
+        return message;
+    }
+
+
+    public void addHeadersToMessage(ServerMessage.Mutable cometdMessage, 
Message camelMessage) {
+        if (camelMessage.hasHeaders()) {
+            Map<String, Object> ext = cometdMessage.getExt(true);
+
+            ext.put(HEADERS_FIELD, filterHeaders(camelMessage.getHeaders()));
+        }
+    }
+
+    //TODO: do something in the style of JMS where they have header Strategies?
+    private Object filterHeaders(Map<String, Object> headers) {
+        Map<String, Object> map = new HashMap<String, Object>();
+        for (Entry<String, Object> entry : headers.entrySet()) {
+            if (entry != null && entry.getKey() != null) {
+                map.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return map;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> getHeadersFromMessage(ServerMessage message) {
+        Map<String, Object> ext = message.getExt();
+        if (ext != null && ext.containsKey(HEADERS_FIELD)) {
+            return (Map<String, Object>) ext.get(HEADERS_FIELD);
+        } else {
+            return null;
+        }
+    }
+
+}

Propchange: 
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java?rev=1178034&r1=1178033&r2=1178034&view=diff
==============================================================================
--- 
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java
 (original)
+++ 
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java
 Sat Oct  1 15:51:45 2011
@@ -41,7 +41,6 @@ import org.eclipse.jetty.util.resource.R
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Component for Jetty Cometd
  */

Modified: 
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java?rev=1178034&r1=1178033&r2=1178034&view=diff
==============================================================================
--- 
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
 (original)
+++ 
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
 Sat Oct  1 15:51:45 2011
@@ -20,8 +20,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
-import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.util.ExchangeHelper;
+import org.cometd.bayeux.server.ServerChannel;
 import org.cometd.bayeux.server.ServerMessage;
 import org.cometd.bayeux.server.ServerSession;
 import org.cometd.server.AbstractService;
@@ -71,9 +71,11 @@ public class CometdConsumer extends Defa
 
         private final CometdEndpoint endpoint;
         private final CometdConsumer consumer;
+        private final CometdBinding binding;
 
         public ConsumerService(String channel, BayeuxServerImpl bayeux, 
CometdConsumer consumer) {
             super(bayeux, channel);
+            this.binding = new CometdBinding(bayeux);
             this.consumer = consumer;
             this.endpoint = consumer.getEndpoint();
             addService(channel, "push");
@@ -82,12 +84,7 @@ public class CometdConsumer extends Defa
         public void push(ServerSession remote, String channelName, 
ServerMessage cometdMessage, String messageId) throws Exception {
             Object data = null;
 
-            if (cometdMessage != null) {
-                data = cometdMessage.getData();
-            }
-
-            Message message = new DefaultMessage();
-            message.setBody(data);
+            Message message = binding.createCamelMessage(remote, 
cometdMessage, data);
 
             Exchange exchange = endpoint.createExchange();
             exchange.setIn(message);
@@ -95,10 +92,14 @@ public class CometdConsumer extends Defa
             consumer.getProcessor().process(exchange);
 
             if (ExchangeHelper.isOutCapable(exchange)) {
-                Message camelOutMessage = exchange.getOut();
-                remote.deliver(getServerSession(), channelName, 
camelOutMessage.getBody(), null);
+                ServerChannel channel = getBayeux().getChannel(channelName);
+                ServerSession serverSession = getServerSession();
+
+                ServerMessage.Mutable outMessage = 
binding.createCometdMessage(channel, serverSession, exchange.getOut());
+                remote.deliver(serverSession, outMessage);
             }
         }
+
     }
 
 }
\ No newline at end of file

Modified: 
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdProducer.java?rev=1178034&r1=1178033&r2=1178034&view=diff
==============================================================================
--- 
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdProducer.java
 (original)
+++ 
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdProducer.java
 Sat Oct  1 15:51:45 2011
@@ -20,6 +20,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
 import org.cometd.bayeux.server.BayeuxServer;
 import org.cometd.bayeux.server.ServerChannel;
+import org.cometd.bayeux.server.ServerMessage;
 import org.cometd.bayeux.server.ServerSession;
 import org.cometd.server.AbstractService;
 import org.cometd.server.BayeuxServerImpl;
@@ -46,7 +47,7 @@ public class CometdProducer extends Defa
         super.start();
         // must connect first
         endpoint.connect(this);
-        service = new ProducerService(getBayeux(), endpoint.getPath(), this);
+        service = new ProducerService(getBayeux(), new CometdBinding(bayeux), 
endpoint.getPath(), this);
     }
 
     @Override
@@ -74,10 +75,12 @@ public class CometdProducer extends Defa
     public static class ProducerService extends AbstractService {
 
         private final CometdProducer producer;
+        private final CometdBinding binding;
 
-        public ProducerService(BayeuxServer bayeux, String channel, 
CometdProducer producer) {
+        public ProducerService(BayeuxServer bayeux, CometdBinding 
cometdBinding, String channel, CometdProducer producer) {
             super(bayeux, channel);
             this.producer = producer;
+            this.binding = cometdBinding;
         }
 
         public void process(final Exchange exchange) {
@@ -88,7 +91,8 @@ public class CometdProducer extends Defa
 
             if (channel != null) {
                 logDelivery(exchange, channel);
-                channel.publish(serverSession, exchange.getIn().getBody(), 
null);
+                ServerMessage.Mutable mutable = 
binding.createCometdMessage(channel, serverSession, exchange.getIn());
+                channel.publish(serverSession, mutable);
             }
         }
 

Modified: 
camel/trunk/components/camel-cometd/src/test/java/org/apache/camel/component/cometd/CometdProducerConsumerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-cometd/src/test/java/org/apache/camel/component/cometd/CometdProducerConsumerTest.java?rev=1178034&r1=1178033&r2=1178034&view=diff
==============================================================================
--- 
camel/trunk/components/camel-cometd/src/test/java/org/apache/camel/component/cometd/CometdProducerConsumerTest.java
 (original)
+++ 
camel/trunk/components/camel-cometd/src/test/java/org/apache/camel/component/cometd/CometdProducerConsumerTest.java
 Sat Oct  1 15:51:45 2011
@@ -19,6 +19,7 @@ package org.apache.camel.component.comet
 import java.util.List;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.AvailablePortFinder;
@@ -37,22 +38,46 @@ public class CometdProducerConsumerTest 
     @Test
     public void testProducer() throws Exception {
         Person person = new Person("David", "Greco");
-        template.requestBody("direct:input", person);
+        //act
+        template.requestBodyAndHeader("direct:input", person, "testHeading", 
"value");
+
+        //assert
         MockEndpoint ep = (MockEndpoint) context.getEndpoint("mock:test");
         List<Exchange> exchanges = ep.getReceivedExchanges();
         for (Exchange exchange : exchanges) {
-            Person person1 = (Person) exchange.getIn().getBody();
+            Message message = exchange.getIn();
+            Person person1 = (Person) message.getBody();
             assertEquals("David", person1.getName());
             assertEquals("Greco", person1.getSurname());
         }
     }
 
+    @Test
+    public void testHeadersSupported() throws Exception {
+        //setup
+        String headerName = "testHeading";
+        String headerValue = "value";
+
+        //act
+        template.requestBodyAndHeader("direct:input", "message", headerName, 
headerValue);
+
+        //assert
+        MockEndpoint ep = (MockEndpoint) context.getEndpoint("mock:test");
+        List<Exchange> exchanges = ep.getReceivedExchanges();
+        assertTrue(exchanges.size() > 0);
+        for (Exchange exchange : exchanges) {
+            Message message = exchange.getIn();
+            assertEquals(headerValue, message.getHeader(headerName));
+            
assertNotNull(message.getHeader(CometdBinding.COMETD_CLIENT_ID_HEADER_NAME));
+        }
+    }
+
     @Override
     @Before
     public void setUp() throws Exception {
         port = AvailablePortFinder.getNextAvailable(23500);
         uri = "cometd://127.0.0.1:" + port + 
"/service/test?baseResource=file:./target/test-classes/webapp&"
-            + 
"timeout=240000&interval=0&maxInterval=30000&multiFrameInterval=1500&jsonCommented=true&logLevel=2";
+                + 
"timeout=240000&interval=0&maxInterval=30000&multiFrameInterval=1500&jsonCommented=true&logLevel=2";
 
         super.setUp();
     }


Reply via email to