Author: davsclaus
Date: Sat Oct 1 15:51:45 2011
New Revision: 1178034
URL: http://svn.apache.org/viewvc?rev=1178034&view=rev
Log:
CAMEL-3824: Headers are now included in the ServerMessage. Thanks to Joshua for
the patch.
Added:
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java
(with props)
Modified:
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdProducer.java
camel/trunk/components/camel-cometd/src/test/java/org/apache/camel/component/cometd/CometdProducerConsumerTest.java
Added:
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java?rev=1178034&view=auto
==============================================================================
---
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java
(added)
+++
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java
Sat Oct 1 15:51:45 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cometd;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultMessage;
+import org.cometd.bayeux.server.ServerChannel;
+import org.cometd.bayeux.server.ServerMessage;
+import org.cometd.bayeux.server.ServerSession;
+import org.cometd.server.BayeuxServerImpl;
+
+/**
+ * A strategy used to convert between the {@link Message} of a Camel
+ * {@link Exchange} and a Cometd {@link ServerMessage}, in both directions.
+ * <p/>
+ * Camel headers are carried inside the Cometd message's <tt>ext</tt> map,
+ * stored as a nested map under the {@link #HEADERS_FIELD} key.
+ */
+public class CometdBinding {
+    public static final String HEADERS_FIELD = "CamelHeaders";
+    public static final String COMETD_CLIENT_ID_HEADER_NAME = "CometdClientId";
+
+    private final BayeuxServerImpl bayeux;
+
+    public CometdBinding(BayeuxServerImpl bayeux) {
+        this.bayeux = bayeux;
+    }
+
+    /**
+     * Creates a new Cometd message for the given channel from a Camel message.
+     *
+     * @param channel       the channel whose id is set on the new message
+     * @param serverSession the session whose id becomes the client id, may be <tt>null</tt>
+     * @param camelMessage  the Camel message providing the body and the headers
+     * @return the new mutable Cometd message
+     */
+    public ServerMessage.Mutable createCometdMessage(ServerChannel channel, ServerSession serverSession, Message camelMessage) {
+        ServerMessage.Mutable mutable = bayeux.newMessage();
+        mutable.setChannel(channel.getId());
+
+        if (serverSession != null) {
+            mutable.setClientId(serverSession.getId());
+        }
+        addHeadersToMessage(mutable, camelMessage);
+        mutable.setData(camelMessage.getBody());
+        return mutable;
+    }
+
+    /**
+     * Creates a new Camel message from a Cometd message.
+     *
+     * @param remote        the remote session, its id is stored in the
+     *                      {@link #COMETD_CLIENT_ID_HEADER_NAME} header
+     * @param cometdMessage the Cometd message, may be <tt>null</tt>
+     * @param data          the body to use when <tt>cometdMessage</tt> is <tt>null</tt>
+     * @return the new Camel message
+     */
+    public Message createCamelMessage(ServerSession remote, ServerMessage cometdMessage, Object data) {
+        Message message = new DefaultMessage();
+
+        if (cometdMessage != null) {
+            // when present, the Cometd message supplies both the body and the headers
+            data = cometdMessage.getData();
+
+            // only hand over headers that exist, so setHeaders is never given null
+            Map<String, Object> headers = getHeadersFromMessage(cometdMessage);
+            if (headers != null) {
+                message.setHeaders(headers);
+            }
+        }
+
+        message.setBody(data);
+        message.setHeader(COMETD_CLIENT_ID_HEADER_NAME, remote.getId());
+        return message;
+    }
+
+    /**
+     * Copies the headers of the Camel message into the <tt>ext</tt> map of the
+     * Cometd message under the {@link #HEADERS_FIELD} key. Does nothing when
+     * the Camel message has no headers.
+     *
+     * @param cometdMessage the Cometd message to enrich
+     * @param camelMessage  the Camel message providing the headers
+     */
+    public void addHeadersToMessage(ServerMessage.Mutable cometdMessage, Message camelMessage) {
+        if (camelMessage.hasHeaders()) {
+            Map<String, Object> ext = cometdMessage.getExt(true);
+            ext.put(HEADERS_FIELD, filterHeaders(camelMessage.getHeaders()));
+        }
+    }
+
+    //TODO: do something in the style of JMS where they have header Strategies?
+    /**
+     * Copies the given headers into a new map, leaving out entries with a <tt>null</tt> key.
+     */
+    private Map<String, Object> filterHeaders(Map<String, Object> headers) {
+        Map<String, Object> map = new HashMap<String, Object>();
+        for (Entry<String, Object> entry : headers.entrySet()) {
+            if (entry.getKey() != null) {
+                map.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return map;
+    }
+
+    /**
+     * Reads the Camel headers stored in the <tt>ext</tt> map of the Cometd message.
+     *
+     * @return the headers, or <tt>null</tt> if the message has no <tt>ext</tt> map
+     *         or no map is stored under the {@link #HEADERS_FIELD} key
+     */
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> getHeadersFromMessage(ServerMessage message) {
+        Map<String, Object> ext = message.getExt();
+        if (ext == null) {
+            return null;
+        }
+
+        // the ext content is provided by the sender, so check the type before casting
+        Object headers = ext.get(HEADERS_FIELD);
+        if (headers instanceof Map) {
+            return (Map<String, Object>) headers;
+        }
+        return null;
+    }
+
+}
Propchange:
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdBinding.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java?rev=1178034&r1=1178033&r2=1178034&view=diff
==============================================================================
---
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java
(original)
+++
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java
Sat Oct 1 15:51:45 2011
@@ -41,7 +41,6 @@ import org.eclipse.jetty.util.resource.R
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Component for Jetty Cometd
*/
Modified:
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java?rev=1178034&r1=1178033&r2=1178034&view=diff
==============================================================================
---
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
(original)
+++
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
Sat Oct 1 15:51:45 2011
@@ -20,8 +20,8 @@ import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
-import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.util.ExchangeHelper;
+import org.cometd.bayeux.server.ServerChannel;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.server.AbstractService;
@@ -71,9 +71,11 @@ public class CometdConsumer extends Defa
private final CometdEndpoint endpoint;
private final CometdConsumer consumer;
+ private final CometdBinding binding;
public ConsumerService(String channel, BayeuxServerImpl bayeux,
CometdConsumer consumer) {
super(bayeux, channel);
+ this.binding = new CometdBinding(bayeux);
this.consumer = consumer;
this.endpoint = consumer.getEndpoint();
addService(channel, "push");
@@ -82,12 +84,7 @@ public class CometdConsumer extends Defa
public void push(ServerSession remote, String channelName,
ServerMessage cometdMessage, String messageId) throws Exception {
Object data = null;
- if (cometdMessage != null) {
- data = cometdMessage.getData();
- }
-
- Message message = new DefaultMessage();
- message.setBody(data);
+ Message message = binding.createCamelMessage(remote,
cometdMessage, data);
Exchange exchange = endpoint.createExchange();
exchange.setIn(message);
@@ -95,10 +92,14 @@ public class CometdConsumer extends Defa
consumer.getProcessor().process(exchange);
if (ExchangeHelper.isOutCapable(exchange)) {
- Message camelOutMessage = exchange.getOut();
- remote.deliver(getServerSession(), channelName,
camelOutMessage.getBody(), null);
+ ServerChannel channel = getBayeux().getChannel(channelName);
+ ServerSession serverSession = getServerSession();
+
+ ServerMessage.Mutable outMessage =
binding.createCometdMessage(channel, serverSession, exchange.getOut());
+ remote.deliver(serverSession, outMessage);
}
}
+
}
}
\ No newline at end of file
Modified:
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdProducer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdProducer.java?rev=1178034&r1=1178033&r2=1178034&view=diff
==============================================================================
---
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdProducer.java
(original)
+++
camel/trunk/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdProducer.java
Sat Oct 1 15:51:45 2011
@@ -20,6 +20,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
import org.cometd.bayeux.server.BayeuxServer;
import org.cometd.bayeux.server.ServerChannel;
+import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.server.AbstractService;
import org.cometd.server.BayeuxServerImpl;
@@ -46,7 +47,7 @@ public class CometdProducer extends Defa
super.start();
// must connect first
endpoint.connect(this);
- service = new ProducerService(getBayeux(), endpoint.getPath(), this);
+ service = new ProducerService(getBayeux(), new CometdBinding(bayeux),
endpoint.getPath(), this);
}
@Override
@@ -74,10 +75,12 @@ public class CometdProducer extends Defa
public static class ProducerService extends AbstractService {
private final CometdProducer producer;
+ private final CometdBinding binding;
- public ProducerService(BayeuxServer bayeux, String channel,
CometdProducer producer) {
+ public ProducerService(BayeuxServer bayeux, CometdBinding
cometdBinding, String channel, CometdProducer producer) {
super(bayeux, channel);
this.producer = producer;
+ this.binding = cometdBinding;
}
public void process(final Exchange exchange) {
@@ -88,7 +91,8 @@ public class CometdProducer extends Defa
if (channel != null) {
logDelivery(exchange, channel);
- channel.publish(serverSession, exchange.getIn().getBody(),
null);
+ ServerMessage.Mutable mutable =
binding.createCometdMessage(channel, serverSession, exchange.getIn());
+ channel.publish(serverSession, mutable);
}
}
Modified:
camel/trunk/components/camel-cometd/src/test/java/org/apache/camel/component/cometd/CometdProducerConsumerTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-cometd/src/test/java/org/apache/camel/component/cometd/CometdProducerConsumerTest.java?rev=1178034&r1=1178033&r2=1178034&view=diff
==============================================================================
---
camel/trunk/components/camel-cometd/src/test/java/org/apache/camel/component/cometd/CometdProducerConsumerTest.java
(original)
+++
camel/trunk/components/camel-cometd/src/test/java/org/apache/camel/component/cometd/CometdProducerConsumerTest.java
Sat Oct 1 15:51:45 2011
@@ -19,6 +19,7 @@ package org.apache.camel.component.comet
import java.util.List;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.AvailablePortFinder;
@@ -37,22 +38,46 @@ public class CometdProducerConsumerTest
@Test
public void testProducer() throws Exception {
Person person = new Person("David", "Greco");
- template.requestBody("direct:input", person);
+ //act
+ template.requestBodyAndHeader("direct:input", person, "testHeading",
"value");
+
+ //assert
MockEndpoint ep = (MockEndpoint) context.getEndpoint("mock:test");
List<Exchange> exchanges = ep.getReceivedExchanges();
for (Exchange exchange : exchanges) {
- Person person1 = (Person) exchange.getIn().getBody();
+ Message message = exchange.getIn();
+ Person person1 = (Person) message.getBody();
assertEquals("David", person1.getName());
assertEquals("Greco", person1.getSurname());
}
}
+ @Test
+ public void testHeadersSupported() throws Exception {
+ // setup: the header name/value pair that is sent along with the body
+ String headerName = "testHeading";
+ String headerValue = "value";
+
+ // act: send a body plus that single header to direct:input
+ template.requestBodyAndHeader("direct:input", "message", headerName,
headerValue);
+
+ // assert: mock:test received at least one exchange, and each one carries the sent header and a Cometd client id header
+ MockEndpoint ep = (MockEndpoint) context.getEndpoint("mock:test");
+ List<Exchange> exchanges = ep.getReceivedExchanges();
+ assertTrue(exchanges.size() > 0);
+ for (Exchange exchange : exchanges) {
+ Message message = exchange.getIn();
+ assertEquals(headerValue, message.getHeader(headerName));
+
assertNotNull(message.getHeader(CometdBinding.COMETD_CLIENT_ID_HEADER_NAME));
+ }
+ }
+
@Override
@Before
public void setUp() throws Exception {
port = AvailablePortFinder.getNextAvailable(23500);
uri = "cometd://127.0.0.1:" + port +
"/service/test?baseResource=file:./target/test-classes/webapp&"
- +
"timeout=240000&interval=0&maxInterval=30000&multiFrameInterval=1500&jsonCommented=true&logLevel=2";
+ +
"timeout=240000&interval=0&maxInterval=30000&multiFrameInterval=1500&jsonCommented=true&logLevel=2";
super.setUp();
}