Repository: karaf-decanter
Updated Branches:
  refs/heads/master 93d29f040 -> 7f5cbf44e


[KARAF-4799] Add text message support in JMS collector, and corresponding test


Project: http://git-wip-us.apache.org/repos/asf/karaf-decanter/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-decanter/commit/f1409497
Tree: http://git-wip-us.apache.org/repos/asf/karaf-decanter/tree/f1409497
Diff: http://git-wip-us.apache.org/repos/asf/karaf-decanter/diff/f1409497

Branch: refs/heads/master
Commit: f140949774db111fbf0022a7a10fe51bb24aee2f
Parents: 93d29f0
Author: Jean-Baptiste Onofré <[email protected]>
Authored: Fri Oct 21 07:35:50 2016 +0200
Committer: Jean-Baptiste Onofré <[email protected]>
Committed: Fri Oct 21 08:03:46 2016 +0200

----------------------------------------------------------------------
 collector/jms/pom.xml                           |  29 ++-
 .../decanter/collector/jms/JmsCollector.java    | 113 +++++++---
 .../collector/jms/JmsCollectorTest.java         | 209 +++++++++++++++++++
 3 files changed, 317 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/f1409497/collector/jms/pom.xml
----------------------------------------------------------------------
diff --git a/collector/jms/pom.xml b/collector/jms/pom.xml
index ec45abc..3a15212 100644
--- a/collector/jms/pom.xml
+++ b/collector/jms/pom.xml
@@ -35,12 +35,39 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.karaf.decanter</groupId>
+            <artifactId>org.apache.karaf.decanter.api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.geronimo.specs</groupId>
             <artifactId>geronimo-jms_1.1_spec</artifactId>
             <version>1.1.1</version>
         </dependency>
-    </dependencies>
 
+        <!-- test -->
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-broker</artifactId>
+            <version>${activemq.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-kahadb-store</artifactId>
+            <version>${activemq.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.decanter.marshaller</groupId>
+            <artifactId>org.apache.karaf.decanter.marshaller.json</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/f1409497/collector/jms/src/main/java/org/apache/karaf/decanter/collector/jms/JmsCollector.java
----------------------------------------------------------------------
diff --git a/collector/jms/src/main/java/org/apache/karaf/decanter/collector/jms/JmsCollector.java b/collector/jms/src/main/java/org/apache/karaf/decanter/collector/jms/JmsCollector.java
index 103908b..ee1e5b8 100644
--- a/collector/jms/src/main/java/org/apache/karaf/decanter/collector/jms/JmsCollector.java
+++ b/collector/jms/src/main/java/org/apache/karaf/decanter/collector/jms/JmsCollector.java
@@ -16,6 +16,7 @@
  */
 package org.apache.karaf.decanter.collector.jms;
 
+import org.apache.karaf.decanter.api.marshaller.Unmarshaller;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -28,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jms.*;
+import java.io.ByteArrayInputStream;
 import java.net.InetAddress;
 import java.util.Dictionary;
 import java.util.Enumeration;
@@ -51,6 +53,7 @@ public class JmsCollector {
     private String destinationType;
 
     private EventAdmin dispatcher;
+    private Unmarshaller unmarshaller;
     private Connection connection;
     private Session session;
 
@@ -67,7 +70,7 @@ public class JmsCollector {
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Destination destination = createDestination(session);
         MessageConsumer consumer = session.createConsumer(destination);
-        consumer.setMessageListener(new DecanterMessageListener(dispatcher));
+        consumer.setMessageListener(new DecanterMessageListener(dispatcher, unmarshaller));
         connection.start();
     }
 
@@ -123,56 +126,100 @@ public class JmsCollector {
         this.dispatcher = dispatcher;
     }
 
+    @Reference
+    public void setUnmarshaller(Unmarshaller unmarshaller) {
+        this.unmarshaller = unmarshaller;
+    }
+
     public class DecanterMessageListener implements MessageListener {
 
         private EventAdmin dispatcher;
+        private Unmarshaller unmarshaller;
 
-        public DecanterMessageListener(EventAdmin dispatcher) {
+        public DecanterMessageListener(EventAdmin dispatcher, Unmarshaller unmarshaller) {
             this.dispatcher = dispatcher;
+            this.unmarshaller = unmarshaller;
         }
 
         @Override
         public void onMessage(Message message) {
-            if (!(message instanceof MapMessage)) {
-                LOGGER.warn("JMS is not a MapMessage.");
+            if (!(message instanceof MapMessage) && !(message instanceof TextMessage)) {
+                LOGGER.warn("JMS is not a MapMessage or a TextMessage.");
                 return;
             }
 
-            MapMessage mapMessage = (MapMessage) message;
-
-            try {
-                Map<String, Object> data = new HashMap<>();
+            if (message instanceof MapMessage) {
+                MapMessage mapMessage = (MapMessage) message;
 
                 try {
-                    data.put("hostAddress", InetAddress.getLocalHost().getHostAddress());
-                    data.put("hostName", InetAddress.getLocalHost().getHostName());
+                    Map<String, Object> data = new HashMap<>();
+
+                    try {
+                        data.put("hostAddress", InetAddress.getLocalHost().getHostAddress());
+                        data.put("hostName", InetAddress.getLocalHost().getHostName());
+                    } catch (Exception e) {
+                        LOGGER.warn("Can't populate local host name and address", e);
+                    }
+
+                    // custom fields
+                    Enumeration<String> keys = properties.keys();
+                    while (keys.hasMoreElements()) {
+                        String key = keys.nextElement();
+                        data.put(key, properties.get(key));
+                    }
+
+                    Enumeration names = mapMessage.getMapNames();
+                    while (names.hasMoreElements()) {
+                        String name = (String) names.nextElement();
+                        data.put(name, mapMessage.getObject(name));
+                    }
+
+                    data.put("type", "jms");
+                    String karafName = System.getProperty("karaf.name");
+                    if (karafName != null) {
+                        data.put("karafName", karafName);
+                    }
+
+                    Event event = new Event(dispatcherTopic, data);
+                    dispatcher.postEvent(event);
                 } catch (Exception e) {
-                    LOGGER.warn("Can't populate local host name and address", e);
-                }
-
-                // custom fields
-                Enumeration<String> keys = properties.keys();
-                while (keys.hasMoreElements()) {
-                    String key = keys.nextElement();
-                    data.put(key, properties.get(key));
-                }
-
-                Enumeration names = mapMessage.getMapNames();
-                while (names.hasMoreElements()) {
-                    String name = (String) names.nextElement();
-                    data.put(name, mapMessage.getObject(name));
+                    LOGGER.warn("Can't process JMS message", e);
                 }
+            }
+            if (message instanceof TextMessage) {
+                TextMessage textMessage = (TextMessage) message;
 
-                data.put("type", "jms");
-                String karafName = System.getProperty("karaf.name");
-                if (karafName != null) {
-                    data.put("karafName", karafName);
+                try {
+                    Map<String, Object> data = new HashMap<>();
+
+                    try {
+                        data.put("hostAddress", InetAddress.getLocalHost().getHostAddress());
+                        data.put("hostName", InetAddress.getLocalHost().getHostName());
+                    } catch (Exception e) {
+                        LOGGER.warn("Can't populate local host name and address", e);
+                    }
+
+                    // custom fields
+                    Enumeration<String> keys = properties.keys();
+                    while (keys.hasMoreElements()) {
+                        String key = keys.nextElement();
+                        data.put(key, properties.get(key));
+                    }
+
+                    ByteArrayInputStream is = new ByteArrayInputStream(textMessage.getText().getBytes());
+                    data.putAll(unmarshaller.unmarshal(is));
+
+                    data.put("type", "jms");
+                    String karafName = System.getProperty("karaf.name");
+                    if (karafName != null) {
+                        data.put("karafName", karafName);
+                    }
+
+                    Event event = new Event(dispatcherTopic, data);
+                    dispatcher.postEvent(event);
+                } catch (Exception e) {
+                    LOGGER.warn("Can't process JMS message", e);
                 }
-
-                Event event = new Event(dispatcherTopic, data);
-                dispatcher.postEvent(event);
-            } catch (Exception e) {
-                LOGGER.warn("Can't process JMS message", e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/f1409497/collector/jms/src/test/java/org/apache/karaf/decanter/collector/jms/JmsCollectorTest.java
----------------------------------------------------------------------
diff --git a/collector/jms/src/test/java/org/apache/karaf/decanter/collector/jms/JmsCollectorTest.java b/collector/jms/src/test/java/org/apache/karaf/decanter/collector/jms/JmsCollectorTest.java
new file mode 100644
index 0000000..a5e019d
--- /dev/null
+++ b/collector/jms/src/test/java/org/apache/karaf/decanter/collector/jms/JmsCollectorTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.collector.jms;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.karaf.decanter.marshaller.json.JsonUnmarshaller;
+import org.junit.*;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.ComponentInstance;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Properties;
+
+public class JmsCollectorTest {
+
+    private static BrokerService broker;
+
+    private DispatcherMock dispatcher;
+    private ActiveMQConnectionFactory connectionFactory;
+
+    @BeforeClass
+    public static void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
+        broker.addConnector("tcp://localhost:61616");
+        broker.setUseJmx(false);
+        broker.start();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        connectionFactory = new ActiveMQConnectionFactory();
+        connectionFactory.setBrokerURL("tcp://localhost:61616");
+
+        JsonUnmarshaller unmarshaller = new JsonUnmarshaller();
+
+        dispatcher = new DispatcherMock();
+
+        JmsCollector jmsCollector = new JmsCollector();
+        jmsCollector.setUnmarshaller(unmarshaller);
+        jmsCollector.setConnectionFactory(connectionFactory);
+        jmsCollector.setDispatcher(dispatcher);
+
+        ComponentContext componentContext = new ComponentContextMock();
+        componentContext.getProperties().put("destination.name", "decanter");
+        componentContext.getProperties().put("destination.type", "queue");
+
+        jmsCollector.activate(componentContext);
+    }
+
+    @Test
+    public void test() throws Exception {
+        Connection connection = null;
+        Session session = null;
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(session.createQueue("decanter"));
+            ActiveMQMapMessage mapMessage = new ActiveMQMapMessage();
+            mapMessage.setString("message", "map");
+            producer.send(mapMessage);
+
+            Thread.sleep(200L);
+
+            Assert.assertEquals(1, dispatcher.getPostEvents().size());
+            Event event = dispatcher.getPostEvents().get(0);
+            Assert.assertEquals("map", event.getProperty("message"));
+            Assert.assertEquals("jms", event.getProperty("type"));
+
+            ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
+            textMessage.setText("{ \"message\" : \"text\" }");
+            producer.send(textMessage);
+
+            Thread.sleep(200L);
+
+            Assert.assertEquals(2, dispatcher.getPostEvents().size());
+            event = dispatcher.getPostEvents().get(1);
+            Assert.assertEquals("text", event.getProperty("message"));
+            Assert.assertEquals("jms", event.getProperty("type"));
+        } finally {
+            if (session != null) {
+                session.close();
+            }
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    @AfterClass
+    public static void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    private class ComponentContextMock implements ComponentContext {
+
+        private Properties properties;
+
+        public ComponentContextMock() {
+            this.properties = new Properties();
+        }
+
+        @Override
+        public Dictionary getProperties() {
+            return this.properties;
+        }
+
+        @Override
+        public Object locateService(String s) {
+            return null;
+        }
+
+        @Override
+        public Object locateService(String s, ServiceReference serviceReference) {
+            return null;
+        }
+
+        @Override
+        public Object[] locateServices(String s) {
+            return new Object[0];
+        }
+
+        @Override
+        public BundleContext getBundleContext() {
+            return null;
+        }
+
+        @Override
+        public Bundle getUsingBundle() {
+            return null;
+        }
+
+        @Override
+        public ComponentInstance getComponentInstance() {
+            return null;
+        }
+
+        @Override
+        public void enableComponent(String s) {
+
+        }
+
+        @Override
+        public void disableComponent(String s) {
+
+        }
+
+        @Override
+        public ServiceReference getServiceReference() {
+            return null;
+        }
+    }
+
+    private class DispatcherMock implements EventAdmin {
+
+        private List<Event> postEvents = new ArrayList<>();
+        private List<Event> sendEvents = new ArrayList<>();
+
+        @Override
+        public void postEvent(Event event) {
+            postEvents.add(event);
+        }
+
+        @Override
+        public void sendEvent(Event event) {
+            sendEvents.add(event);
+        }
+
+        public List<Event> getPostEvents() {
+            return postEvents;
+        }
+
+        public List<Event> getSendEvents() {
+            return sendEvents;
+        }
+    }
+
+}

Reply via email to