http://git-wip-us.apache.org/repos/asf/karaf/blob/0f53437c/jms/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java b/jms/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java new file mode 100644 index 0000000..cc46434 --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.internal; + +import org.apache.karaf.jms.JmsMBean; +import org.apache.karaf.jms.JmsMessage; +import org.apache.karaf.jms.JmsService; + +import javax.management.MBeanException; +import javax.management.openmbean.*; +import java.util.List; +import java.util.Map; + +/** + * Default implementation of the JMS MBean. + */ +public class JmsMBeanImpl implements JmsMBean { + + private JmsService jmsService; + + @Override + public List<String> getConnectionfactories() throws MBeanException { + try { + return jmsService.connectionFactories(); + } catch (Throwable t) { + throw new MBeanException(null, t.getMessage()); + } + } + + @Override + public void create(String name, String type, String url) throws MBeanException { + try { + jmsService.create(name, type, url); + } catch (Throwable t) { + throw new MBeanException(null, t.getMessage()); + } + } + + @Override + public void delete(String name) throws MBeanException { + try { + jmsService.delete(name); + } catch (Throwable t) { + throw new MBeanException(null, t.getMessage()); + } + } + + @Override + public Map<String, String> info(String connectionFactory, String username, String password) throws MBeanException { + try { + return jmsService.info(connectionFactory, username, password); + } catch (Throwable t) { + throw new MBeanException(null, t.getMessage()); + } + } + + @Override + public int count(String connectionFactory, String queue, String username, String password) throws MBeanException { + try { + return jmsService.count(connectionFactory, queue, username, password); + } catch (Throwable t) { + throw new MBeanException(null, t.getMessage()); + } + } + + @Override + public List<String> queues(String connectionFactory, String username, String password) throws MBeanException { + try { + return jmsService.queues(connectionFactory, username, password); + } catch (Throwable t) { + throw new MBeanException(null, t.getMessage()); + } + } + + @Override + public List<String> topics(String connectionFactory, String username, String password) throws MBeanException { + try { + return jmsService.topics(connectionFactory, username, password); + } catch (Throwable t) { + throw new MBeanException(null, t.getMessage()); + } + } + + @Override + public void send(String connectionFactory, String queue, String content, String replyTo, String username, String password) throws MBeanException { + try { + jmsService.send(connectionFactory, queue, content, replyTo, username, password); + } catch (Throwable t) { + throw new MBeanException(null, t.getMessage()); + } + } + + @Override + public int consume(String connectionFactory, String queue, String selector, String username, String password) throws MBeanException { + try { + return jmsService.consume(connectionFactory, queue, selector, username, password); + } catch (Throwable t) { + throw new MBeanException(null, t.getMessage()); + } + } + + @Override + public int move(String connectionFactory, String source, String destination, String selector, String username, String password) throws MBeanException { + try { + return jmsService.move(connectionFactory, source, destination, selector, username, password); + } catch (Throwable t) { + throw new MBeanException(null, t.getMessage()); + } + } + + @Override + public TabularData browse(String connectionFactory, String queue, String selector, String username, String password) throws MBeanException { + try { + CompositeType type = new CompositeType("message", "JMS Message", + new String[]{ "id", "content", "charset", "type", "correlation", "delivery", "destination", "expiration", "priority", "redelivered", "replyto", "timestamp" }, + new String[]{ "Message ID", "Content", "Charset", "Type", "Correlation ID", "Delivery Mode", "Destination", "Expiration Date", "Priority", "Redelivered", "Reply-To", "Timestamp" }, + new OpenType[]{ SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.STRING }); + TabularType tableType = new TabularType("messages", "JMS Messages", type, new String[]{ "id" }); + TabularData table = new TabularDataSupport(tableType); + for (JmsMessage message : getJmsService().browse(connectionFactory, queue, selector, username, password)) { + CompositeData data = new CompositeDataSupport(type, + new String[]{ "id", "content", "charset", "type", "correlation", "delivery", "destination", "expiration", "priority", "redelivered", "replyto", "timestamp" }, + new Object[]{ message.getMessageId(), message.getContent(), message.getCharset(), message.getType(), message.getCorrelationID(), message.getDeliveryMode(), message.getDestination(), message.getExpiration(), message.getPriority(), message.isRedelivered(), message.getReplyTo(), message.getTimestamp() } + ); + table.put(data); + } + return table; + } catch (Throwable t) { + throw new MBeanException(null, t.getMessage()); + } + } + + public JmsService getJmsService() { + return jmsService; + } + + public void setJmsService(JmsService jmsService) { + this.jmsService = jmsService; + } + +}
http://git-wip-us.apache.org/repos/asf/karaf/blob/0f53437c/jms/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java b/jms/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java new file mode 100644 index 0000000..fe4b43d --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.internal; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.advisory.DestinationSource; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.pool.PooledConnection; +import org.apache.karaf.jms.JmsMessage; +import org.apache.karaf.jms.JmsService; +import org.apache.karaf.util.TemplateUtils; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; +import org.osgi.framework.ServiceReference; + +import javax.jms.*; + +import java.io.*; +import java.lang.IllegalStateException; +import java.util.*; + +/** + * Default implementation of the JMS Service. + */ +public class JmsServiceImpl implements JmsService { + + private BundleContext bundleContext; + private File deployFolder; + + public JmsServiceImpl() { + File karafBase = new File(System.getProperty("karaf.base")); + deployFolder = new File(karafBase, "deploy"); + } + + @Override + public void create(String name, String type, String url) throws Exception { + if (!type.equalsIgnoreCase("activemq") && !type.equalsIgnoreCase("webspheremq")) { + throw new IllegalArgumentException("JMS connection factory type not known"); + } + + File outFile = getConnectionFactoryFile(name); + String template; + HashMap<String, String> properties = new HashMap<String, String>(); + properties.put("name", name); + + if (type.equalsIgnoreCase("activemq")) { + // activemq + properties.put("url", url); + template = "connectionfactory-activemq.xml"; + } else { + // webspheremq + String[] splitted = url.split("/"); + if (splitted.length != 4) { + throw new IllegalStateException("WebsphereMQ URI should be in the following format: host/port/queuemanager/channel"); + } + + properties.put("host", splitted[0]); + properties.put("port", splitted[1]); + properties.put("queuemanager", splitted[2]); + properties.put("channel", splitted[3]); + template = "connectionfactory-webspheremq.xml"; + } + InputStream is = this.getClass().getResourceAsStream(template); + if (is == null) { + throw new IllegalArgumentException("Template resource " + template + " doesn't exist"); + } + TemplateUtils.createFromTemplate(outFile, is, properties); + } + + private File getConnectionFactoryFile(String name) { + return new File(deployFolder, "connectionfactory-" + name + ".xml"); + } + + @Override + public void delete(String name) throws Exception { + File connectionFactoryFile = getConnectionFactoryFile(name); + if (!connectionFactoryFile.exists()) { + throw new IllegalStateException("The JMS connection factory file " + connectionFactoryFile.getPath() + " doesn't exist"); + } + connectionFactoryFile.delete(); + } + + @SuppressWarnings("rawtypes") + @Override + public List<String> connectionFactories() throws Exception { + List<String> connectionFactories = new ArrayList<String>(); + ServiceReference[] references = bundleContext.getServiceReferences(ConnectionFactory.class.getName(), null); + if (references != null) { + for (ServiceReference reference : references) { + if (reference.getProperty("osgi.jndi.service.name") != null) { + connectionFactories.add((String) reference.getProperty("osgi.jndi.service.name")); + } else if (reference.getProperty("name") != null) { + connectionFactories.add((String) reference.getProperty("name")); + } else { + connectionFactories.add(reference.getProperty(Constants.SERVICE_ID).toString()); + } + } + } + return connectionFactories; + } + + @Override + public List<String> connectionFactoryFileNames() throws Exception { + String[] connectionFactoryFileNames = deployFolder.list(new FilenameFilter() { + + @Override + public boolean accept(File dir, String name) { + return name.startsWith("connectionfactory-") && name.endsWith(".xml"); + } + }); + + return Arrays.asList(connectionFactoryFileNames); + } + + @Override + public Map<String, String> info(String connectionFactory, String username, String password) throws IOException, JMSException { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + ConnectionMetaData metaData = connector.connect().getMetaData(); + Map<String, String> map = new HashMap<String, String>(); + map.put("product", metaData.getJMSProviderName()); + map.put("version", metaData.getProviderVersion()); + return map; + } finally { + connector.close(); + } + } + + @SuppressWarnings("unchecked") + @Override + public int count(String connectionFactory, final String destination, String username, String password) throws IOException, JMSException { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + Session session = connector.createSession(); + QueueBrowser browser = session.createBrowser(session.createQueue(destination)); + Enumeration<Message> enumeration = browser.getEnumeration(); + int count = 0; + while (enumeration.hasMoreElements()) { + enumeration.nextElement(); + count++; + } + browser.close(); + return count; + } finally { + connector.close(); + } + } + + private DestinationSource getDestinationSource(Connection connection) throws JMSException { + if (connection instanceof PooledConnection) { + connection = ((PooledConnection) connection).getConnection(); + } + if (connection instanceof ActiveMQConnection) { + return ((ActiveMQConnection) connection).getDestinationSource(); + } else { + return null; + } + } + + @Override + public List<String> queues(String connectionFactory, String username, String password) throws JMSException, IOException { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + List<String> queues = new ArrayList<String>(); + DestinationSource destinationSource = getDestinationSource(connector.connect()); + if (destinationSource != null) { + Set<ActiveMQQueue> activeMQQueues = destinationSource.getQueues(); + for (ActiveMQQueue activeMQQueue : activeMQQueues) { + queues.add(activeMQQueue.getQueueName()); + } + } + return queues; + } finally { + connector.close(); + } + } + + @Override + public List<String> topics(String connectionFactory, String username, String password) throws IOException, JMSException { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + DestinationSource destinationSource = getDestinationSource(connector.connect()); + List<String> topics = new ArrayList<String>(); + if (destinationSource != null) { + Set<ActiveMQTopic> activeMQTopics = destinationSource.getTopics(); + for (ActiveMQTopic activeMQTopic : activeMQTopics) { + topics.add(activeMQTopic.getTopicName()); + } + } + return topics; + } finally { + connector.close(); + } + } + + @SuppressWarnings("unchecked") + @Override + public List<JmsMessage> browse(String connectionFactory, final String queue, final String filter, + String username, String password) throws JMSException, IOException { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + List<JmsMessage> messages = new ArrayList<JmsMessage>(); + Session session = connector.createSession(); + QueueBrowser browser = session.createBrowser(session.createQueue(queue), filter); + Enumeration<Message> enumeration = browser.getEnumeration(); + while (enumeration.hasMoreElements()) { + Message message = enumeration.nextElement(); + + messages.add(new JmsMessage(message)); + } + browser.close(); + return messages; + } finally { + connector.close(); + } + } + + @Override + public void send(String connectionFactory, final String queue, final String body, final String replyTo, + String username, String password) throws IOException, JMSException { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + Session session = connector.createSession(); + Message message = session.createTextMessage(body); + if (replyTo != null) { + message.setJMSReplyTo(session.createQueue(replyTo)); + } + MessageProducer producer = session.createProducer(session.createQueue(queue)); + producer.send(message); + producer.close(); + } finally { + connector.close(); + } + } + + @Override + public int consume(String connectionFactory, final String queue, final String selector, String username, + String password) throws Exception { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + int count = 0; + Session session = connector.createSession(); + MessageConsumer consumer = session.createConsumer(session.createQueue(queue), selector); + Message message; + do { + message = consumer.receive(5000L); + if (message != null) { + count++; + } + } while (message != null); + return count; + } finally { + connector.close(); + } + } + + @Override + public int move(String connectionFactory, final String sourceQueue, final String targetQueue, + final String selector, String username, String password) throws IOException, JMSException { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + int count = 0; + Session session = connector.createSession(); + MessageConsumer consumer = session.createConsumer(session.createQueue(sourceQueue), selector); + Message message; + do { + message = consumer.receive(5000L); + if (message != null) { + MessageProducer producer = session.createProducer(session.createQueue(targetQueue)); + producer.send(message); + count++; + } + } while (message != null); + consumer.close(); + return count; + } finally { + connector.close(); + } + } + + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/karaf/blob/0f53437c/jms/src/main/resources/OSGI-INF/blueprint/jms-core.xml ---------------------------------------------------------------------- diff --git a/jms/src/main/resources/OSGI-INF/blueprint/jms-core.xml b/jms/src/main/resources/OSGI-INF/blueprint/jms-core.xml new file mode 100644 index 0000000..7dd8070 --- /dev/null +++ b/jms/src/main/resources/OSGI-INF/blueprint/jms-core.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version + 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 Unless required by + applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the + License. + --> +<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" + xmlns:ext="http://aries.apache.org/blueprint/xmlns/blueprint-ext/v1.0.0" + default-activation="lazy"> + + <ext:property-placeholder placeholder-prefix="$[" placeholder-suffix="]"/> + + <bean id="jmsService" class="org.apache.karaf.jms.internal.JmsServiceImpl"> + <property name="bundleContext" ref="blueprintBundleContext"/> + </bean> + + <service ref="jmsService" interface="org.apache.karaf.jms.JmsService" /> + + <!-- Management --> + <bean id="jmsMBeanImpl" class="org.apache.karaf.jms.internal.JmsMBeanImpl"> + <property name="jmsService" ref="jmsService"/> + </bean> + + <service ref="jmsMBeanImpl" auto-export="interfaces"> + <service-properties> + <entry key="jmx.objectname" value="org.apache.karaf:type=jms,name=$[karaf.name]"/> + </service-properties> + </service> + +</blueprint> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/karaf/blob/0f53437c/jms/src/main/resources/OSGI-INF/bundle.info ---------------------------------------------------------------------- diff --git a/jms/src/main/resources/OSGI-INF/bundle.info b/jms/src/main/resources/OSGI-INF/bundle.info new file mode 100644 index 0000000..1aeb646 --- /dev/null +++ b/jms/src/main/resources/OSGI-INF/bundle.info @@ -0,0 +1,18 @@ +h1. Synopsis + +${project.name} + +${project.description} + +Maven URL: +[mvn:${project.groupId}/${project.artifactId}/${project.version}] + +h1. Description + +This bundle is the core implementation of the JMS service support. + +The JMS service allows you to create connection factories, and send/browse/consume messages. + +h1. See also + +JMS - section of the Karaf User Guide http://git-wip-us.apache.org/repos/asf/karaf/blob/0f53437c/jms/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml ---------------------------------------------------------------------- diff --git a/jms/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml b/jms/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml new file mode 100644 index 0000000..f0aabbd --- /dev/null +++ b/jms/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version + 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 Unless required by + applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the + License. + --> +<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"> + + <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> + <property name="brokerURL" value="${url}" /> + </bean> + + <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> + <property name="maxConnections" value="8" /> + <property name="connectionFactory" ref="activemqConnectionFactory" /> + </bean> + + <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource"> + <property name="transactionManager" ref="transactionManager" /> + <property name="connectionFactory" ref="activemqConnectionFactory" /> + <property name="resourceName" value="activemq.localhost" /> + </bean> + + <reference id="transactionManager" interface="javax.transaction.TransactionManager" /> + + <service ref="pooledConnectionFactory" interface="javax.jms.ConnectionFactory"> + <service-properties> + <entry key="name" value="${name}" /> + <entry key="osgi.jndi.service.name" value="jms/${name}" /> + </service-properties> + </service> + +</blueprint> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/karaf/blob/0f53437c/jms/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml ---------------------------------------------------------------------- diff --git a/jms/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml b/jms/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml new file mode 100644 index 0000000..3123f49 --- /dev/null +++ b/jms/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version + 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 Unless required by + applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the + License. + --> +<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"> + + <bean id="wmqConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory"> + <property name="transportType" value="1" /> + <property name="hostName" value="${hostname}" /> + <property name="port" value="${port}" /> + <property name="queueManager" value="${queuemanager}" /> + <property name="channel" value="${channel}" /> + <property name="useConnectionPooling" value="true" /> + </bean> + + <service ref="wmqConnectionFactory" interface="javax.jms.ConnectionFactory"> + <service-properties> + <entry key="name" value="${name}"/> + <entry key="osgi.jndi.service.name" value="jms/${name}"/> + </service-properties> + </service> + +</blueprint> \ No newline at end of file