Updated Branches: refs/heads/master e7db4ab26 -> 64675d495
CAMEL-6165: New camel-stomp component. Thanks to Dejan Bosanac for the contribution. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2beac53a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2beac53a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2beac53a Branch: refs/heads/master Commit: 2beac53a41575cffce21967dd209154de6868a33 Parents: d936737 Author: Claus Ibsen <[email protected]> Authored: Wed Jul 31 11:09:33 2013 +0200 Committer: Claus Ibsen <[email protected]> Committed: Wed Jul 31 13:17:34 2013 +0200 ---------------------------------------------------------------------- components/camel-stomp/README.md | 20 +++ components/camel-stomp/pom.xml | 89 ++++++++++ .../camel/component/stomp/StompComponent.java | 56 +++++++ .../component/stomp/StompConfiguration.java | 61 +++++++ .../camel/component/stomp/StompConsumer.java | 60 +++++++ .../camel/component/stomp/StompEndpoint.java | 162 +++++++++++++++++++ .../camel/component/stomp/StompProducer.java | 43 +++++ .../services/org/apache/camel/component/stomp | 18 +++ .../camel/component/stomp/StompBaseTest.java | 46 ++++++ .../component/stomp/StompConsumerTest.java | 64 ++++++++ .../component/stomp/StompProducerTest.java | 83 ++++++++++ .../src/test/resources/log4j.properties | 40 +++++ parent/pom.xml | 1 + 13 files changed, 743 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/README.md ---------------------------------------------------------------------- diff --git a/components/camel-stomp/README.md b/components/camel-stomp/README.md new file mode 100644 index 0000000..8341d09 --- /dev/null +++ b/components/camel-stomp/README.md @@ -0,0 +1,20 @@ +camel-stomp +=========== + +Camel component used for communicating with [Stomp] (http://stomp.github.io/) compliant message brokers, like [Apache ActiveMQ](http://activemq.apache.org) or [ActiveMQ Apollo](http://activemq.apache.org/apollo/). + +URI format +---------- + + stomp:destination + +Where destination is broker specific. With ActiveMQ you can use queues and topics in the form of + + stomp:queue:test + +Samples +------- + + from("direct:foo").to("stomp:queue:test") + + http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-stomp/pom.xml b/components/camel-stomp/pom.xml new file mode 100644 index 0000000..c81fd89 --- /dev/null +++ b/components/camel-stomp/pom.xml @@ -0,0 +1,89 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.12-SNAPSHOT</version> + </parent> + + <artifactId>camel-stomp</artifactId> + <packaging>bundle</packaging> + <name>Camel :: Stomp</name> + <description>Camel Stomp client</description> + + <properties> + <camel.osgi.export.pkg>org.apache.camel.component.stomp.*</camel.osgi.export.pkg> + <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=stomp</camel.osgi.export.service> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + <dependency> + <groupId>org.fusesource.stompjms</groupId> + <artifactId>stompjms-client</artifactId> + <version>${stompjms-version}</version> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-broker</artifactId> + <version>${activemq-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-stomp</artifactId> + <version>${activemq-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.thoughtworks.xstream</groupId> + <artifactId>xstream</artifactId> + <version>${xstream-version}</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java new file mode 100644 index 0000000..165c24e --- /dev/null +++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stomp; + +import java.util.Map; + +import org.apache.camel.Endpoint; +import org.apache.camel.impl.DefaultComponent; + +public class StompComponent extends DefaultComponent { + + StompConfiguration configuration = new StompConfiguration(); + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + String destination = "/" + remaining.replaceAll(":", "/"); + + StompEndpoint endpoint = new StompEndpoint(uri, this, getConfiguration(), destination); + setProperties(endpoint, parameters); + return endpoint; + } + + public StompConfiguration getConfiguration() { + return configuration; + } + + public void setConfiguration(StompConfiguration configuration) { + this.configuration = configuration; + } + + public void setBrokerURL(String brokerURL) { + getConfiguration().setBrokerURL(brokerURL); + } + + public void setLogin(String login) { + getConfiguration().setLogin(login); + } + + public void setPasscode(String passcode) { + getConfiguration().setPasscode(passcode); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java new file mode 100644 index 0000000..664215b --- /dev/null +++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stomp; + +import org.fusesource.stomp.client.Stomp; + +public class StompConfiguration { + + private String brokerURL = "tcp://localhost:61613"; + private String login; + private String passcode; + + private Stomp stomp; + + public String getBrokerURL() { + return brokerURL; + } + + public void setBrokerURL(String brokerURL) { + this.brokerURL = brokerURL; + } + + public String getLogin() { + return login; + } + + public void setLogin(String login) { + this.login = login; + } + + public String getPasscode() { + return passcode; + } + + public void setPasscode(String passcode) { + this.passcode = passcode; + } + + public Stomp getStomp() throws Exception { + if (stomp == null) { + stomp = new Stomp(brokerURL); + stomp.setLogin(login); + stomp.setPasscode(passcode); + } + return stomp; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java new file mode 100644 index 0000000..a405e40 --- /dev/null +++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stomp; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.impl.DefaultConsumer; +import org.fusesource.hawtbuf.AsciiBuffer; + +public class StompConsumer extends DefaultConsumer { + + AsciiBuffer id; + + public StompConsumer(Endpoint endpoint, Processor processor) { + super(endpoint, processor); + id = getEndpoint().getNextId(); + } + + @Override + public StompEndpoint getEndpoint() { + return (StompEndpoint) super.getEndpoint(); + } + + protected void doStart() throws Exception { + getEndpoint().addConsumer(this); + super.doStart(); + } + + protected void doStop() throws Exception { + getEndpoint().removeConsumer(this); + super.doStop(); + } + + void processExchange(Exchange exchange) { + try { + getProcessor().process(exchange); + } catch (Throwable e) { + exchange.setException(e); + } + + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange.", exchange, exchange.getException()); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java new file mode 100644 index 0000000..623dea0 --- /dev/null +++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stomp; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.apache.camel.Consumer; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.fusesource.hawtbuf.AsciiBuffer; +import org.fusesource.hawtdispatch.Task; +import org.fusesource.stomp.client.Callback; +import org.fusesource.stomp.client.CallbackConnection; +import org.fusesource.stomp.client.Promise; +import org.fusesource.stomp.codec.StompFrame; + +import static org.fusesource.hawtbuf.UTF8Buffer.utf8; +import static org.fusesource.stomp.client.Constants.DESTINATION; +import static org.fusesource.stomp.client.Constants.DISCONNECT; +import static org.fusesource.stomp.client.Constants.ID; +import static org.fusesource.stomp.client.Constants.SEND; +import static org.fusesource.stomp.client.Constants.SUBSCRIBE; +import static org.fusesource.stomp.client.Constants.UNSUBSCRIBE; + +public class StompEndpoint extends DefaultEndpoint { + + private CallbackConnection connection; + private StompConfiguration configuration; + private String destination; + + private final List<StompConsumer> consumers = new CopyOnWriteArrayList<StompConsumer>(); + + public StompEndpoint(String uri, StompComponent component, StompConfiguration configuration, String destination) { + super(uri, component); + this.configuration = configuration; + this.destination = destination; + } + + public Producer createProducer() throws Exception { + return new StompProducer(this); + } + + public Consumer createConsumer(Processor processor) throws Exception { + return new StompConsumer(this, processor); + } + + public boolean isSingleton() { + return true; + } + + @Override + protected void doStart() throws Exception { + final Promise<CallbackConnection> promise = new Promise<CallbackConnection>(); + + configuration.getStomp().connectCallback(promise); + + connection = promise.await(); + + connection.getDispatchQueue().execute(new Task() { + @Override + public void run() { + connection.receive(new Callback<StompFrame>() { + @Override + public void onFailure(Throwable value) { + if (started.get()) { + connection.close(null); + } + } + + @Override + public void onSuccess(StompFrame value) { + if (!consumers.isEmpty()) { + Exchange exchange = createExchange(); + exchange.getIn().setBody(value.content()); + for (StompConsumer consumer : consumers) { + consumer.processExchange(exchange); + } + } + } + }); + connection.resume(); + } + }); + } + + @Override + protected void doStop() throws Exception { + connection.getDispatchQueue().execute(new Task() { + @Override + public void run() { + StompFrame frame = new StompFrame(DISCONNECT); + connection.send(frame, null); + } + }); + connection.close(null); + } + + protected void send(Message message) { + final StompFrame frame = new StompFrame(SEND); + frame.addHeader(DESTINATION, StompFrame.encodeHeader(destination)); + frame.content(utf8(message.getBody().toString())); + connection.getDispatchQueue().execute(new Task() { + @Override + public void run() { + connection.send(frame, null); + } + }); + } + + @Override + protected String createEndpointUri() { + return super.createEndpointUri(); + } + + void addConsumer(final StompConsumer consumer) { + connection.getDispatchQueue().execute(new Task() { + @Override + public void run() { + StompFrame frame = new StompFrame(SUBSCRIBE); + frame.addHeader(DESTINATION, StompFrame.encodeHeader(destination)); + frame.addHeader(ID, consumer.id); + connection.send(frame, null); + } + }); + consumers.add(consumer); + } + + void removeConsumer(final StompConsumer consumer) { + connection.getDispatchQueue().execute(new Task() { + @Override + public void run() { + StompFrame frame = new StompFrame(UNSUBSCRIBE); + frame.addHeader(DESTINATION, StompFrame.encodeHeader(destination)); + frame.addHeader(ID, consumer.id); + connection.send(frame, null); + } + }); + consumers.remove(consumer); + } + + AsciiBuffer getNextId() { + return connection.nextId(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java new file mode 100644 index 0000000..74b8b63 --- /dev/null +++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stomp; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.impl.DefaultAsyncProducer; + +public class StompProducer extends DefaultAsyncProducer implements Processor { + + private final StompEndpoint stompEndpoint; + + public StompProducer(StompEndpoint stompEndpoint) { + super(stompEndpoint); + this.stompEndpoint = stompEndpoint; + } + + public boolean process(Exchange exchange, AsyncCallback callback) { + try { + stompEndpoint.send(exchange.getIn()); + } catch (Exception e) { + exchange.setException(e); + } + callback.done(true); + return true; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/resources/META-INF/services/org/apache/camel/component/stomp ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/main/resources/META-INF/services/org/apache/camel/component/stomp b/components/camel-stomp/src/main/resources/META-INF/services/org/apache/camel/component/stomp new file mode 100644 index 0000000..ae04da8 --- /dev/null +++ b/components/camel-stomp/src/main/resources/META-INF/services/org/apache/camel/component/stomp @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.stomp.StompComponent \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java new file mode 100644 index 0000000..052ab75 --- /dev/null +++ b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stomp; + +import org.apache.activemq.broker.BrokerService; +import org.apache.camel.test.junit4.CamelTestSupport; + +public abstract class StompBaseTest extends CamelTestSupport { + + protected BrokerService brokerService; + protected int numberOfMessages = 100; + + @Override + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setAdvisorySupport(false); + brokerService.addConnector("stomp://localhost:61613?trace=true"); + brokerService.start(); + brokerService.waitUntilStarted(); + super.setUp(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + if (brokerService != null) { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerTest.java b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerTest.java new file mode 100644 index 0000000..a2def8c --- /dev/null +++ b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerTest.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stomp; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.fusesource.stomp.client.BlockingConnection; +import org.fusesource.stomp.client.Stomp; +import org.fusesource.stomp.codec.StompFrame; +import org.junit.Test; + +import static org.fusesource.hawtbuf.UTF8Buffer.utf8; +import static org.fusesource.stomp.client.Constants.DESTINATION; +import static org.fusesource.stomp.client.Constants.MESSAGE_ID; +import static org.fusesource.stomp.client.Constants.SEND; + +public class StompConsumerTest extends StompBaseTest { + + @Test + public void testConsume() throws Exception { + Stomp stomp = new Stomp("tcp://localhost:61613"); + final BlockingConnection producerConnection = stomp.connectBlocking(); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMinimumMessageCount(numberOfMessages); + + for (int i = 0; i < numberOfMessages; i++) { + StompFrame frame = new StompFrame(SEND); + frame.addHeader(DESTINATION, StompFrame.encodeHeader("/queue/test")); + frame.addHeader(MESSAGE_ID, StompFrame.encodeHeader("msg:" + i)); + frame.content(utf8("Important Message " + i)); + producerConnection.send(frame); + } + + mock.await(5, TimeUnit.SECONDS); + mock.assertIsSatisfied(); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("stomp:queue:test") + .transform(body().convertToString()) + .to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompProducerTest.java b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompProducerTest.java new file mode 100644 index 0000000..fc878ea --- /dev/null +++ b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompProducerTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stomp; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Exchange; +import org.apache.camel.Producer; +import org.apache.camel.builder.RouteBuilder; +import org.fusesource.stomp.client.BlockingConnection; +import org.fusesource.stomp.client.Stomp; +import org.fusesource.stomp.codec.StompFrame; +import org.junit.Test; + +import static org.fusesource.stomp.client.Constants.DESTINATION; +import static org.fusesource.stomp.client.Constants.ID; +import static org.fusesource.stomp.client.Constants.SUBSCRIBE; + +public class StompProducerTest extends StompBaseTest { + + @Test + public void testProduce() throws Exception { + Stomp stomp = new Stomp("tcp://localhost:61613"); + final BlockingConnection subscribeConnection = stomp.connectBlocking(); + + StompFrame frame = new StompFrame(SUBSCRIBE); + frame.addHeader(DESTINATION, StompFrame.encodeHeader("/queue/test")); + frame.addHeader(ID, subscribeConnection.nextId()); + StompFrame response = subscribeConnection.request(frame); + + final CountDownLatch latch = new CountDownLatch(numberOfMessages); + + Thread thread = new Thread(new Runnable() { + public void run() { + for (int i = 0; i < numberOfMessages; i++) { + try { + StompFrame frame = subscribeConnection.receive(); + latch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + break; + } + } + } + }); + thread.start(); + + Producer producer = context.getEndpoint("direct:foo").createProducer(); + for (int i = 0; i < numberOfMessages; i++) { + Exchange exchange = producer.createExchange(); + exchange.getIn().setBody("test message " + i); + producer.process(exchange); + } + latch.await(20, TimeUnit.SECONDS); + assertTrue("Messages not consumed = " + latch.getCount(), latch.getCount() == 0); + + + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:foo").to("stomp:queue:test"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/test/resources/log4j.properties b/components/camel-stomp/src/test/resources/log4j.properties new file mode 100644 index 0000000..55f9354 --- /dev/null +++ b/components/camel-stomp/src/test/resources/log4j.properties @@ -0,0 +1,40 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# +# The logging properties used during tests.. +# +log4j.rootLogger=info, file + +#log4j.logger.twitter4j=DEBUG +#log4j.logger.org.apache.camel.component.stomp=DEBUG +#log4j.logger.org.apache.camel=DEBUG + +#log4j.logger.org.apache.activemq.transport.stomp=TRACE, out +#log4j.additivity.org.apache.activemq.transport.stomp=false + +# CONSOLE appender not used by default +log4j.appender.out=org.apache.log4j.ConsoleAppender +log4j.appender.out.layout=org.apache.log4j.PatternLayout +log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n + +# File appender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n +log4j.appender.file.file=target/camel-stomp-test.log +log4j.appender.file.append=true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index cba4ea5..ce8e05b 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -367,6 +367,7 @@ <spymemcached-bundle-version>2.5_2</spymemcached-bundle-version> <!-- FIXME cmueller: not in sync! --> <spymemcached-version>2.8.4</spymemcached-version> <sshd-version>0.8.0</sshd-version> + <stompjms-version>1.13</stompjms-version> <stax-api-version>1.0.1</stax-api-version> <stax2-api-bundle-version>3.1.1</stax2-api-bundle-version> <stringtemplate-bundle-version>4.0.2_2</stringtemplate-bundle-version>
