Updated Branches: refs/heads/master ce3dc06b6 -> e1b1c45c6
KARAF-2741 Replace JmsTemplate with JmsConnector approach Project: http://git-wip-us.apache.org/repos/asf/karaf/repo Commit: http://git-wip-us.apache.org/repos/asf/karaf/commit/787a5509 Tree: http://git-wip-us.apache.org/repos/asf/karaf/tree/787a5509 Diff: http://git-wip-us.apache.org/repos/asf/karaf/diff/787a5509 Branch: refs/heads/master Commit: 787a5509903c6bacc102c8f604a3f1060b0b4b2b Parents: 47d745d Author: Christian Schneider <[email protected]> Authored: Wed Jan 29 13:31:33 2014 +0100 Committer: Christian Schneider <[email protected]> Committed: Wed Jan 29 13:31:33 2014 +0100 ---------------------------------------------------------------------- .../apache/karaf/jms/internal/JmsConnector.java | 99 +++++++ .../karaf/jms/internal/JmsServiceImpl.java | 263 +++++++++---------- .../apache/karaf/jms/internal/JmsTemplate.java | 97 ------- 3 files changed, 223 insertions(+), 236 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/karaf/blob/787a5509/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java new file mode 100644 index 0000000..ecace89 --- /dev/null +++ b/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms.internal; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Session; + +import org.osgi.framework.BundleContext; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceReference; + +public class JmsConnector implements Closeable { + private BundleContext bc; + private ServiceReference<ConnectionFactory> reference; + private Connection connection; + private Session session; + private String connectionFactoryName; + private String username; + private String password; + + public JmsConnector(BundleContext bc, String connectionFactoryName, String username, String password) throws JMSException { + this.bc = bc; + this.connectionFactoryName = connectionFactoryName; + this.username = username; + this.password = password; + } + + private ServiceReference<ConnectionFactory> lookupConnectionFactory(String name) { + Collection<ServiceReference<ConnectionFactory>> references; + try { + references = bc.getServiceReferences(ConnectionFactory.class, "(|(osgi.jndi.service.name=" + name + ")(name=" + name + ")(service.id=" + name + "))"); + } catch (InvalidSyntaxException e) { + throw new RuntimeException("Error finding connection factory service " + name, e); + } + if (references == null || references.size() == 0) { + throw new IllegalArgumentException("No JMS connection factory found for " + name); + } + if (references.size() > 1) { + throw new IllegalArgumentException("Multiple JMS connection factories found for " + name); + } + return references.iterator().next(); + } + + @Override + public void close() throws IOException { + if (session != null) { + try { + session.close(); + } catch (JMSException e) { + // Ignore + } + } + if (connection != null) { + try { + connection.close(); + } catch (JMSException e) { + // Ignore + } + } + if (reference != null) { + bc.ungetService(reference); + } + } + + public Connection connect() throws JMSException { + reference = this.lookupConnectionFactory(connectionFactoryName); + ConnectionFactory cf = (ConnectionFactory) bc.getService(reference); + connection = cf.createConnection(username, password); + connection.start(); + return connection; + } + + public Session createSession() throws JMSException { + if (connection == null) { + connect(); + } + return session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } +} http://git-wip-us.apache.org/repos/asf/karaf/blob/787a5509/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java index e4b5fc0..b8b0f30 100644 --- a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java +++ b/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java @@ -123,43 +123,37 @@ public class JmsServiceImpl implements JmsService { } @Override - public Map<String, String> info(String connectionFactory, String username, String password) throws Exception { - JmsTemplate jmsTemplate = new JmsTemplate(bundleContext, connectionFactory, username, password); - return jmsTemplate.execute(new JmsCallback<Map<String, String>>() { - - @Override - public Map<String, String> doInSession(Connection connection, Session session) - throws JMSException { - ConnectionMetaData metaData = connection.getMetaData(); - Map<String, String> map = new HashMap<String, String>(); - map.put("product", metaData.getJMSProviderName()); - map.put("version", metaData.getProviderVersion()); - return map; - } - - }); + public Map<String, String> info(String connectionFactory, String username, String password) throws IOException, JMSException { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + ConnectionMetaData metaData = connector.connect().getMetaData(); + Map<String, String> map = new HashMap<String, String>(); + map.put("product", metaData.getJMSProviderName()); + map.put("version", metaData.getProviderVersion()); + return map; + } finally { + connector.close(); + } } + @SuppressWarnings("unchecked") @Override - public int count(String connectionFactory, final String destination, String username, String password) throws Exception { - JmsTemplate jmsTemplate = new JmsTemplate(bundleContext, connectionFactory, username, password); - return jmsTemplate.execute(new JmsCallback<Integer>() { - - @SuppressWarnings("unchecked") - @Override - public Integer doInSession(Connection connection, Session session) throws JMSException { - QueueBrowser browser = session.createBrowser(session.createQueue(destination)); - Enumeration<Message> enumeration = browser.getEnumeration(); - int count = 0; - while (enumeration.hasMoreElements()) { - enumeration.nextElement(); - count++; - } - browser.close(); - return count; + public int count(String connectionFactory, final String destination, String username, String password) throws IOException, JMSException { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + Session session = connector.createSession(); + QueueBrowser browser = session.createBrowser(session.createQueue(destination)); + Enumeration<Message> enumeration = browser.getEnumeration(); + int count = 0; + while (enumeration.hasMoreElements()) { + enumeration.nextElement(); + count++; } - - }); + browser.close(); + return count; + } finally { + connector.close(); + } } private DestinationSource getDestinationSource(Connection connection) throws JMSException { @@ -174,133 +168,124 @@ public class JmsServiceImpl implements JmsService { } @Override - public List<String> queues(String connectionFactory, String username, String password) { - JmsTemplate jmsTemplate = new JmsTemplate(bundleContext, connectionFactory, username, password); - return jmsTemplate.execute(new JmsCallback<List<String>>() { - - @Override - public List<String> doInSession(Connection connection, Session session) throws JMSException { - List<String> queues = new ArrayList<String>(); - DestinationSource destinationSource = getDestinationSource(connection); - if (destinationSource != null) { - Set<ActiveMQQueue> activeMQQueues = destinationSource.getQueues(); - for (ActiveMQQueue activeMQQueue : activeMQQueues) { - queues.add(activeMQQueue.getQueueName()); - } + public List<String> queues(String connectionFactory, String username, String password) throws JMSException, IOException { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + List<String> queues = new ArrayList<String>(); + DestinationSource destinationSource = getDestinationSource(connector.connect()); + if (destinationSource != null) { + Set<ActiveMQQueue> activeMQQueues = destinationSource.getQueues(); + for (ActiveMQQueue activeMQQueue : activeMQQueues) { + queues.add(activeMQQueue.getQueueName()); } - return queues; } - }); + return queues; + } finally { + connector.close(); + } } @Override - public List<String> topics(String connectionFactory, String username, String password) { - JmsTemplate jmsTemplate = new JmsTemplate(bundleContext, connectionFactory, username, password); - return jmsTemplate.execute(new JmsCallback<List<String>>() { - - @Override - public List<String> doInSession(Connection connection, Session session) throws JMSException { - DestinationSource destinationSource = getDestinationSource(connection); - List<String> topics = new ArrayList<String>(); - if (destinationSource != null) { - Set<ActiveMQTopic> activeMQTopics = destinationSource.getTopics(); - for (ActiveMQTopic activeMQTopic : activeMQTopics) { - topics.add(activeMQTopic.getTopicName()); - } + public List<String> topics(String connectionFactory, String username, String password) throws IOException, JMSException { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + DestinationSource destinationSource = getDestinationSource(connector.connect()); + List<String> topics = new ArrayList<String>(); + if (destinationSource != null) { + Set<ActiveMQTopic> activeMQTopics = destinationSource.getTopics(); + for (ActiveMQTopic activeMQTopic : activeMQTopics) { + topics.add(activeMQTopic.getTopicName()); } - return topics; } - }); + return topics; + } finally { + connector.close(); + } } + @SuppressWarnings("unchecked") @Override - public List<JmsMessage> browse(String connectionFactory, final String queue, final String filter, String username, String password) { - JmsTemplate jmsTemplate = new JmsTemplate(bundleContext, connectionFactory, username, password); - return jmsTemplate.execute(new JmsCallback<List<JmsMessage>>() { - - @SuppressWarnings("unchecked") - @Override - public List<JmsMessage> doInSession(Connection connection, Session session) throws JMSException { - List<JmsMessage> messages = new ArrayList<JmsMessage>(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - QueueBrowser browser = session.createBrowser(session.createQueue(queue), filter); - Enumeration<Message> enumeration = browser.getEnumeration(); - while (enumeration.hasMoreElements()) { - Message message = enumeration.nextElement(); - - messages.add(new JmsMessage(message)); - } - browser.close(); - return messages; + public List<JmsMessage> browse(String connectionFactory, final String queue, final String filter, + String username, String password) throws JMSException, IOException { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + List<JmsMessage> messages = new ArrayList<JmsMessage>(); + Session session = connector.createSession(); + QueueBrowser browser = session.createBrowser(session.createQueue(queue), filter); + Enumeration<Message> enumeration = browser.getEnumeration(); + while (enumeration.hasMoreElements()) { + Message message = enumeration.nextElement(); + + messages.add(new JmsMessage(message)); } - - }); + browser.close(); + return messages; + } finally { + connector.close(); + } } @Override - public void send(String connectionFactory, final String queue, final String body, final String replyTo, String username, String password) { - JmsTemplate jmsTemplate = new JmsTemplate(bundleContext, connectionFactory, username, password); - jmsTemplate.execute(new JmsCallback<Void>() { - - @Override - public Void doInSession(Connection connection, Session session) throws JMSException { - Message message = session.createTextMessage(body); - if (replyTo != null) { - message.setJMSReplyTo(session.createQueue(replyTo)); - } - MessageProducer producer = session.createProducer(session.createQueue(queue)); - producer.send(message); - producer.close(); - return null; + public void send(String connectionFactory, final String queue, final String body, final String replyTo, + String username, String password) throws IOException, JMSException { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + Session session = connector.createSession(); + Message message = session.createTextMessage(body); + if (replyTo != null) { + message.setJMSReplyTo(session.createQueue(replyTo)); } - }); + MessageProducer producer = session.createProducer(session.createQueue(queue)); + producer.send(message); + producer.close(); + } finally { + connector.close(); + } } @Override - public int consume(String connectionFactory, final String queue, final String selector, String username, String password) throws Exception { - JmsTemplate jmsTemplate = new JmsTemplate(bundleContext, connectionFactory, username, password); - return jmsTemplate.execute(new JmsCallback<Integer>() { - - @Override - public Integer doInSession(Connection connection, Session session) throws JMSException { - int count = 0; - MessageConsumer consumer = session.createConsumer(session.createQueue(queue), selector); - Message message; - do { - message = consumer.receiveNoWait(); - if (message != null) { - count++; - } - } while (message != null); - return count; - } - - }); + public int consume(String connectionFactory, final String queue, final String selector, String username, + String password) throws Exception { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + int count = 0; + Session session = connector.createSession(); + MessageConsumer consumer = session.createConsumer(session.createQueue(queue), selector); + Message message; + do { + message = consumer.receiveNoWait(); + if (message != null) { + count++; + } + } while (message != null); + return count; + } finally { + connector.close(); + } } @Override - public int move(String connectionFactory, final String sourceQueue, final String targetQueue, final String selector, String username, String password) { - JmsTemplate jmsTemplate = new JmsTemplate(bundleContext, connectionFactory, username, password); - return jmsTemplate.execute(new JmsCallback<Integer>() { - - @Override - public Integer doInSession(Connection connection, Session session) throws JMSException { - int count = 0; - MessageConsumer consumer = session.createConsumer(session.createQueue(sourceQueue), selector); - Message message; - do { - message = consumer.receiveNoWait(); - if (message != null) { - MessageProducer producer = session.createProducer(session.createQueue(targetQueue)); - producer.send(message); - count++; - } - } while (message != null); - consumer.close(); - return count; - } - - }); + public int move(String connectionFactory, final String sourceQueue, final String targetQueue, + final String selector, String username, String password) throws IOException, JMSException { + JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password); + try { + int count = 0; + Session session = connector.createSession(); + MessageConsumer consumer = session.createConsumer(session.createQueue(sourceQueue), selector); + Message message; + do { + message = consumer.receiveNoWait(); + if (message != null) { + MessageProducer producer = session.createProducer(session.createQueue(targetQueue)); + producer.send(message); + count++; + } + } while (message != null); + consumer.close(); + return count; + } finally { + connector.close(); + } } public void setBundleContext(BundleContext bundleContext) { http://git-wip-us.apache.org/repos/asf/karaf/blob/787a5509/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsTemplate.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsTemplate.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsTemplate.java deleted file mode 100644 index 9206632..0000000 --- a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsTemplate.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.karaf.jms.internal; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.Session; - -import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceReference; - -public class JmsTemplate { - private BundleContext bc; - private String connectionFactoryName; - private String username; - private String password; - - public JmsTemplate(BundleContext bc, String connectionFactoryName, String username, String password) { - this.bc = bc; - this.connectionFactoryName = connectionFactoryName; - this.username = username; - this.password = password; - } - - @SuppressWarnings({ - "rawtypes", "unchecked" - }) - public <E> E execute(JmsCallback<E> callback) { - ServiceReference reference = null; - Connection connection = null; - Session session = null; - try { - reference = this.lookupConnectionFactory(connectionFactoryName); - ConnectionFactory cf = (ConnectionFactory) bc.getService(reference); - connection = cf.createConnection(username, password); - connection.start(); - session = connection.createSession(true, Session.SESSION_TRANSACTED); - E result = callback.doInSession(connection, session); - session.commit(); - return result; - } catch (Exception e) { - try { - if (session != null) { - session.rollback(); - } - } catch (JMSException e1) { - // Ignore - } - throw new RuntimeException(e.getMessage(), e); - } finally { - if (session != null) { - try { - session.close(); - } catch (JMSException e) { - // Ignore - } - } - if (connection != null) { - try { - connection.close(); - } catch (JMSException e) { - // Ignore - } - } - if (reference != null) { - bc.ungetService(reference); - } - } - } - - @SuppressWarnings("rawtypes") - private ServiceReference lookupConnectionFactory(String name) throws Exception { - ServiceReference[] references = bc.getServiceReferences(ConnectionFactory.class.getName(), "(|(osgi.jndi.service.name=" + name + ")(name=" + name + ")(service.id=" + name + "))"); - if (references == null || references.length == 0) { - throw new IllegalArgumentException("No JMS connection factory found for " + name); - } - if (references.length > 1) { - throw new IllegalArgumentException("Multiple JMS connection factories found for " + name); - } - return references[0]; - } -}
