http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java deleted file mode 100644 index b3ea458..0000000 --- a/jms/core/src/main/java/org/apache/karaf/jms/internal/ArtemisDestinationSourceFactory.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.karaf.jms.internal; - -import org.apache.karaf.util.json.JsonReader; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueRequestor; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.TextMessage; -import java.io.StringReader; -import java.util.Collections; -import java.util.List; - -class ArtemisDestinationSourceFactory implements DestinationSource.Factory { - - @Override - public DestinationSource create(Connection connection) throws JMSException { - if (connection.getClass().getName().matches("org\\.apache\\.activemq\\.artemis\\.jms\\.client\\.ActiveMQ(XA)?Connection")) { - return type -> getNames(connection, type); - } - return null; - } - - private List<String> getNames(Connection connection, DestinationSource.DestinationType type) { - try { - QueueSession session = ((QueueConnection) connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - Queue managementQueue = session.createQueue("activemq.management"); - QueueRequestor requestor = new QueueRequestor(session, managementQueue); - connection.start(); - TextMessage m = session.createTextMessage(); - m.setStringProperty("_AMQ_ResourceName", "broker"); - m.setStringProperty("_AMQ_OperationName", "getQueueNames"); - String routing = type == DestinationSource.DestinationType.Queue ? "ANYCAST" : "MULTICAST"; - m.setText("[\"" + routing + "\"]"); - Message reply = requestor.request(m); - String json = ((TextMessage) reply).getText(); - List<?> array = (List<?>) JsonReader.read(new StringReader(json)); - return (List<String>) array.get(0); - } catch (Exception e) { - return Collections.emptyList(); - } - } -}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java deleted file mode 100644 index efc7bcd..0000000 --- a/jms/core/src/main/java/org/apache/karaf/jms/internal/DestinationSource.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.karaf.jms.internal; - -import javax.jms.Connection; -import javax.jms.JMSException; -import java.util.List; - -interface DestinationSource { - - enum DestinationType { - Queue, Topic - } - - interface Factory { - - DestinationSource create(Connection connection) throws JMSException; - } - - List<String> getNames(DestinationType type); -} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java deleted file mode 100644 index 75c52ce..0000000 --- a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsConnector.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.karaf.jms.internal; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Collection; -import java.util.Comparator; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.Session; - -import org.osgi.framework.BundleContext; -import org.osgi.framework.InvalidSyntaxException; -import org.osgi.framework.ServiceReference; - -public class JmsConnector implements Closeable { - private BundleContext bc; - private ServiceReference<ConnectionFactory> reference; - private Connection connection; - private Session session; - private String connectionFactoryName; - private String username; - private String password; - - public JmsConnector(BundleContext bc, String connectionFactoryName, String username, String password) throws JMSException { - this.bc = bc; - this.connectionFactoryName = connectionFactoryName; - this.username = username; - this.password = password; - } - - private ServiceReference<ConnectionFactory> lookupConnectionFactory(String name) { - try { - Collection<ServiceReference<ConnectionFactory>> references = bc.getServiceReferences( - ConnectionFactory.class, - "(|(osgi.jndi.service.name=" + name + ")(name=" + name + ")(service.id=" + name + "))"); - return references.stream() - .sorted(Comparator.<ServiceReference<?>>naturalOrder().reversed()) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException("No JMS connection factory found for " + name)); - } catch (InvalidSyntaxException e) { - throw new RuntimeException("Error finding connection factory service " + name, e); - } - } - - @Override - public void close() throws IOException { - if (session != null) { - try { - session.close(); - } catch (JMSException e) { - // Ignore - } - } - if (connection != null) { - try { - connection.close(); - } catch (JMSException e) { - // Ignore - } - } - if (reference != null) { - bc.ungetService(reference); - } - } - - public Connection connect() throws 
JMSException { - reference = this.lookupConnectionFactory(connectionFactoryName); - ConnectionFactory cf = bc.getService(reference); - connection = cf.createConnection(username, password); - connection.start(); - return connection; - } - - public Session createSession() throws JMSException { - return createSession(Session.AUTO_ACKNOWLEDGE); - } - - public Session createSession(int acknowledgeMode) throws JMSException { - if (connection == null) { - connect(); - } - if (acknowledgeMode == Session.SESSION_TRANSACTED) { - session = connection.createSession(true, acknowledgeMode); - } else { - session = connection.createSession(false, acknowledgeMode); - } - return session; - } - -} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java deleted file mode 100644 index e3b7801..0000000 --- a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsMBeanImpl.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.karaf.jms.internal; - -import org.apache.karaf.jms.JmsMBean; -import org.apache.karaf.jms.JmsMessage; -import org.apache.karaf.jms.JmsService; - -import javax.management.MBeanException; -import javax.management.openmbean.*; -import java.util.List; -import java.util.Map; - -/** - * Default implementation of the JMS MBean. - */ -public class JmsMBeanImpl implements JmsMBean { - - private JmsService jmsService; - - @Override - public List<String> getConnectionfactories() throws MBeanException { - try { - return jmsService.connectionFactories(); - } catch (Throwable t) { - throw new MBeanException(null, t.getMessage()); - } - } - - @Override - public void create(String name, String type, String url) throws MBeanException { - try { - jmsService.create(name, type, url); - } catch (Throwable t) { - throw new MBeanException(null, t.getMessage()); - } - } - - @Override - public void create(String name, String type, String url, String username, String password) throws MBeanException { - try { - jmsService.create(name, type, url, username, password); - } catch (Throwable t) { - throw new MBeanException(null, t.getMessage()); - } - } - - @Override - public void delete(String name) throws MBeanException { - try { - jmsService.delete(name); - } catch (Throwable t) { - throw new MBeanException(null, t.getMessage()); - } - } - - @Override - public Map<String, String> info(String connectionFactory, String username, String password) throws MBeanException { - try { - return jmsService.info(connectionFactory, username, password); - } catch (Throwable t) { - throw new MBeanException(null, t.getMessage()); - } - } - - @Override - public int count(String connectionFactory, String queue, String username, String password) throws MBeanException { - try { - return jmsService.count(connectionFactory, queue, username, password); - } catch (Throwable t) { - throw 
new MBeanException(null, t.getMessage()); - } - } - - @Override - public List<String> queues(String connectionFactory, String username, String password) throws MBeanException { - try { - return jmsService.queues(connectionFactory, username, password); - } catch (Throwable t) { - throw new MBeanException(null, t.getMessage()); - } - } - - @Override - public List<String> topics(String connectionFactory, String username, String password) throws MBeanException { - try { - return jmsService.topics(connectionFactory, username, password); - } catch (Throwable t) { - throw new MBeanException(null, t.getMessage()); - } - } - - @Override - public void send(String connectionFactory, String queue, String content, String replyTo, String username, String password) throws MBeanException { - try { - jmsService.send(connectionFactory, queue, content, replyTo, username, password); - } catch (Throwable t) { - throw new MBeanException(null, t.getMessage()); - } - } - - @Override - public int consume(String connectionFactory, String queue, String selector, String username, String password) throws MBeanException { - try { - return jmsService.consume(connectionFactory, queue, selector, username, password); - } catch (Throwable t) { - throw new MBeanException(null, t.getMessage()); - } - } - - @Override - public int move(String connectionFactory, String source, String destination, String selector, String username, String password) throws MBeanException { - try { - return jmsService.move(connectionFactory, source, destination, selector, username, password); - } catch (Throwable t) { - throw new MBeanException(null, t.getMessage()); - } - } - - @Override - public TabularData browse(String connectionFactory, String queue, String selector, String username, String password) throws MBeanException { - try { - CompositeType type = new CompositeType("message", "JMS Message", - new String[]{ "id", "content", "charset", "type", "correlation", "delivery", "destination", "expiration", "priority", 
"redelivered", "replyto", "timestamp" }, - new String[]{ "Message ID", "Content", "Charset", "Type", "Correlation ID", "Delivery Mode", "Destination", "Expiration Date", "Priority", "Redelivered", "Reply-To", "Timestamp" }, - new OpenType[]{ SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.STRING }); - TabularType tableType = new TabularType("messages", "JMS Messages", type, new String[]{ "id" }); - TabularData table = new TabularDataSupport(tableType); - for (JmsMessage message : getJmsService().browse(connectionFactory, queue, selector, username, password)) { - CompositeData data = new CompositeDataSupport(type, - new String[]{ "id", "content", "charset", "type", "correlation", "delivery", "destination", "expiration", "priority", "redelivered", "replyto", "timestamp" }, - new Object[]{ message.getMessageId(), message.getContent(), message.getCharset(), message.getType(), message.getCorrelationID(), message.getDeliveryMode(), message.getDestination(), message.getExpiration(), message.getPriority(), message.isRedelivered(), message.getReplyTo(), message.getTimestamp() } - ); - table.put(data); - } - return table; - } catch (Throwable t) { - throw new MBeanException(null, t.getMessage()); - } - } - - public JmsService getJmsService() { - return jmsService; - } - - public void setJmsService(JmsService jmsService) { - this.jmsService = jmsService; - } - -} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java deleted file mode 100644 index f460018..0000000 --- 
a/jms/core/src/main/java/org/apache/karaf/jms/internal/JmsServiceImpl.java +++ /dev/null @@ -1,285 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.karaf.jms.internal; - -import org.apache.karaf.jms.JmsMessage; -import org.apache.karaf.jms.JmsService; -import org.apache.karaf.util.TemplateUtils; -import org.osgi.framework.BundleContext; -import org.osgi.framework.Constants; -import org.osgi.framework.ServiceReference; - -import javax.jms.*; - -import java.io.*; -import java.lang.IllegalStateException; -import java.lang.reflect.Method; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.*; -import java.util.stream.Collectors; - -/** - * Default implementation of the JMS Service. 
- */ -public class JmsServiceImpl implements JmsService { - - private BundleContext bundleContext; - private Path deployFolder; - - public JmsServiceImpl() { - deployFolder = Paths.get(System.getProperty("karaf.base"), "deploy"); - } - - @Override - public void create(String name, String type, String url) throws Exception { - create(name, type, url, null, null); - } - - @Override - public void create(String name, String type, String url, String username, String password) throws Exception { - if (!type.equalsIgnoreCase("activemq") - && !type.equalsIgnoreCase("artemis") - && !type.equalsIgnoreCase("webspheremq")) { - throw new IllegalArgumentException("JMS connection factory type not known"); - } - - Path outFile = getConnectionFactoryFile(name); - String template; - HashMap<String, String> properties = new HashMap<>(); - properties.put("name", name); - - if (type.equalsIgnoreCase("activemq")) { - // activemq - properties.put("url", url); - properties.put("username", username); - properties.put("password", password); - template = "connectionfactory-activemq.xml"; - } else if (type.equalsIgnoreCase("artemis")) { - // artemis - properties.put("url", url); - properties.put("username", username); - properties.put("password", password); - template = "connectionfactory-artemis.xml"; - } else { - // webspheremq - String[] splitted = url.split("/"); - if (splitted.length != 4) { - throw new IllegalStateException("WebsphereMQ URI should be in the following format: host/port/queuemanager/channel"); - } - - properties.put("host", splitted[0]); - properties.put("port", splitted[1]); - properties.put("queuemanager", splitted[2]); - properties.put("channel", splitted[3]); - template = "connectionfactory-webspheremq.xml"; - } - InputStream is = this.getClass().getResourceAsStream(template); - if (is == null) { - throw new IllegalArgumentException("Template resource " + template + " doesn't exist"); - } - TemplateUtils.createFromTemplate(outFile.toFile(), is, properties); - } - - 
private Path getConnectionFactoryFile(String name) { - return deployFolder.resolve("connectionfactory-" + name + ".xml"); - } - - @Override - public void delete(String name) throws Exception { - Path connectionFactoryFile = getConnectionFactoryFile(name); - if (!Files.isRegularFile(connectionFactoryFile)) { - throw new IllegalStateException("The JMS connection factory file " + connectionFactoryFile + " doesn't exist"); - } - Files.delete(connectionFactoryFile); - } - - @Override - public List<String> connectionFactories() throws Exception { - return bundleContext.getServiceReferences(ConnectionFactory.class, null).stream() - .map(this::getConnectionFactoryName) - .distinct() - .collect(Collectors.toList()); - } - - private String getConnectionFactoryName(ServiceReference<ConnectionFactory> reference) { - if (reference.getProperty("osgi.jndi.service.name") != null) { - return (String) reference.getProperty("osgi.jndi.service.name"); - } else if (reference.getProperty("name") != null) { - return (String) reference.getProperty("name"); - } else { - return reference.getProperty(Constants.SERVICE_ID).toString(); - } - } - - @Override - public List<String> connectionFactoryFileNames() throws Exception { - return Files.list(deployFolder) - .map(Path::getFileName) - .map(Path::toString) - .filter(name -> name.startsWith("connectionfactory-") && name.endsWith(".xml")) - .collect(Collectors.toList()); - } - - @Override - public Map<String, String> info(String connectionFactory, String username, String password) throws IOException, JMSException { - try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { - ConnectionMetaData metaData = connector.connect().getMetaData(); - Map<String, String> map = new HashMap<>(); - map.put("product", metaData.getJMSProviderName()); - map.put("version", metaData.getProviderVersion()); - return map; - } - } - - @Override - public int count(String connectionFactory, final String destination, String 
username, String password) throws IOException, JMSException { - try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { - Session session = connector.createSession(); - QueueBrowser browser = session.createBrowser(session.createQueue(destination)); - @SuppressWarnings("unchecked") - Enumeration<Message> enumeration = browser.getEnumeration(); - int count = 0; - while (enumeration.hasMoreElements()) { - enumeration.nextElement(); - count++; - } - browser.close(); - return count; - } - } - - private DestinationSource getDestinationSource(Connection connection) throws JMSException { - while (true) { - try { - Method mth = connection.getClass().getMethod("getConnection"); - connection = (Connection) mth.invoke(connection); - } catch (Throwable e) { - break; - } - } - List<DestinationSource.Factory> factories = Arrays.asList( - new ActiveMQDestinationSourceFactory(), - new ArtemisDestinationSourceFactory() - ); - DestinationSource source = null; - for (DestinationSource.Factory factory : factories) { - source = factory.create(connection); - if (source != null) { - break; - } - } - if (source == null) { - source = d -> Collections.emptyList(); - } - return source; - } - - @Override - public List<String> queues(String connectionFactory, String username, String password) throws JMSException, IOException { - try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { - return getDestinationSource(connector.connect()).getNames(DestinationSource.DestinationType.Queue); - } - } - - @Override - public List<String> topics(String connectionFactory, String username, String password) throws IOException, JMSException { - try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { - return getDestinationSource(connector.connect()).getNames(DestinationSource.DestinationType.Topic); - } - } - - @Override - public List<JmsMessage> browse(String 
connectionFactory, final String queue, final String filter, - String username, String password) throws JMSException, IOException { - try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { - List<JmsMessage> messages = new ArrayList<>(); - Session session = connector.createSession(); - QueueBrowser browser = session.createBrowser(session.createQueue(queue), filter); - @SuppressWarnings("unchecked") - Enumeration<Message> enumeration = browser.getEnumeration(); - while (enumeration.hasMoreElements()) { - Message message = enumeration.nextElement(); - - messages.add(new JmsMessage(message)); - } - browser.close(); - return messages; - } - } - - @Override - public void send(String connectionFactory, final String queue, final String body, final String replyTo, - String username, String password) throws IOException, JMSException { - try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { - Session session = connector.createSession(); - Message message = session.createTextMessage(body); - if (replyTo != null) { - message.setJMSReplyTo(session.createQueue(replyTo)); - } - MessageProducer producer = session.createProducer(session.createQueue(queue)); - producer.send(message); - producer.close(); - } - } - - @Override - public int consume(String connectionFactory, final String queue, final String selector, String username, - String password) throws Exception { - try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { - int count = 0; - Session session = connector.createSession(); - MessageConsumer consumer = session.createConsumer(session.createQueue(queue), selector); - Message message; - do { - message = consumer.receive(500L); - if (message != null) { - count++; - } - } while (message != null); - return count; - } - } - - @Override - public int move(String connectionFactory, final String sourceQueue, final String targetQueue, - 
final String selector, String username, String password) throws IOException, JMSException { - try (JmsConnector connector = new JmsConnector(bundleContext, connectionFactory, username, password)) { - int count = 0; - Session session = connector.createSession(Session.SESSION_TRANSACTED); - MessageConsumer consumer = session.createConsumer(session.createQueue(sourceQueue), selector); - Message message; - do { - message = consumer.receive(500L); - if (message != null) { - MessageProducer producer = session.createProducer(session.createQueue(targetQueue)); - producer.send(message); - count++; - } - } while (message != null); - session.commit(); - consumer.close(); - return count; - } - } - - public void setBundleContext(BundleContext bundleContext) { - this.bundleContext = bundleContext; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java ---------------------------------------------------------------------- diff --git a/jms/core/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java b/jms/core/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java deleted file mode 100644 index 1dd1c09..0000000 --- a/jms/core/src/main/java/org/apache/karaf/jms/internal/osgi/Activator.java +++ /dev/null @@ -1,32 +0,0 @@ -package org.apache.karaf.jms.internal.osgi; - -import org.apache.karaf.jms.JmsService; -import org.apache.karaf.jms.internal.JmsMBeanImpl; -import org.apache.karaf.jms.internal.JmsServiceImpl; -import org.apache.karaf.shell.api.console.CommandLoggingFilter; -import org.apache.karaf.shell.support.RegexCommandLoggingFilter; -import org.apache.karaf.util.tracker.BaseActivator; -import org.apache.karaf.util.tracker.annotation.ProvideService; -import org.apache.karaf.util.tracker.annotation.Services; - -@Services( - provides = @ProvideService(JmsService.class) -) -public class Activator extends BaseActivator { - @Override - protected void 
doStart() throws Exception { - JmsServiceImpl service = new JmsServiceImpl(); - service.setBundleContext(bundleContext); - register(JmsService.class, service); - - JmsMBeanImpl mbean = new JmsMBeanImpl(); - mbean.setJmsService(service); - registerMBean(mbean, "type=jms"); - - RegexCommandLoggingFilter filter = new RegexCommandLoggingFilter(); - filter.addRegEx("create +.*?--password ([^ ]+)", 2); - filter.addRegEx("create +.*?-p ([^ ]+)", 2); - register(CommandLoggingFilter.class, filter); - - } -} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/resources/OSGI-INF/bundle.info ---------------------------------------------------------------------- diff --git a/jms/core/src/main/resources/OSGI-INF/bundle.info b/jms/core/src/main/resources/OSGI-INF/bundle.info deleted file mode 100644 index 9d83749..0000000 --- a/jms/core/src/main/resources/OSGI-INF/bundle.info +++ /dev/null @@ -1,41 +0,0 @@ -# -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -h1. Synopsis - - ${project.name} - - ${project.description} - - Maven URL: - [mvn:${project.groupId}/${project.artifactId}/${project.version}] - -h1. Description - - This bundle is the core implementation of the JMS service support. 
- - The JMS service allows you to create connection factories, and send/browse/consume messages. - -h1. Commands - - The bundle contains the following commands: -\${command-list|jms|indent=8,list,cyan} - -h1. See also - - JMS - section of the Karaf User Guide http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml ---------------------------------------------------------------------- diff --git a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml b/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml deleted file mode 100644 index da2ad1a..0000000 --- a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-activemq.xml +++ /dev/null @@ -1,34 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version - 2.0 (the "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 Unless required by - applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - CONDITIONS OF ANY KIND, either express or implied. See the License for - the specific language governing permissions and limitations under the - License. 
- --> -<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"> - - <service interface="javax.jms.ConnectionFactory"> - <service-properties> - <entry key="name" value="${name}" /> - <entry key="osgi.jndi.service.name" value="jms/${name}" /> - <entry key="karaf.jms.wrap" value="true" /> - <entry key="karaf.jms.pool.maxConnections" value="8" /> - </service-properties> - <bean class="org.apache.activemq.ActiveMQConnectionFactory"> - <property name="brokerURL" value="${url}" /> - <property name="userName" value="${username}" /> - <property name="password" value="${password}" /> - </bean> - </service> - -</blueprint> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-artemis.xml ---------------------------------------------------------------------- diff --git a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-artemis.xml b/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-artemis.xml deleted file mode 100644 index 67b1f54..0000000 --- a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-artemis.xml +++ /dev/null @@ -1,35 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version - 2.0 (the "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 Unless required by - applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - CONDITIONS OF ANY KIND, either express or implied. 
See the License for - the specific language governing permissions and limitations under the - License. - --> -<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"> - - <service interface="javax.jms.ConnectionFactory"> - <service-properties> - <entry key="name" value="${name}" /> - <entry key="osgi.jndi.service.name" value="jms/${name}" /> - <entry key="karaf.jms.wrap" value="true" /> - <entry key="karaf.jms.pool.maxConnections" value="8" /> - </service-properties> - <bean class="org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory"> - <argument value="${url}" /> - <argument value="${username}" /> - <argument value="${password}" /> - <property name="producerWindowSize" value="-1" /> - </bean> - </service> - -</blueprint> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml ---------------------------------------------------------------------- diff --git a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml b/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml deleted file mode 100644 index 999c85b..0000000 --- a/jms/core/src/main/resources/org/apache/karaf/jms/internal/connectionfactory-webspheremq.xml +++ /dev/null @@ -1,35 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version - 2.0 (the "License"); you may not use this file except in compliance - with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 Unless required by - applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - CONDITIONS OF ANY KIND, either express or implied. See the License for - the specific language governing permissions and limitations under the - License. - --> -<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"> - - <service interface="javax.jms.ConnectionFactory"> - <service-properties> - <entry key="name" value="${name}"/> - <entry key="osgi.jndi.service.name" value="jms/${name}"/> - <entry key="karaf.jms.wrap" value="true" /> - </service-properties> - <bean class="com.ibm.mq.jms.MQQueueConnectionFactory"> - <property name="transportType" value="1" /> - <property name="hostName" value="${hostname}" /> - <property name="port" value="${port}" /> - <property name="queueManager" value="${queuemanager}" /> - <property name="channel" value="${channel}" /> - </bean> - </service> - -</blueprint> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pom.xml ---------------------------------------------------------------------- diff --git a/jms/pom.xml b/jms/pom.xml index 3b3b185..dd7155c 100644 --- a/jms/pom.xml +++ b/jms/pom.xml @@ -25,16 +25,109 @@ <groupId>org.apache.karaf</groupId> <artifactId>karaf</artifactId> <version>4.2.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> + <relativePath>../../pom.xml</relativePath> </parent> <groupId>org.apache.karaf.jms</groupId> - <artifactId>parent</artifactId> - <packaging>pom</packaging> - <name>Apache Karaf :: Features</name> - - <modules> - <module>core</module> - <module>pool</module> - </modules> + <artifactId>org.apache.karaf.jms.core</artifactId> + <packaging>bundle</packaging> + <name>Apache Karaf :: JMS :: Core</name> + <description>This bundle provides core implementation of the JMS service.</description> + + 
<properties> + <appendedResourcesDirectory>${basedir}/../etc/appended-resources</appendedResourcesDirectory> + </properties> + + <dependencies> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.compendium</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>javax.jms</groupId> + <artifactId>javax.jms-api</artifactId> + <version>2.0</version> + </dependency> + <dependency> + <groupId>org.ops4j.pax.jms</groupId> + <artifactId>pax-jms-api</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-pool</artifactId> + <version>5.9.0</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jta_1.0.1B_spec</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.karaf</groupId> + <artifactId>org.apache.karaf.util</artifactId> + </dependency> + <dependency> + <groupId>org.apache.karaf.shell</groupId> + <artifactId>org.apache.karaf.shell.core</artifactId> + <optional>true</optional> + </dependency> + </dependencies> + + <build> + <resources> + <resource> + <directory>${project.basedir}/src/main/resources</directory> + <includes> + <include>**/*</include> + </includes> + </resource> + <resource> + <directory>${project.basedir}/src/main/resources</directory> + <filtering>true</filtering> + <includes> + <include>**/*.info</include> + </includes> + </resource> + </resources> + <plugins> + <plugin> + <groupId>org.apache.karaf.tooling</groupId> + <artifactId>karaf-services-maven-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <configuration> + <instructions> + <Export-Package> + org.apache.karaf.jms;-noimport:=true + 
</Export-Package> + <Import-Package> + javax.jms;version="[1.1,3)", + org.apache.activemq*;resolution:=optional, + * + </Import-Package> + <Private-Package> + org.apache.karaf.jms.command, + org.apache.karaf.jms.command.completers, + org.apache.karaf.jms.internal, + org.apache.karaf.jms.internal.osgi, + org.apache.karaf.util, + org.apache.karaf.util.json + </Private-Package> + </instructions> + </configuration> + </plugin> + </plugins> + </build> + </project> http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/pom.xml ---------------------------------------------------------------------- diff --git a/jms/pool/pom.xml b/jms/pool/pom.xml deleted file mode 100644 index 21278d2..0000000 --- a/jms/pool/pom.xml +++ /dev/null @@ -1,104 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <!-- - - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
- --> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.karaf</groupId> - <artifactId>karaf</artifactId> - <version>4.2.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <groupId>org.apache.karaf.jms</groupId> - <artifactId>org.apache.karaf.jms.pool</artifactId> - <packaging>bundle</packaging> - <name>Apache Karaf :: JMS :: Pool</name> - <description>This bundle provides pooling implementation of the JMS service.</description> - - <properties> - <appendedResourcesDirectory>${basedir}/../../etc/appended-resources</appendedResourcesDirectory> - </properties> - - <dependencies> - <dependency> - <groupId>org.osgi</groupId> - <artifactId>org.osgi.core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.geronimo.specs</groupId> - <artifactId>geronimo-jms_1.1_spec</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-pool2</artifactId> - <version>2.4.2</version> - </dependency> - </dependencies> - - <build> - <resources> - <resource> - <directory>${project.basedir}/src/main/resources</directory> - <includes> - <include>**/*</include> - </includes> - </resource> - <resource> - <directory>${project.basedir}/src/main/resources</directory> - <filtering>true</filtering> - <includes> - <include>**/*.info</include> - </includes> - </resource> - </resources> - <plugins> - <plugin> - <groupId>org.apache.felix</groupId> - <artifactId>maven-bundle-plugin</artifactId> - <configuration> - <instructions> - <Export-Package> - </Export-Package> - <Import-Package> - javax.jms;version="[1.1,3)", - * - </Import-Package> - <Private-Package> - org.apache.karaf.jms.pool.internal, - org.apache.karaf.jms.pool.internal.osgi - </Private-Package> - <Bundle-Activator> - org.apache.karaf.jms.pool.internal.osgi.Activator - 
</Bundle-Activator> - </instructions> - </configuration> - </plugin> - </plugins> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionKey.java ---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionKey.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionKey.java deleted file mode 100644 index 9dab2fc..0000000 --- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionKey.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.karaf.jms.pool.internal; - -/** - * A cache key for the connection details - * - * - */ -public class ConnectionKey { - private String userName; - private String password; - private int hash; - - public ConnectionKey(String userName, String password) { - this.password = password; - this.userName = userName; - hash = 31; - if (userName != null) { - hash += userName.hashCode(); - } - hash *= 31; - if (password != null) { - hash += password.hashCode(); - } - } - - public int hashCode() { - return hash; - } - - public boolean equals(Object that) { - if (this == that) { - return true; - } - if (that instanceof ConnectionKey) { - return equals((ConnectionKey)that); - } - return false; - } - - public boolean equals(ConnectionKey that) { - return isEqual(this.userName, that.userName) && isEqual(this.password, that.password); - } - - public String getPassword() { - return password; - } - - public String getUserName() { - return userName; - } - - public static boolean isEqual(Object o1, Object o2) { - if (o1 == o2) { - return true; - } - return o1 != null && o2 != null && o1.equals(o2); - } - -} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionPool.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionPool.java deleted file mode 100644 index fbf0384..0000000 --- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/ConnectionPool.java +++ /dev/null @@ -1,315 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.karaf.jms.pool.internal; - -import org.apache.commons.pool2.KeyedPooledObjectFactory; -import org.apache.commons.pool2.PooledObject; -import org.apache.commons.pool2.impl.DefaultPooledObject; -import org.apache.commons.pool2.impl.GenericKeyedObjectPool; - -import javax.jms.Connection; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Session; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Holds a real JMS connection along with the session pools associated with it. - * <p/> - * Instances of this class are shared amongst one or more PooledConnection object and must - * track the session objects that are loaned out for cleanup on close as well as ensuring - * that the temporary destinations of the managed Connection are purged when all references - * to this ConnectionPool are released. 
- */ -public class ConnectionPool { - protected Connection connection; - private int referenceCount; - private long lastUsed = System.currentTimeMillis(); - private final long firstUsed = lastUsed; - private boolean hasExpired; - private int idleTimeout = 30 * 1000; - private long expiryTimeout = 0l; - private boolean useAnonymousProducers = true; - - private final AtomicBoolean started = new AtomicBoolean(false); - private final GenericKeyedObjectPool<SessionKey, PooledSession> sessionPool; - private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<>(); - - public ConnectionPool(Connection connection) { - - this.connection = connection; - - // Create our internal Pool of session instances. - this.sessionPool = new GenericKeyedObjectPool<>( - new KeyedPooledObjectFactory<SessionKey, PooledSession>() { - - @Override - public void activateObject(SessionKey key, PooledObject<PooledSession> session) throws Exception { - ConnectionPool.this.loanedSessions.add(session.getObject()); - } - - @Override - public void destroyObject(SessionKey key, PooledObject<PooledSession> session) throws Exception { - ConnectionPool.this.loanedSessions.remove(session.getObject()); - session.getObject().getInternalSession().close(); - } - - @Override - public PooledObject<PooledSession> makeObject(SessionKey key) throws Exception { - Session session = makeSession(key); - return new DefaultPooledObject<>(new PooledSession(key, session, sessionPool, key.isTransacted(), useAnonymousProducers)); - } - - @Override - public void passivateObject(SessionKey key, PooledObject<PooledSession> session) throws Exception { - ConnectionPool.this.loanedSessions.remove(session.getObject()); - } - - @Override - public boolean validateObject(SessionKey key, PooledObject<PooledSession> session) { - return true; - } - } - ); - } - - // useful when external failure needs to force expiry - public void setHasExpired(boolean val) { - hasExpired = val; - } - - protected Session 
makeSession(SessionKey key) throws JMSException { - return connection.createSession(key.isTransacted(), key.getAckMode()); - } - - public void start() throws JMSException { - if (started.compareAndSet(false, true)) { - try { - connection.start(); - } catch (JMSException e) { - started.set(false); - throw(e); - } - } - } - - public synchronized Connection getConnection() { - return connection; - } - - public Session createSession(boolean transacted, int ackMode) throws JMSException { - SessionKey key = new SessionKey(transacted, ackMode); - PooledSession session; - try { - session = sessionPool.borrowObject(key); - } catch (Exception e) { - IllegalStateException illegalStateException = new IllegalStateException(e.toString()); - illegalStateException.initCause(e); - throw illegalStateException; - } - return session; - } - - public synchronized void close() { - if (connection != null) { - try { - sessionPool.close(); - } catch (Exception e) { - } finally { - try { - connection.close(); - } catch (Exception e) { - } finally { - connection = null; - } - } - } - } - - public synchronized void incrementReferenceCount() { - referenceCount++; - lastUsed = System.currentTimeMillis(); - } - - public synchronized void decrementReferenceCount() { - referenceCount--; - lastUsed = System.currentTimeMillis(); - if (referenceCount == 0) { - // Loaned sessions are those that are active in the sessionPool and - // have not been closed by the client before closing the connection. - // These need to be closed so that all session's reflect the fact - // that the parent Connection is closed. - for (PooledSession session : this.loanedSessions) { - try { - session.close(); - } catch (Exception e) { - } - } - this.loanedSessions.clear(); - - expiredCheck(); - } - } - - /** - * Determines if this Connection has expired. 
- * <p/> - * A ConnectionPool is considered expired when all references to it are released AND either - * the configured idleTimeout has elapsed OR the configured expiryTimeout has elapsed. - * Once a ConnectionPool is determined to have expired its underlying Connection is closed. - * - * @return true if this connection has expired. - */ - public synchronized boolean expiredCheck() { - - boolean expired = false; - - if (connection == null) { - return true; - } - - if (hasExpired) { - if (referenceCount == 0) { - close(); - expired = true; - } - } - - if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) { - hasExpired = true; - if (referenceCount == 0) { - close(); - expired = true; - } - } - - // Only set hasExpired here is no references, as a Connection with references is by - // definition not idle at this time. - if (referenceCount == 0 && idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) { - hasExpired = true; - close(); - expired = true; - } - - return expired; - } - - public int getIdleTimeout() { - return idleTimeout; - } - - public void setIdleTimeout(int idleTimeout) { - this.idleTimeout = idleTimeout; - } - - public void setExpiryTimeout(long expiryTimeout) { - this.expiryTimeout = expiryTimeout; - } - - public long getExpiryTimeout() { - return expiryTimeout; - } - - public int getMaximumActiveSessionPerConnection() { - return this.sessionPool.getMaxTotalPerKey(); - } - - public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) { - this.sessionPool.setMaxTotalPerKey(maximumActiveSessionPerConnection); - } - - public boolean isUseAnonymousProducers() { - return this.useAnonymousProducers; - } - - public void setUseAnonymousProducers(boolean value) { - this.useAnonymousProducers = value; - } - - /** - * @return the total number of Pooled session including idle sessions that are not - * currently loaned out to any client. 
- */ - public int getNumSessions() { - return this.sessionPool.getNumIdle() + this.sessionPool.getNumActive(); - } - - /** - * @return the total number of Sessions that are in the Session pool but not loaned out. - */ - public int getNumIdleSessions() { - return this.sessionPool.getNumIdle(); - } - - /** - * @return the total number of Session's that have been loaned to PooledConnection instances. - */ - public int getNumActiveSessions() { - return this.sessionPool.getNumActive(); - } - - /** - * Configure whether the createSession method should block when there are no more idle sessions and the - * pool already contains the maximum number of active sessions. If false the create method will fail - * and throw an exception. - * - * @param block - * Indicates whether blocking should be used to wait for more space to create a session. - */ - public void setBlockIfSessionPoolIsFull(boolean block) { - this.sessionPool.setBlockWhenExhausted(block); - } - - public boolean isBlockIfSessionPoolIsFull() { - return this.sessionPool.getBlockWhenExhausted(); - } - - /** - * Returns the timeout to use for blocking creating new sessions - * - * @return true if the pooled Connection createSession method will block when the limit is hit. - * @see #setBlockIfSessionPoolIsFull(boolean) - */ - public long getBlockIfSessionPoolIsFullTimeout() { - return this.sessionPool.getMaxWaitMillis(); - } - - /** - * Controls the behavior of the internal session pool. By default the call to - * Connection.getSession() will block if the session pool is full. This setting - * will affect how long it blocks and throws an exception after the timeout. - * - * The size of the session pool is controlled by the @see #maximumActive - * property. 
- * - * Whether or not the call to create session blocks is controlled by the @see #blockIfSessionPoolIsFull - * property - * - * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFullTimeout is true, - * then use this setting to configure how long to block before retry - */ - public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) { - this.sessionPool.setMaxWaitMillis(blockIfSessionPoolIsFullTimeout); - } - - @Override - public String toString() { - return "ConnectionPool[" + connection + "]"; - } -} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/IntrospectionSupport.java ---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/IntrospectionSupport.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/IntrospectionSupport.java deleted file mode 100755 index c5900d1..0000000 --- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/IntrospectionSupport.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.karaf.jms.pool.internal; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.net.ssl.SSLServerSocket; -import java.lang.reflect.Method; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; - -public final class IntrospectionSupport { - - private static final Logger LOG = LoggerFactory.getLogger(IntrospectionSupport.class); - - private IntrospectionSupport() { - } - - @SuppressWarnings("rawtypes") - public static boolean setProperties(Object target, Map props) { - boolean rc = false; - - if (target == null) { - throw new IllegalArgumentException("target was null."); - } - if (props == null) { - throw new IllegalArgumentException("props was null."); - } - - for (Iterator<?> iter = props.entrySet().iterator(); iter.hasNext();) { - Entry<?,?> entry = (Entry<?,?>)iter.next(); - if (setProperty(target, (String)entry.getKey(), entry.getValue())) { - iter.remove(); - rc = true; - } - } - - return rc; - } - - public static boolean setProperty(Object target, String name, Object value) { - try { - Class<?> clazz = target.getClass(); - if (target instanceof SSLServerSocket) { - // overcome illegal access issues with internal implementation class - clazz = SSLServerSocket.class; - } - Method setter = findSetterMethod(clazz, name); - if (setter == null) { - return false; - } - - // If the type is null or it matches the needed type, just use the - // value directly - if (value == null || value.getClass() == setter.getParameterTypes()[0]) { - setter.invoke(target, value); - } else { - // We need to convert it - setter.invoke(target, convert(value, setter.getParameterTypes()[0])); - } - return true; - } catch (Exception e) { - LOG.error(String.format("Could not set property %s on %s", name, target), e); - return false; - } - } - - @SuppressWarnings({ - "rawtypes", "unchecked" - }) - private static Object convert(Object value, Class to) { - if (value == null) { - // lets avoid NullPointerException when 
converting to boolean for null values - if (boolean.class.isAssignableFrom(to)) { - return Boolean.FALSE; - } - return null; - } - - // eager same instance type test to avoid the overhead of invoking the type converter - // if already same type - if (to.isAssignableFrom(value.getClass())) { - return to.cast(value); - } - - if (boolean.class.isAssignableFrom(to) && value instanceof String) { - return Boolean.valueOf((String) value); - } - - throw new IllegalArgumentException("Cannot convert from " + value.getClass() - + " to " + to + " with value " + value); - } - - private static Method findSetterMethod(Class<?> clazz, String name) { - // Build the method name. - name = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1); - Method[] methods = clazz.getMethods(); - for (Method method : methods) { - Class<?> params[] = method.getParameterTypes(); - if (method.getName().equals(name) && params.length == 1 ) { - return method; - } - } - return null; - } - -} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnection.java ---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnection.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnection.java deleted file mode 100755 index 38ccf6a..0000000 --- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnection.java +++ /dev/null @@ -1,285 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.karaf.jms.pool.internal; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.Connection; -import javax.jms.ConnectionConsumer; -import javax.jms.ConnectionMetaData; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueSession; -import javax.jms.ServerSessionPool; -import javax.jms.Session; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicSession; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and - * {@link QueueConnection} which is pooled and on {@link #close()} will return - * itself to the sessionPool. - * - * <b>NOTE</b> this implementation is only intended for use when sending - * messages. 
It does not deal with pooling of consumers; for that look at a
 * library like <a href="http://jencks.org/">Jencks</a> such as in <a
 * href="http://jencks.org/Message+Driven+POJOs">this example</a>
 *
 */
public class PooledConnection implements TopicConnection, QueueConnection, PooledSessionEventListener {
    private static final Logger LOG = LoggerFactory.getLogger(PooledConnection.class);

    /** Backing pool; set to {@code null} by {@link #close()}. */
    protected ConnectionPool pool;
    /** Set by {@link #stop()} and checked by {@link #assertNotClosed()}. */
    private volatile boolean stopped;
    /** Temporary queues reported through {@link #onTemporaryQueueCreate(TemporaryQueue)}. */
    private final List<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<>();
    /** Temporary topics reported through {@link #onTemporaryTopicCreate(TemporaryTopic)}. */
    private final List<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<>();
    /** Sessions handed out by {@link #createSession(boolean, int)} and not yet closed. */
    private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<>();

    /**
     * Creates a new PooledConnection instance that uses the given ConnectionPool to create
     * and manage its resources. The ConnectionPool instance can be shared amongst many
     * PooledConnection instances.
     *
     * @param pool
     *      The connection and pool manager backing this proxy connection object.
     */
    public PooledConnection(ConnectionPool pool) {
        this.pool = pool;
    }

    /**
     * Factory method to create a new instance backed by the same pool as this one.
     *
     * @return a new PooledConnection sharing this connection's pool reference.
     */
    public PooledConnection newInstance() {
        return new PooledConnection(pool);
    }

    /**
     * Deletes the temporary destinations and closes the sessions tracked by this
     * connection, then releases its reference on the pool. Once the pool reference
     * has been released, later calls skip the reference-count decrement.
     */
    @Override
    public void close() throws JMSException {
        this.cleanupConnectionTemporaryDestinations();
        this.cleanupAllLoanedSessions();
        if (this.pool != null) {
            this.pool.decrementReferenceCount();
            this.pool = null;
        }
    }

    /**
     * Starts the backing pool.
     *
     * @throws JMSException if this connection has been stopped or closed, or the pool fails to start.
     */
    @Override
    public void start() throws JMSException {
        assertNotClosed();
        pool.start();
    }

    /**
     * Marks this connection as stopped; the backing pool is not touched.
     */
    @Override
    public void stop() throws JMSException {
        // NOTE(review): assertNotClosed() treats "stopped" like "closed", so a stopped
        // connection cannot be started again. Left as-is because callers may rely on it.
        stopped = true;
    }

    @Override
    public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
        return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
    }

    @Override
    public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
        return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
    }

    @Override
    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i) throws JMSException {
        return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
    }

    @Override
    public String getClientID() throws JMSException {
        return getConnection().getClientID();
    }

    @Override
    public ExceptionListener getExceptionListener() throws JMSException {
        return getConnection().getExceptionListener();
    }

    @Override
    public ConnectionMetaData getMetaData() throws JMSException {
        return getConnection().getMetaData();
    }

    @Override
    public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
        getConnection().setExceptionListener(exceptionListener);
    }

    @Override
    public void setClientID(String clientID) throws JMSException {
        // ignore repeated calls to setClientID() with the same client id
        // this could happen when a JMS component such as Spring that uses a
        // PooledConnectionFactory shuts down and reinitializes.
        Connection connection = getConnection();
        String currentClientID = connection.getClientID();
        if (currentClientID == null || !currentClientID.equals(clientID)) {
            connection.setClientID(clientID);
        }
    }

    @Override
    public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
        return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
    }

    // Session factory methods
    // -------------------------------------------------------------------------
    @Override
    public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
        return (QueueSession) createSession(transacted, ackMode);
    }

    @Override
    public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
        return (TopicSession) createSession(transacted, ackMode);
    }

    /**
     * Obtains a session from the pool, records it as loaned out and registers this
     * connection as its event listener.
     *
     * @throws javax.jms.IllegalStateException if this connection has already been closed.
     * @throws JMSException if the pool fails to provide a session.
     */
    @Override
    public Session createSession(boolean transacted, int ackMode) throws JMSException {
        // close() nulls the pool; report that as a JMS error instead of a NullPointerException.
        // Only the pool is checked (not "stopped") so stopped connections behave as before.
        final ConnectionPool currentPool = pool;
        if (currentPool == null) {
            throw new javax.jms.IllegalStateException("Connection closed");
        }
        PooledSession result = (PooledSession) currentPool.createSession(transacted, ackMode);

        // Store the session so we can close the sessions that this PooledConnection
        // created in order to ensure that consumers etc are closed per the JMS contract.
        loanedSessions.add(result);

        // Add a event listener to the session that notifies us when the session
        // creates / destroys temporary destinations and closes etc.
        result.addSessionEventListener(this);
        return result;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    @Override
    public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
        connTempQueues.add(tempQueue);
    }

    @Override
    public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
        connTempTopics.add(tempTopic);
    }

    @Override
    public void onSessionClosed(PooledSession session) {
        if (session != null) {
            this.loanedSessions.remove(session);
        }
    }

    /**
     * @return the connection provided by the backing pool.
     * @throws JMSException if this connection has been stopped or closed.
     */
    public Connection getConnection() throws JMSException {
        assertNotClosed();
        return pool.getConnection();
    }

    /**
     * @throws javax.jms.IllegalStateException if {@link #stop()} has been called or the
     *         pool reference has been released by {@link #close()}.
     */
    protected void assertNotClosed() throws javax.jms.IllegalStateException {
        if (stopped || pool == null) {
            throw new javax.jms.IllegalStateException("Connection closed");
        }
    }

    /**
     * Creates a session directly on the pooled connection using the transacted flag
     * and acknowledge mode carried by the given key.
     */
    protected Session createSession(SessionKey key) throws JMSException {
        return getConnection().createSession(key.isTransacted(), key.getAckMode());
    }

    @Override
    public String toString() {
        return "PooledConnection { " + pool + " }";
    }

    /**
     * Remove all of the temporary destinations created for this connection.
     * This is important since the underlying connection may be reused over a
     * long period of time, accumulating all of the temporary destinations from
     * each use. However, from the perspective of the lifecycle from the
     * client's view, close() closes the connection and, therefore, deletes all
     * of the temporary destinations created.
     */
    protected void cleanupConnectionTemporaryDestinations() {

        for (TemporaryQueue tempQueue : connTempQueues) {
            try {
                tempQueue.delete();
            } catch (JMSException ex) {
                // Best effort: keep deleting the remaining destinations.
                LOG.info("failed to delete Temporary Queue \"{}\" on closing pooled connection: {}", tempQueue, ex.getMessage());
            }
        }
        connTempQueues.clear();

        for (TemporaryTopic tempTopic : connTempTopics) {
            try {
                tempTopic.delete();
            } catch (JMSException ex) {
                // Best effort: keep deleting the remaining destinations.
                LOG.info("failed to delete Temporary Topic \"{}\" on closing pooled connection: {}", tempTopic, ex.getMessage());
            }
        }
        connTempTopics.clear();
    }

    /**
     * The PooledSession tracks all Sessions that it created and now we close them. Closing the
     * PooledSession will return the internal Session to the Pool of Session after cleaning up
     * all the resources that the Session had allocated for this PooledConnection.
     */
    protected void cleanupAllLoanedSessions() {

        for (PooledSession session : loanedSessions) {
            try {
                session.close();
            } catch (JMSException ex) {
                // Best effort: keep closing the remaining sessions.
                LOG.info("failed to close loaned Session \"{}\" on closing pooled connection: {}", session, ex.getMessage());
            }
        }
        loanedSessions.clear();
    }

    /**
     * @return the total number of Pooled session including idle sessions that are not
     *          currently loaned out to any client, or 0 once this connection is closed.
     */
    public int getNumSessions() {
        final ConnectionPool currentPool = this.pool;
        return currentPool == null ? 0 : currentPool.getNumSessions();
    }

    /**
     * @return the number of Sessions that are currently checked out of this Connection's
     *          session pool, or 0 once this connection is closed.
     */
    public int getNumActiveSessions() {
        final ConnectionPool currentPool = this.pool;
        return currentPool == null ? 0 : currentPool.getNumActiveSessions();
    }

    /**
     * @return the number of Sessions that are idle in this Connection's sessions pool,
     *          or 0 once this connection is closed.
     */
    public int getNumIdleSessions() {
        final ConnectionPool currentPool = this.pool;
        return currentPool == null ? 0 : currentPool.getNumIdleSessions();
    }

    /**
     * @return the number of Sessions that are idle in this Connection's sessions pool.
     * @deprecated misspelt name kept for existing callers; use {@link #getNumIdleSessions()}.
     */
    @Deprecated
    public int getNumtIdleSessions() {
        return getNumIdleSessions();
    }
}
