Updated Branches:
  refs/heads/master e7db4ab26 -> 64675d495

CAMEL-6165: New camel-stomp component. Thanks to Dejan Bosanac for the 
contribution.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2beac53a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2beac53a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2beac53a

Branch: refs/heads/master
Commit: 2beac53a41575cffce21967dd209154de6868a33
Parents: d936737
Author: Claus Ibsen <[email protected]>
Authored: Wed Jul 31 11:09:33 2013 +0200
Committer: Claus Ibsen <[email protected]>
Committed: Wed Jul 31 13:17:34 2013 +0200

----------------------------------------------------------------------
 components/camel-stomp/README.md                |  20 +++
 components/camel-stomp/pom.xml                  |  89 ++++++++++
 .../camel/component/stomp/StompComponent.java   |  56 +++++++
 .../component/stomp/StompConfiguration.java     |  61 +++++++
 .../camel/component/stomp/StompConsumer.java    |  60 +++++++
 .../camel/component/stomp/StompEndpoint.java    | 162 +++++++++++++++++++
 .../camel/component/stomp/StompProducer.java    |  43 +++++
 .../services/org/apache/camel/component/stomp   |  18 +++
 .../camel/component/stomp/StompBaseTest.java    |  46 ++++++
 .../component/stomp/StompConsumerTest.java      |  64 ++++++++
 .../component/stomp/StompProducerTest.java      |  83 ++++++++++
 .../src/test/resources/log4j.properties         |  40 +++++
 parent/pom.xml                                  |   1 +
 13 files changed, 743 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/README.md
----------------------------------------------------------------------
diff --git a/components/camel-stomp/README.md b/components/camel-stomp/README.md
new file mode 100644
index 0000000..8341d09
--- /dev/null
+++ b/components/camel-stomp/README.md
@@ -0,0 +1,20 @@
+camel-stomp
+===========
+
+Camel component used for communicating with [Stomp](http://stomp.github.io/) compliant message brokers, like [Apache ActiveMQ](http://activemq.apache.org) or [ActiveMQ Apollo](http://activemq.apache.org/apollo/).
+
+URI format
+----------
+
+    stomp:destination
+
+Where destination is broker-specific. With ActiveMQ you can use queues and topics in the form of:
+
+    stomp:queue:test
+
+Samples
+-------
+
+    from("direct:foo").to("stomp:queue:test")
+
+

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-stomp/pom.xml b/components/camel-stomp/pom.xml
new file mode 100644
index 0000000..c81fd89
--- /dev/null
+++ b/components/camel-stomp/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>components</artifactId>
+    <version>2.12-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>camel-stomp</artifactId>
+  <packaging>bundle</packaging>
+  <name>Camel :: Stomp</name>
+  <description>Camel Stomp client</description>
+
+  <!-- OSGi export settings for the bundle packaging declared above -->
+  <properties>
+    <camel.osgi.export.pkg>org.apache.camel.component.stomp.*</camel.osgi.export.pkg>
+    <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=stomp</camel.osgi.export.service>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
+    </dependency>
+    <!-- STOMP client library used by the component -->
+    <dependency>
+      <groupId>org.fusesource.stompjms</groupId>
+      <artifactId>stompjms-client</artifactId>
+      <version>${stompjms-version}</version>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-broker</artifactId>
+      <version>${activemq-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-stomp</artifactId>
+      <version>${activemq-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.thoughtworks.xstream</groupId>
+      <artifactId>xstream</artifactId>
+      <version>${xstream-version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java
 
b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java
new file mode 100644
index 0000000..165c24e
--- /dev/null
+++ 
b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * Component that creates {@link StompEndpoint}s.
+ * <p/>
+ * Every endpoint created by one component instance is handed the same
+ * {@link StompConfiguration} object (broker URL, login and passcode).
+ */
+public class StompComponent extends DefaultComponent {
+
+    StompConfiguration configuration = new StompConfiguration();
+
+    /**
+     * Builds the endpoint for the given URI.
+     *
+     * @param uri        the full endpoint URI
+     * @param remaining  the destination part of the URI; every <tt>:</tt> is turned into
+     *                   <tt>/</tt> and a leading <tt>/</tt> is added, so <tt>queue:test</tt>
+     *                   becomes <tt>/queue/test</tt>
+     * @param parameters the URI options, passed to <tt>setProperties</tt> for the new endpoint
+     * @return the new endpoint
+     */
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        // a literal character swap; no regular expression is needed for ':'
+        String destination = "/" + remaining.replace(':', '/');
+
+        StompEndpoint endpoint = new StompEndpoint(uri, this, getConfiguration(), destination);
+        setProperties(endpoint, parameters);
+        return endpoint;
+    }
+
+    public StompConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(StompConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    /**
+     * Convenience setter that forwards to the current configuration.
+     */
+    public void setBrokerURL(String brokerURL) {
+        getConfiguration().setBrokerURL(brokerURL);
+    }
+
+    /**
+     * Convenience setter that forwards to the current configuration.
+     */
+    public void setLogin(String login) {
+        getConfiguration().setLogin(login);
+    }
+
+    /**
+     * Convenience setter that forwards to the current configuration.
+     */
+    public void setPasscode(String passcode) {
+        getConfiguration().setPasscode(passcode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java
 
b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java
new file mode 100644
index 0000000..664215b
--- /dev/null
+++ 
b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import org.fusesource.stomp.client.Stomp;
+
+/**
+ * Connection settings for the Stomp component, plus a lazily created
+ * {@link Stomp} client built from those settings.
+ */
+public class StompConfiguration {
+
+    private String brokerURL = "tcp://localhost:61613";
+    private String login;
+    private String passcode;
+
+    // created on first use by getStomp(); reset whenever a setting changes
+    private Stomp stomp;
+
+    public String getBrokerURL() {
+        return brokerURL;
+    }
+
+    public void setBrokerURL(String brokerURL) {
+        this.brokerURL = brokerURL;
+        // drop the cached client so the next getStomp() picks up the new value
+        this.stomp = null;
+    }
+
+    public String getLogin() {
+        return login;
+    }
+
+    public void setLogin(String login) {
+        this.login = login;
+        // drop the cached client so the next getStomp() picks up the new value
+        this.stomp = null;
+    }
+
+    public String getPasscode() {
+        return passcode;
+    }
+
+    public void setPasscode(String passcode) {
+        this.passcode = passcode;
+        // drop the cached client so the next getStomp() picks up the new value
+        this.stomp = null;
+    }
+
+    /**
+     * Returns the client for the current settings, creating it on first use
+     * or after any setting has been changed.
+     *
+     * @return the client configured with the broker URL, login and passcode
+     * @throws Exception if the client cannot be created from the broker URL
+     */
+    public Stomp getStomp() throws Exception {
+        if (stomp == null) {
+            stomp = new Stomp(brokerURL);
+            stomp.setLogin(login);
+            stomp.setPasscode(passcode);
+        }
+        return stomp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java
 
b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java
new file mode 100644
index 0000000..a405e40
--- /dev/null
+++ 
b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConsumer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.fusesource.hawtbuf.AsciiBuffer;
+
+public class StompConsumer extends DefaultConsumer {
+
+    AsciiBuffer id;
+
+    public StompConsumer(Endpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        id = getEndpoint().getNextId();
+    }
+
+    @Override
+    public StompEndpoint getEndpoint() {
+        return (StompEndpoint) super.getEndpoint();
+    }
+
+    protected void doStart() throws Exception {
+        getEndpoint().addConsumer(this);
+        super.doStart();
+    }
+
+    protected void doStop() throws Exception {
+        getEndpoint().removeConsumer(this);
+        super.doStop();
+    }
+
+    void processExchange(Exchange exchange) {
+        try {
+            getProcessor().process(exchange);
+        } catch (Throwable e) {
+            exchange.setException(e);
+        }
+
+        if (exchange.getException() != null) {
+            getExceptionHandler().handleException("Error processing 
exchange.", exchange, exchange.getException());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
 
b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
new file mode 100644
index 0000000..623dea0
--- /dev/null
+++ 
b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtdispatch.Task;
+import org.fusesource.stomp.client.Callback;
+import org.fusesource.stomp.client.CallbackConnection;
+import org.fusesource.stomp.client.Promise;
+import org.fusesource.stomp.codec.StompFrame;
+
+import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
+import static org.fusesource.stomp.client.Constants.DESTINATION;
+import static org.fusesource.stomp.client.Constants.DISCONNECT;
+import static org.fusesource.stomp.client.Constants.ID;
+import static org.fusesource.stomp.client.Constants.SEND;
+import static org.fusesource.stomp.client.Constants.SUBSCRIBE;
+import static org.fusesource.stomp.client.Constants.UNSUBSCRIBE;
+
+public class StompEndpoint extends DefaultEndpoint {
+
+    private CallbackConnection connection;
+    private StompConfiguration configuration;
+    private String destination;
+
+    private final List<StompConsumer> consumers = new 
CopyOnWriteArrayList<StompConsumer>();
+
+    public StompEndpoint(String uri, StompComponent component, 
StompConfiguration configuration, String destination) {
+        super(uri, component);
+        this.configuration = configuration;
+        this.destination = destination;
+    }
+
+    public Producer createProducer() throws Exception {
+        return new StompProducer(this);
+    }
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new StompConsumer(this, processor);
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        final Promise<CallbackConnection> promise = new 
Promise<CallbackConnection>();
+
+        configuration.getStomp().connectCallback(promise);
+
+        connection = promise.await();
+
+        connection.getDispatchQueue().execute(new Task() {
+            @Override
+            public void run() {
+                connection.receive(new Callback<StompFrame>() {
+                    @Override
+                    public void onFailure(Throwable value) {
+                        if (started.get()) {
+                            connection.close(null);
+                        }
+                    }
+
+                    @Override
+                    public void onSuccess(StompFrame value) {
+                        if (!consumers.isEmpty()) {
+                            Exchange exchange = createExchange();
+                            exchange.getIn().setBody(value.content());
+                            for (StompConsumer consumer : consumers) {
+                                consumer.processExchange(exchange);
+                            }
+                        }
+                    }
+                });
+                connection.resume();
+            }
+        });
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        connection.getDispatchQueue().execute(new Task() {
+            @Override
+            public void run() {
+                StompFrame frame = new StompFrame(DISCONNECT);
+                connection.send(frame, null);
+            }
+        });
+        connection.close(null);
+    }
+
+    protected void send(Message message) {
+        final StompFrame frame = new StompFrame(SEND);
+        frame.addHeader(DESTINATION, StompFrame.encodeHeader(destination));
+        frame.content(utf8(message.getBody().toString()));
+        connection.getDispatchQueue().execute(new Task() {
+            @Override
+            public void run() {
+                connection.send(frame, null);
+            }
+        });
+    }
+
+    @Override
+    protected String createEndpointUri() {
+        return super.createEndpointUri();
+    }
+
+    void addConsumer(final StompConsumer consumer) {
+        connection.getDispatchQueue().execute(new Task() {
+            @Override
+            public void run() {
+                StompFrame frame = new StompFrame(SUBSCRIBE);
+                frame.addHeader(DESTINATION, 
StompFrame.encodeHeader(destination));
+                frame.addHeader(ID, consumer.id);
+                connection.send(frame, null);
+            }
+        });
+        consumers.add(consumer);
+    }
+
+    void removeConsumer(final StompConsumer consumer) {
+        connection.getDispatchQueue().execute(new Task() {
+            @Override
+            public void run() {
+                StompFrame frame = new StompFrame(UNSUBSCRIBE);
+                frame.addHeader(DESTINATION, 
StompFrame.encodeHeader(destination));
+                frame.addHeader(ID, consumer.id);
+                connection.send(frame, null);
+            }
+        });
+        consumers.remove(consumer);
+    }
+
+    AsciiBuffer getNextId() {
+        return connection.nextId();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java
 
b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java
new file mode 100644
index 0000000..74b8b63
--- /dev/null
+++ 
b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultAsyncProducer;
+
+public class StompProducer extends DefaultAsyncProducer implements Processor {
+
+    private final StompEndpoint stompEndpoint;
+
+    public StompProducer(StompEndpoint stompEndpoint) {
+        super(stompEndpoint);
+        this.stompEndpoint = stompEndpoint;
+    }
+
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            stompEndpoint.send(exchange.getIn());
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+        callback.done(true);
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/main/resources/META-INF/services/org/apache/camel/component/stomp
----------------------------------------------------------------------
diff --git 
a/components/camel-stomp/src/main/resources/META-INF/services/org/apache/camel/component/stomp
 
b/components/camel-stomp/src/main/resources/META-INF/services/org/apache/camel/component/stomp
new file mode 100644
index 0000000..ae04da8
--- /dev/null
+++ 
b/components/camel-stomp/src/main/resources/META-INF/services/org/apache/camel/component/stomp
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.stomp.StompComponent
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java
 
b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java
new file mode 100644
index 0000000..052ab75
--- /dev/null
+++ 
b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+/**
+ * Base class for the Stomp tests. It starts a non-persistent ActiveMQ broker
+ * with a STOMP connector on <tt>localhost:61613</tt> before the Camel test
+ * set-up and stops it after the Camel test tear-down.
+ */
+public abstract class StompBaseTest extends CamelTestSupport {
+
+    protected BrokerService brokerService;
+    protected int numberOfMessages = 100;
+
+    @Override
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setAdvisorySupport(false);
+        brokerService.addConnector("stomp://localhost:61613?trace=true");
+        brokerService.start();
+        brokerService.waitUntilStarted();
+        super.setUp();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // stop the broker even when the Camel tear-down fails, so the
+            // connector port is released for the next test
+            if (brokerService != null) {
+                brokerService.stop();
+                brokerService.waitUntilStopped();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerTest.java
 
b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerTest.java
new file mode 100644
index 0000000..a2def8c
--- /dev/null
+++ 
b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.fusesource.stomp.client.BlockingConnection;
+import org.fusesource.stomp.client.Stomp;
+import org.fusesource.stomp.codec.StompFrame;
+import org.junit.Test;
+
+import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
+import static org.fusesource.stomp.client.Constants.DESTINATION;
+import static org.fusesource.stomp.client.Constants.MESSAGE_ID;
+import static org.fusesource.stomp.client.Constants.SEND;
+
+public class StompConsumerTest extends StompBaseTest {
+
+    @Test
+    public void testConsume() throws Exception {
+        Stomp stomp = new Stomp("tcp://localhost:61613");
+        final BlockingConnection producerConnection = stomp.connectBlocking();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMinimumMessageCount(numberOfMessages);
+
+        for (int i = 0; i < numberOfMessages; i++) {
+            StompFrame frame = new StompFrame(SEND);
+            frame.addHeader(DESTINATION, 
StompFrame.encodeHeader("/queue/test"));
+            frame.addHeader(MESSAGE_ID, StompFrame.encodeHeader("msg:" + i));
+            frame.content(utf8("Important Message " + i));
+            producerConnection.send(frame);
+        }
+
+        mock.await(5, TimeUnit.SECONDS);
+        mock.assertIsSatisfied();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("stomp:queue:test")
+                        .transform(body().convertToString())
+                        .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompProducerTest.java
 
b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompProducerTest.java
new file mode 100644
index 0000000..fc878ea
--- /dev/null
+++ 
b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompProducerTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stomp;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.fusesource.stomp.client.BlockingConnection;
+import org.fusesource.stomp.client.Stomp;
+import org.fusesource.stomp.codec.StompFrame;
+import org.junit.Test;
+
+import static org.fusesource.stomp.client.Constants.DESTINATION;
+import static org.fusesource.stomp.client.Constants.ID;
+import static org.fusesource.stomp.client.Constants.SUBSCRIBE;
+
+public class StompProducerTest extends StompBaseTest {
+
+    @Test
+    public void testProduce() throws Exception {
+        Stomp stomp = new Stomp("tcp://localhost:61613");
+        final BlockingConnection subscribeConnection = stomp.connectBlocking();
+
+        StompFrame frame = new StompFrame(SUBSCRIBE);
+        frame.addHeader(DESTINATION, StompFrame.encodeHeader("/queue/test"));
+        frame.addHeader(ID, subscribeConnection.nextId());
+        StompFrame response = subscribeConnection.request(frame);
+
+        final CountDownLatch latch = new CountDownLatch(numberOfMessages);
+
+        Thread thread = new Thread(new Runnable() {
+            public void run() {
+                for (int i = 0; i < numberOfMessages; i++) {
+                    try {
+                        StompFrame frame = subscribeConnection.receive();
+                        latch.countDown();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        break;
+                    }
+                }
+            }
+        });
+        thread.start();
+
+        Producer producer = context.getEndpoint("direct:foo").createProducer();
+        for (int i = 0; i < numberOfMessages; i++) {
+            Exchange exchange = producer.createExchange();
+            exchange.getIn().setBody("test message " + i);
+            producer.process(exchange);
+        }
+        latch.await(20, TimeUnit.SECONDS);
+        assertTrue("Messages not consumed = " + latch.getCount(), latch.getCount() == 0);
+
+
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:foo").to("stomp:queue:test");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/components/camel-stomp/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/test/resources/log4j.properties b/components/camel-stomp/src/test/resources/log4j.properties
new file mode 100644
index 0000000..55f9354
--- /dev/null
+++ b/components/camel-stomp/src/test/resources/log4j.properties
@@ -0,0 +1,40 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=info, file
+
+#log4j.logger.twitter4j=DEBUG
+#log4j.logger.org.apache.camel.component.stomp=DEBUG
+#log4j.logger.org.apache.camel=DEBUG
+
+#log4j.logger.org.apache.activemq.transport.stomp=TRACE, out
+#log4j.additivity.org.apache.activemq.transport.stomp=false
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.file.file=target/camel-stomp-test.log
+log4j.appender.file.append=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/2beac53a/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index cba4ea5..ce8e05b 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -367,6 +367,7 @@
     <spymemcached-bundle-version>2.5_2</spymemcached-bundle-version> <!-- FIXME cmueller: not in sync! -->
     <spymemcached-version>2.8.4</spymemcached-version>
     <sshd-version>0.8.0</sshd-version>
+    <stompjms-version>1.13</stompjms-version>
     <stax-api-version>1.0.1</stax-api-version>
     <stax2-api-bundle-version>3.1.1</stax2-api-bundle-version>
     <stringtemplate-bundle-version>4.0.2_2</stringtemplate-bundle-version>

Reply via email to