Repository: karaf-decanter Updated Branches: refs/heads/master 93d29f040 -> 7f5cbf44e
[KARAF-4799] Add text message support in JMS collector, and corresponding test Project: http://git-wip-us.apache.org/repos/asf/karaf-decanter/repo Commit: http://git-wip-us.apache.org/repos/asf/karaf-decanter/commit/f1409497 Tree: http://git-wip-us.apache.org/repos/asf/karaf-decanter/tree/f1409497 Diff: http://git-wip-us.apache.org/repos/asf/karaf-decanter/diff/f1409497 Branch: refs/heads/master Commit: f140949774db111fbf0022a7a10fe51bb24aee2f Parents: 93d29f0 Author: Jean-Baptiste Onofré <[email protected]> Authored: Fri Oct 21 07:35:50 2016 +0200 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Fri Oct 21 08:03:46 2016 +0200 ---------------------------------------------------------------------- collector/jms/pom.xml | 29 ++- .../decanter/collector/jms/JmsCollector.java | 113 +++++++--- .../collector/jms/JmsCollectorTest.java | 209 +++++++++++++++++++ 3 files changed, 317 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/f1409497/collector/jms/pom.xml ---------------------------------------------------------------------- diff --git a/collector/jms/pom.xml b/collector/jms/pom.xml index ec45abc..3a15212 100644 --- a/collector/jms/pom.xml +++ b/collector/jms/pom.xml @@ -35,12 +35,39 @@ <dependencies> <dependency> + <groupId>org.apache.karaf.decanter</groupId> + <artifactId>org.apache.karaf.decanter.api</artifactId> + </dependency> + <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> <version>1.1.1</version> </dependency> - </dependencies> + <!-- test --> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-broker</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-kahadb-store</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.karaf.decanter.marshaller</groupId> + <artifactId>org.apache.karaf.decanter.marshaller.json</artifactId> + <scope>test</scope> + </dependency> + </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/f1409497/collector/jms/src/main/java/org/apache/karaf/decanter/collector/jms/JmsCollector.java ---------------------------------------------------------------------- diff --git a/collector/jms/src/main/java/org/apache/karaf/decanter/collector/jms/JmsCollector.java b/collector/jms/src/main/java/org/apache/karaf/decanter/collector/jms/JmsCollector.java index 103908b..ee1e5b8 100644 --- a/collector/jms/src/main/java/org/apache/karaf/decanter/collector/jms/JmsCollector.java +++ b/collector/jms/src/main/java/org/apache/karaf/decanter/collector/jms/JmsCollector.java @@ -16,6 +16,7 @@ */ package org.apache.karaf.decanter.collector.jms; +import org.apache.karaf.decanter.api.marshaller.Unmarshaller; import org.osgi.service.component.ComponentContext; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; @@ -28,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.*; +import java.io.ByteArrayInputStream; import java.net.InetAddress; import java.util.Dictionary; import java.util.Enumeration; @@ -51,6 +53,7 @@ public class JmsCollector { private String destinationType; private EventAdmin dispatcher; + private Unmarshaller unmarshaller; private Connection connection; private Session session; @@ -67,7 +70,7 @@ public class JmsCollector { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = createDestination(session); MessageConsumer consumer = session.createConsumer(destination); - consumer.setMessageListener(new DecanterMessageListener(dispatcher)); + consumer.setMessageListener(new DecanterMessageListener(dispatcher, unmarshaller)); connection.start(); } @@ -123,56 +126,100 @@ public class JmsCollector { this.dispatcher = dispatcher; } + @Reference + public void setUnmarshaller(Unmarshaller unmarshaller) { + this.unmarshaller = unmarshaller; + } + public class DecanterMessageListener implements MessageListener { private EventAdmin dispatcher; + private Unmarshaller unmarshaller; - public DecanterMessageListener(EventAdmin dispatcher) { + public DecanterMessageListener(EventAdmin dispatcher, Unmarshaller unmarshaller) { this.dispatcher = dispatcher; + this.unmarshaller = unmarshaller; } @Override public void onMessage(Message message) { - if (!(message instanceof MapMessage)) { - LOGGER.warn("JMS is not a MapMessage."); + if (!(message instanceof MapMessage) && !(message instanceof TextMessage)) { + LOGGER.warn("JMS is not a MapMessage or a TextMessage."); return; } - MapMessage mapMessage = (MapMessage) message; - - try { - Map<String, Object> data = new HashMap<>(); + if (message instanceof MapMessage) { + MapMessage mapMessage = (MapMessage) message; try { - data.put("hostAddress", InetAddress.getLocalHost().getHostAddress()); - data.put("hostName", InetAddress.getLocalHost().getHostName()); + Map<String, Object> data = new HashMap<>(); + + try { + data.put("hostAddress", InetAddress.getLocalHost().getHostAddress()); + data.put("hostName", InetAddress.getLocalHost().getHostName()); + } catch (Exception e) { + LOGGER.warn("Can't populate local host name and address", e); + } + + // custom fields + Enumeration<String> keys = properties.keys(); + while (keys.hasMoreElements()) { + String key = keys.nextElement(); + data.put(key, properties.get(key)); + } + + Enumeration names = mapMessage.getMapNames(); + while (names.hasMoreElements()) { + String name = (String) names.nextElement(); + data.put(name, mapMessage.getObject(name)); + } + + data.put("type", "jms"); + String karafName = System.getProperty("karaf.name"); + if (karafName != null) { + data.put("karafName", karafName); + } + + Event event = new Event(dispatcherTopic, data); + dispatcher.postEvent(event); } catch (Exception e) { - LOGGER.warn("Can't populate local host name and address", e); - } - - // custom fields - Enumeration<String> keys = properties.keys(); - while (keys.hasMoreElements()) { - String key = keys.nextElement(); - data.put(key, properties.get(key)); - } - - Enumeration names = mapMessage.getMapNames(); - while (names.hasMoreElements()) { - String name = (String) names.nextElement(); - data.put(name, mapMessage.getObject(name)); + LOGGER.warn("Can't process JMS message", e); } + } + if (message instanceof TextMessage) { + TextMessage textMessage = (TextMessage) message; - data.put("type", "jms"); - String karafName = System.getProperty("karaf.name"); - if (karafName != null) { - data.put("karafName", karafName); + try { + Map<String, Object> data = new HashMap<>(); + + try { + data.put("hostAddress", InetAddress.getLocalHost().getHostAddress()); + data.put("hostName", InetAddress.getLocalHost().getHostName()); + } catch (Exception e) { + LOGGER.warn("Can't populate local host name and address", e); + } + + // custom fields + Enumeration<String> keys = properties.keys(); + while (keys.hasMoreElements()) { + String key = keys.nextElement(); + data.put(key, properties.get(key)); + } + + ByteArrayInputStream is = new ByteArrayInputStream(textMessage.getText().getBytes()); + data.putAll(unmarshaller.unmarshal(is)); + + data.put("type", "jms"); + String karafName = System.getProperty("karaf.name"); + if (karafName != null) { + data.put("karafName", karafName); + } + + Event event = new Event(dispatcherTopic, data); + dispatcher.postEvent(event); + } catch (Exception e) { + LOGGER.warn("Can't process JMS message", e); } - - Event event = new Event(dispatcherTopic, data); - dispatcher.postEvent(event); - } catch (Exception e) { - LOGGER.warn("Can't process JMS message", e); } } http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/f1409497/collector/jms/src/test/java/org/apache/karaf/decanter/collector/jms/JmsCollectorTest.java ---------------------------------------------------------------------- diff --git a/collector/jms/src/test/java/org/apache/karaf/decanter/collector/jms/JmsCollectorTest.java b/collector/jms/src/test/java/org/apache/karaf/decanter/collector/jms/JmsCollectorTest.java new file mode 100644 index 0000000..a5e019d --- /dev/null +++ b/collector/jms/src/test/java/org/apache/karaf/decanter/collector/jms/JmsCollectorTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.decanter.collector.jms; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.apache.karaf.decanter.marshaller.json.JsonUnmarshaller; +import org.junit.*; +import org.osgi.framework.Bundle; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; +import org.osgi.service.component.ComponentContext; +import org.osgi.service.component.ComponentInstance; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventAdmin; + +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.Dictionary; +import java.util.List; +import java.util.Properties; + +public class JmsCollectorTest { + + private static BrokerService broker; + + private DispatcherMock dispatcher; + private ActiveMQConnectionFactory connectionFactory; + + @BeforeClass + public static void startBroker() throws Exception { + broker = new BrokerService(); + broker.setPersistenceAdapter(new MemoryPersistenceAdapter()); + broker.addConnector("tcp://localhost:61616"); + broker.setUseJmx(false); + broker.start(); + } + + @Before + public void setup() throws Exception { + connectionFactory = new ActiveMQConnectionFactory(); + connectionFactory.setBrokerURL("tcp://localhost:61616"); + + JsonUnmarshaller unmarshaller = new JsonUnmarshaller(); + + dispatcher = new DispatcherMock(); + + JmsCollector jmsCollector = new JmsCollector(); + jmsCollector.setUnmarshaller(unmarshaller); + jmsCollector.setConnectionFactory(connectionFactory); + jmsCollector.setDispatcher(dispatcher); + + ComponentContext componentContext = new ComponentContextMock(); + componentContext.getProperties().put("destination.name", "decanter"); + componentContext.getProperties().put("destination.type", "queue"); + + jmsCollector.activate(componentContext); + } + + @Test + public void test() throws Exception { + Connection connection = null; + Session session = null; + try { + connection = connectionFactory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue("decanter")); + ActiveMQMapMessage mapMessage = new ActiveMQMapMessage(); + mapMessage.setString("message", "map"); + producer.send(mapMessage); + + Thread.sleep(200L); + + Assert.assertEquals(1, dispatcher.getPostEvents().size()); + Event event = dispatcher.getPostEvents().get(0); + Assert.assertEquals("map", event.getProperty("message")); + Assert.assertEquals("jms", event.getProperty("type")); + + ActiveMQTextMessage textMessage = new ActiveMQTextMessage(); + textMessage.setText("{ \"message\" : \"text\" }"); + producer.send(textMessage); + + Thread.sleep(200L); + + Assert.assertEquals(2, dispatcher.getPostEvents().size()); + event = dispatcher.getPostEvents().get(1); + Assert.assertEquals("text", event.getProperty("message")); + Assert.assertEquals("jms", event.getProperty("type")); + } finally { + if (session != null) { + session.close(); + } + if (connection != null) { + connection.close(); + } + } + } + + @AfterClass + public static void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + private class ComponentContextMock implements ComponentContext { + + private Properties properties; + + public ComponentContextMock() { + this.properties = new Properties(); + } + + @Override + public Dictionary getProperties() { + return this.properties; + } + + @Override + public Object locateService(String s) { + return null; + } + + @Override + public Object locateService(String s, ServiceReference serviceReference) { + return null; + } + + @Override + public Object[] locateServices(String s) { + return new Object[0]; + } + + @Override + public BundleContext getBundleContext() { + return null; + } + + @Override + public Bundle getUsingBundle() { + return null; + } + + @Override + public ComponentInstance getComponentInstance() { + return null; + } + + @Override + public void enableComponent(String s) { + + } + + @Override + public void disableComponent(String s) { + + } + + @Override + public ServiceReference getServiceReference() { + return null; + } + } + + private class DispatcherMock implements EventAdmin { + + private List<Event> postEvents = new ArrayList<>(); + private List<Event> sendEvents = new ArrayList<>(); + + @Override + public void postEvent(Event event) { + postEvents.add(event); + } + + @Override + public void sendEvent(Event event) { + sendEvents.add(event); + } + + public List<Event> getPostEvents() { + return postEvents; + } + + public List<Event> getSendEvents() { + return sendEvents; + } + } + +}
