http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnectionFactory.java ---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnectionFactory.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnectionFactory.java deleted file mode 100644 index ba36c1c..0000000 --- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledConnectionFactory.java +++ /dev/null @@ -1,538 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.karaf.jms.pool.internal; - -import org.apache.commons.pool2.KeyedPooledObjectFactory; -import org.apache.commons.pool2.PooledObject; -import org.apache.commons.pool2.impl.DefaultPooledObject; -import org.apache.commons.pool2.impl.GenericKeyedObjectPool; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.QueueConnection; -import javax.jms.QueueConnectionFactory; -import javax.jms.TopicConnection; -import javax.jms.TopicConnectionFactory; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * A JMS provider which pools Connection, Session and MessageProducer instances - * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's - * <a href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessageListenerContainer</a>. - * Connections, sessions and producers are returned to a pool after use so that they can be reused later - * without having to undergo the cost of creating them again. - * - * <b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers, - * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which - * are expensive to create and can remain idle at a minimal cost. Consumers, on the other hand, are usually - * just created at startup and left active, handling incoming messages as they come. When a consumer is - * complete, it is best to close it rather than return it to a pool for later reuse: this is because, - * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer, - * where they'll get held until the consumer is active again. 
- * - * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you - * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that - * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail: - * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html - * - * Optionally, one may configure the pool to examine and possibly evict objects as they sit idle in the - * pool. This is performed by an "idle object eviction" thread, which runs asynchronously. Caution should - * be used when configuring this optional feature. Eviction runs contend with client threads for access - * to objects in the pool, so if they run too frequently performance issues may result. The idle object - * eviction thread may be configured using the {@link PooledConnectionFactory#setTimeBetweenExpirationCheckMillis} method. By - * default the value is -1 which means no eviction thread will be run. Set to a non-negative value to - * configure the idle eviction thread to run. 
- */ -public class PooledConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory { - - private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class); - - private final AtomicBoolean stopped = new AtomicBoolean(false); - private GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool; - - private final ConnectionFactory connectionFactory; - - private int maximumActiveSessionPerConnection = 500; - private int idleTimeout = 30 * 1000; - private boolean blockIfSessionPoolIsFull = true; - private long blockIfSessionPoolIsFullTimeout = -1L; - private long expiryTimeout = 0L; - private boolean createConnectionOnStartup = true; - private boolean useAnonymousProducers = true; - - public PooledConnectionFactory(ConnectionFactory connectionFactory) { - this.connectionFactory = Objects.requireNonNull(connectionFactory, "connectionFactory must be set"); - } - - public void initConnectionsPool() { - if (this.connectionsPool == null) { - this.connectionsPool = new GenericKeyedObjectPool<>( - new KeyedPooledObjectFactory<ConnectionKey, ConnectionPool>() { - @Override - public void activateObject(ConnectionKey key, PooledObject<ConnectionPool> connection) throws Exception { - } - @Override - public void destroyObject(ConnectionKey key, PooledObject<ConnectionPool> connection) throws Exception { - try { - if (LOG.isTraceEnabled()) { - LOG.trace("Destroying connection: {}", connection); - } - connection.getObject().close(); - } catch (Exception e) { - LOG.warn("Close connection failed for connection: " + connection + ". 
This exception will be ignored.",e); - } - } - @Override - public PooledObject<ConnectionPool> makeObject(ConnectionKey key) throws Exception { - Connection delegate = createConnection(key); - - ConnectionPool connection = createConnectionPool(delegate); - connection.setIdleTimeout(getIdleTimeout()); - connection.setExpiryTimeout(getExpiryTimeout()); - connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection()); - connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull()); - if (isBlockIfSessionPoolIsFull() && getBlockIfSessionPoolIsFullTimeout() > 0) { - connection.setBlockIfSessionPoolIsFullTimeout(getBlockIfSessionPoolIsFullTimeout()); - } - connection.setUseAnonymousProducers(isUseAnonymousProducers()); - - if (LOG.isTraceEnabled()) { - LOG.trace("Created new connection: {}", connection); - } - - return new DefaultPooledObject<>(connection); - } - - @Override - public void passivateObject(ConnectionKey key, PooledObject<ConnectionPool> connection) throws Exception { - } - - @Override - public boolean validateObject(ConnectionKey key, PooledObject<ConnectionPool> connection) { - if (connection != null && connection.getObject() != null && connection.getObject().expiredCheck()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Connection has expired: {} and will be destroyed", connection); - } - - return false; - } - - return true; - } - }); - - // Set max idle (not max active) since our connections always idle in the pool. - this.connectionsPool.setMaxIdlePerKey(1); - - // We always want our validate method to control when idle objects are evicted. - this.connectionsPool.setTestOnBorrow(true); - this.connectionsPool.setTestWhileIdle(true); - } - } - - /** - * @return the currently configured ConnectionFactory used to create the pooled Connections. 
- */ - public ConnectionFactory getConnectionFactory() { - return connectionFactory; - } - @Override - public Connection createConnection() throws JMSException { - return createConnection(null, null); - } - - @Override - public synchronized Connection createConnection(String userName, String password) throws JMSException { - if (stopped.get()) { - LOG.debug("PooledConnectionFactory is stopped, skip create new connection."); - return null; - } - - ConnectionPool connection = null; - ConnectionKey key = new ConnectionKey(userName, password); - - // This will either return an existing non-expired ConnectionPool or it - // will create a new one to meet the demand. - if (getConnectionsPool().getNumIdle(key) < getMaxConnections()) { - try { - // we want borrowObject to return the one we added. - connectionsPool.setLifo(true); - connectionsPool.addObject(key); - } catch (Exception e) { - throw createJmsException("Error while attempting to add new Connection to the pool", e); - } - } else { - // now we want the oldest one in the pool. - connectionsPool.setLifo(false); - } - - try { - - // We can race against other threads returning the connection when there is an - // expiration or idle timeout. We keep pulling out ConnectionPool instances until - // we win and get a non-closed instance and then increment the reference count - // under lock to prevent another thread from triggering an expiration check and - // pulling the rug out from under us. - while (true) { - ConnectionPool cp = connectionsPool.borrowObject(key); - synchronized (cp) { - if (cp.getConnection() != null) { - cp.incrementReferenceCount(); - connection = cp; - break; - } - - // Return the bad one to the pool and let if get destroyed as normal. 
- connectionsPool.returnObject(key, cp); - } - } - } catch (Exception e) { - throw createJmsException("Error while attempting to retrieve a connection from the pool", e); - } - - try { - connectionsPool.returnObject(key, connection); - } catch (Exception e) { - throw createJmsException("Error when returning connection to the pool", e); - } - - return newPooledConnection(connection); - } - - protected Connection newPooledConnection(ConnectionPool connection) { - return new PooledConnection(connection); - } - - private JMSException createJmsException(String msg, Exception cause) { - JMSException exception = new JMSException(msg); - exception.setLinkedException(cause); - exception.initCause(cause); - return exception; - } - - protected Connection createConnection(ConnectionKey key) throws JMSException { - if (key.getUserName() == null && key.getPassword() == null) { - return connectionFactory.createConnection(); - } else { - return connectionFactory.createConnection(key.getUserName(), key.getPassword()); - } - } - - public void start() { - LOG.debug("Starting the PooledConnectionFactory: create on start = {}", isCreateConnectionOnStartup()); - stopped.set(false); - if (isCreateConnectionOnStartup()) { - try { - // warm the pool by creating a connection during startup - createConnection(); - } catch (JMSException e) { - LOG.warn("Create pooled connection during start failed. This exception will be ignored.", e); - } - } - } - - public void stop() { - if (stopped.compareAndSet(false, true)) { - LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}", - connectionsPool != null ? connectionsPool.getNumActive() : 0); - try { - if (connectionsPool != null) { - connectionsPool.close(); - } - } catch (Exception e) { - } - } - } - - /** - * Clears all connections from the pool. Each connection that is currently in the pool is - * closed and removed from the pool. A new connection will be created on the next call to - * {@link #createConnection()}. 
Care should be taken when using this method as Connections that - * are in use by clients will be closed. - */ - public void clear() { - - if (stopped.get()) { - return; - } - - getConnectionsPool().clear(); - } - - /** - * Returns the currently configured maximum number of sessions a pooled Connection will - * create before it either blocks or throws an exception when a new session is requested, - * depending on configuration. - * - * @return the number of session instances that can be taken from a pooled connection. - */ - public int getMaximumActiveSessionPerConnection() { - return maximumActiveSessionPerConnection; - } - - /** - * Sets the maximum number of active sessions per connection. - * - * @param maximumActiveSessionPerConnection - * The maximum number of active sessions per connection in the pool. - */ - public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) { - this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection; - } - - /** - * Controls the behavior of the internal session pool. By default the call to - * Connection.getSession() will block if the session pool is full. If the - * argument false is given, it will change the default behavior and instead the - * call to getSession() will throw a JMSException. - * - * The size of the session pool is controlled by the - * {@link #setMaximumActiveSessionPerConnection(int) maximumActiveSessionPerConnection} property. - * - * @param block - if true, the call to getSession() blocks if the pool is full - * until a session object is available. defaults to true. - */ - public void setBlockIfSessionPoolIsFull(boolean block) { - this.blockIfSessionPoolIsFull = block; - } - - /** - * Returns whether a pooled Connection will enter a blocked state or will throw an Exception - * once the maximum number of sessions has been borrowed from the Session Pool. - * - * @return true if the pooled Connection createSession method will block when the limit is hit. 
- * @see #setBlockIfSessionPoolIsFull(boolean) - */ - public boolean isBlockIfSessionPoolIsFull() { - return this.blockIfSessionPoolIsFull; - } - - /** - * Returns the maximum number of pooled Connections that this factory will allow before it - * begins to return connections from the pool on calls to {@link #createConnection()}. - * - * @return the maxConnections that will be created for this pool. - */ - public int getMaxConnections() { - return getConnectionsPool().getMaxIdlePerKey(); - } - - /** - * Sets the maximum number of pooled Connections (defaults to one). Each call to - * {@link #createConnection()} will result in a new Connection being created up to the max - * connections value. - * - * @param maxConnections the maxConnections to set - */ - public void setMaxConnections(int maxConnections) { - getConnectionsPool().setMaxIdlePerKey(maxConnections); - } - - /** - * Gets the Idle timeout value applied to new Connections that are created by this pool. - * <p/> - * The idle timeout is used to determine if a Connection instance has sat too long in the pool unused - * and if so is closed and removed from the pool. The default value is 30 seconds. - * - * @return idle timeout value (milliseconds) - */ - public int getIdleTimeout() { - return idleTimeout; - } - - /** - * Sets the idle timeout value for Connections that are created by this pool in Milliseconds, - * defaults to 30 seconds. - * <p/> - * For a Connection that is in the pool but has no current users the idle timeout determines how - * long the Connection can live before it is eligible for removal from the pool. Normally the - * connections are tested when an attempt to check one out occurs so a Connection instance can sit - * in the pool much longer than its idle timeout if connections are used infrequently. - * - * @param idleTimeout - * The maximum time a pooled Connection can sit unused before it is eligible for removal. 
- */ - public void setIdleTimeout(int idleTimeout) { - this.idleTimeout = idleTimeout; - } - - /** - * allow connections to expire, irrespective of load or idle time. This is useful with failover - * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery - * - * @param expiryTimeout non zero in milliseconds - */ - public void setExpiryTimeout(long expiryTimeout) { - this.expiryTimeout = expiryTimeout; - } - - /** - * @return the configured expiration timeout for connections in the pool. - */ - public long getExpiryTimeout() { - return expiryTimeout; - } - - /** - * @return true if a Connection is created immediately on a call to {@link #start()}. - */ - public boolean isCreateConnectionOnStartup() { - return createConnectionOnStartup; - } - - /** - * Whether to create a connection on starting this {@link PooledConnectionFactory}. - * <p/> - * This can be used to warm-up the pool on startup. Notice that any kind of exception - * happens during startup is logged at WARN level and ignored. - * - * @param createConnectionOnStartup <tt>true</tt> to create a connection on startup - */ - public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) { - this.createConnectionOnStartup = createConnectionOnStartup; - } - - /** - * Should Sessions use one anonymous producer for all producer requests or should a new - * MessageProducer be created for each request to create a producer object, default is true. - * - * When enabled the session only needs to allocate one MessageProducer for all requests and - * the MessageProducer#send(destination, message) method can be used. Normally this is the - * right thing to do however it does result in the Broker not showing the producers per - * destination. - * - * @return true if a PooledSession will use only a single anonymous message producer instance. 
- */ - public boolean isUseAnonymousProducers() { - return this.useAnonymousProducers; - } - - /** - * Sets whether a PooledSession uses only one anonymous MessageProducer instance or creates - * a new MessageProducer for each call to create a MessageProducer. - * - * @param value - * Boolean value that configures whether anonymous producers are used. - */ - public void setUseAnonymousProducers(boolean value) { - this.useAnonymousProducers = value; - } - - /** - * Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys. - * - * @return this factory's pool of ConnectionPool instances. - */ - protected GenericKeyedObjectPool<ConnectionKey, ConnectionPool> getConnectionsPool() { - initConnectionsPool(); - return this.connectionsPool; - } - - /** - * Sets the number of milliseconds to sleep between runs of the idle Connection eviction thread. - * When non-positive, no idle object eviction thread will be run, and Connections will only be - * checked on borrow to determine if they have sat idle for too long or have failed for some - * other reason. - * <p/> - * By default this value is set to -1 and no expiration thread ever runs. - * - * @param timeBetweenExpirationCheckMillis - * The time to wait between runs of the idle Connection eviction thread. - */ - public void setTimeBetweenExpirationCheckMillis(long timeBetweenExpirationCheckMillis) { - getConnectionsPool().setTimeBetweenEvictionRunsMillis(timeBetweenExpirationCheckMillis); - } - - /** - * @return the number of milliseconds to sleep between runs of the idle connection eviction thread. - */ - public long getTimeBetweenExpirationCheckMillis() { - return getConnectionsPool().getTimeBetweenEvictionRunsMillis(); - } - - /** - * @return the number of idle Connections currently in the Pool - */ - public int getNumConnections() { - return getConnectionsPool().getNumIdle(); - } - - /** - * Delegate that creates each instance of a ConnectionPool object. 
Subclasses can override - * this method to customize the type of connection pool returned. - * - * @param connection the Connection that the new ConnectionPool is constructed around - * - * @return instance of a new ConnectionPool. - */ - protected ConnectionPool createConnectionPool(Connection connection) { - return new ConnectionPool(connection); - } - - /** - * Returns the timeout to use when blocking on the creation of new sessions. - * - * @return the configured timeout value used when blocking because the session pool is full. - * @see #setBlockIfSessionPoolIsFull(boolean) - */ - public long getBlockIfSessionPoolIsFullTimeout() { - return blockIfSessionPoolIsFullTimeout; - } - - /** - * Controls the behavior of the internal session pool. By default the call to - * Connection.getSession() will block if the session pool is full. This setting - * will affect how long it blocks and throws an exception after the timeout. - * - * The size of the session pool is controlled by the - * {@link #setMaximumActiveSessionPerConnection(int) maximumActiveSessionPerConnection} property. - * - * Whether or not the call to create session blocks is controlled by the - * {@link #setBlockIfSessionPoolIsFull(boolean) blockIfSessionPoolIsFull} property. - * - * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFull is true, - * then use this setting to configure how long to block before retry - */ - public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) { - this.blockIfSessionPoolIsFullTimeout = blockIfSessionPoolIsFullTimeout; - } - - @Override - public QueueConnection createQueueConnection() throws JMSException { - return (QueueConnection) createConnection(); - } - - @Override - public QueueConnection createQueueConnection(String userName, String password) throws JMSException { - return (QueueConnection) createConnection(userName, password); - } - - @Override - public TopicConnection createTopicConnection() throws JMSException { - return (TopicConnection) createConnection(); - } - - @Override - public TopicConnection createTopicConnection(String userName, String password) throws JMSException { - return 
(TopicConnection) createConnection(userName, password); - } - -}
http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledMessageConsumer.java ---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledMessageConsumer.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledMessageConsumer.java deleted file mode 100644 index 9986513..0000000 --- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledMessageConsumer.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.karaf.jms.pool.internal; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; - -/** - * A {@link MessageConsumer} which was created by {@link PooledSession}. - */ -public class PooledMessageConsumer implements MessageConsumer { - - private final PooledSession session; - private final MessageConsumer delegate; - - /** - * Wraps the message consumer. 
- * - * @param session the pooled session - * @param delegate the created consumer to wrap - */ - public PooledMessageConsumer(PooledSession session, MessageConsumer delegate) { - this.session = session; - this.delegate = delegate; - } - - public void close() throws JMSException { - // ensure session removes consumer as its closed now - session.onConsumerClose(delegate); - delegate.close(); - } - - public MessageListener getMessageListener() throws JMSException { - return delegate.getMessageListener(); - } - - public String getMessageSelector() throws JMSException { - return delegate.getMessageSelector(); - } - - public Message receive() throws JMSException { - return delegate.receive(); - } - - public Message receive(long timeout) throws JMSException { - return delegate.receive(timeout); - } - - public Message receiveNoWait() throws JMSException { - return delegate.receiveNoWait(); - } - - public void setMessageListener(MessageListener listener) throws JMSException { - delegate.setMessageListener(listener); - } - - public String toString() { - return delegate.toString(); - } -} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledProducer.java ---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledProducer.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledProducer.java deleted file mode 100644 index 53fdeba..0000000 --- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledProducer.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.karaf.jms.pool.internal; - -import javax.jms.Destination; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; - -/** - * A pooled {@link MessageProducer} - */ -public class PooledProducer implements MessageProducer { - - private final MessageProducer messageProducer; - private final Destination destination; - - private int deliveryMode; - private boolean disableMessageID; - private boolean disableMessageTimestamp; - private int priority; - private long timeToLive; - private boolean anonymous = true; - - public PooledProducer(MessageProducer messageProducer, Destination destination) throws JMSException { - this.messageProducer = messageProducer; - this.destination = destination; - this.anonymous = messageProducer.getDestination() == null; - - this.deliveryMode = messageProducer.getDeliveryMode(); - this.disableMessageID = messageProducer.getDisableMessageID(); - this.disableMessageTimestamp = messageProducer.getDisableMessageTimestamp(); - this.priority = messageProducer.getPriority(); - this.timeToLive = messageProducer.getTimeToLive(); - } - - @Override - public void close() throws JMSException { - if (!anonymous) { - this.messageProducer.close(); - } - } - - @Override - public void send(Destination destination, Message message) throws JMSException { - send(destination, 
message, getDeliveryMode(), getPriority(), getTimeToLive()); - } - - @Override - public void send(Message message) throws JMSException { - send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive()); - } - - @Override - public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - send(destination, message, deliveryMode, priority, timeToLive); - } - - @Override - public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - - if (destination == null) { - if (messageProducer.getDestination() == null) { - throw new UnsupportedOperationException("A destination must be specified."); - } - throw new InvalidDestinationException("Don't understand null destinations"); - } - - MessageProducer messageProducer = getMessageProducer(); - - // just in case let only one thread send at once - synchronized (messageProducer) { - - if (anonymous && this.destination != null && !this.destination.equals(destination)) { - throw new UnsupportedOperationException("This producer can only send messages to: " + this.destination); - } - - // Producer will do it's own Destination validation so always use the destination - // based send method otherwise we might violate a JMS rule. 
- messageProducer.send(destination, message, deliveryMode, priority, timeToLive); - } - } - - @Override - public Destination getDestination() { - return destination; - } - - @Override - public int getDeliveryMode() { - return deliveryMode; - } - - @Override - public void setDeliveryMode(int deliveryMode) { - this.deliveryMode = deliveryMode; - } - - @Override - public boolean getDisableMessageID() { - return disableMessageID; - } - - @Override - public void setDisableMessageID(boolean disableMessageID) { - this.disableMessageID = disableMessageID; - } - - @Override - public boolean getDisableMessageTimestamp() { - return disableMessageTimestamp; - } - - @Override - public void setDisableMessageTimestamp(boolean disableMessageTimestamp) { - this.disableMessageTimestamp = disableMessageTimestamp; - } - - @Override - public int getPriority() { - return priority; - } - - @Override - public void setPriority(int priority) { - this.priority = priority; - } - - @Override - public long getTimeToLive() { - return timeToLive; - } - - @Override - public void setTimeToLive(long timeToLive) { - this.timeToLive = timeToLive; - } - - // Implementation methods - // ------------------------------------------------------------------------- - protected MessageProducer getMessageProducer() { - return messageProducer; - } - - protected boolean isAnonymous() { - return anonymous; - } - - @Override - public String toString() { - return "PooledProducer { " + messageProducer + " }"; - } -} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledQueueSender.java ---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledQueueSender.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledQueueSender.java deleted file mode 100644 index f0acca2..0000000 --- 
a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledQueueSender.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.karaf.jms.pool.internal; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Queue; -import javax.jms.QueueSender; - -/** - * - */ -public class PooledQueueSender extends PooledProducer implements QueueSender { - - public PooledQueueSender(QueueSender messageProducer, Destination destination) throws JMSException { - super(messageProducer, destination); - } - - public void send(Queue queue, Message message, int i, int i1, long l) throws JMSException { - getQueueSender().send(queue, message, i, i1, l); - } - - public void send(Queue queue, Message message) throws JMSException { - getQueueSender().send(queue, message); - } - - public Queue getQueue() throws JMSException { - return getQueueSender().getQueue(); - } - - - protected QueueSender getQueueSender() { - return (QueueSender) getMessageProducer(); - } - -} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSession.java 
---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSession.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSession.java deleted file mode 100644 index 941732e..0000000 --- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSession.java +++ /dev/null @@ -1,497 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.karaf.jms.pool.internal; - -import org.apache.commons.pool2.KeyedObjectPool; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.QueueReceiver; -import javax.jms.QueueSender; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import javax.jms.XASession; -import javax.transaction.xa.XAResource; -import java.io.Serializable; -import java.util.concurrent.CopyOnWriteArrayList; - -public class PooledSession implements Session, TopicSession, QueueSession, XASession { - private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class); - - private final SessionKey key; - private final KeyedObjectPool<SessionKey, PooledSession> sessionPool; - private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<>(); - private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<>(); - private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<>(); - - private MessageProducer producer; - private TopicPublisher publisher; - private QueueSender sender; - - private Session session; - private boolean transactional = true; - private boolean ignoreClose; - private boolean isXa; - private boolean useAnonymousProducers = true; - - public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, 
PooledSession> sessionPool, boolean transactional, boolean anonymous) { - this.key = key; - this.session = session; - this.sessionPool = sessionPool; - this.transactional = transactional; - this.useAnonymousProducers = anonymous; - } - - public void addSessionEventListener(PooledSessionEventListener listener) { - // only add if really needed - if (!sessionEventListeners.contains(listener)) { - this.sessionEventListeners.add(listener); - } - } - - protected boolean isIgnoreClose() { - return ignoreClose; - } - - protected void setIgnoreClose(boolean ignoreClose) { - this.ignoreClose = ignoreClose; - } - - @Override - public void close() throws JMSException { - if (!ignoreClose) { - boolean invalidate = false; - try { - // lets reset the session - getInternalSession().setMessageListener(null); - - // Close any consumers and browsers that may have been created. - for (MessageConsumer consumer : consumers) { - consumer.close(); - } - - for (QueueBrowser browser : browsers) { - browser.close(); - } - - if (transactional && !isXa) { - try { - getInternalSession().rollback(); - } catch (JMSException e) { - invalidate = true; - LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e); - } - } - } catch (JMSException ex) { - invalidate = true; - LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex); - } finally { - consumers.clear(); - browsers.clear(); - for (PooledSessionEventListener listener : this.sessionEventListeners) { - listener.onSessionClosed(this); - } - sessionEventListeners.clear(); - } - - if (invalidate) { - // lets close the session and not put the session back into the pool - // instead invalidate it so the pool can create a new one on demand. 
- if (session != null) { - try { - session.close(); - } catch (JMSException e1) { - LOG.trace("Ignoring exception on close as discarding session: " + e1, e1); - } - session = null; - } - try { - sessionPool.invalidateObject(key, this); - } catch (Exception e) { - LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e); - } - } else { - try { - sessionPool.returnObject(key, this); - } catch (Exception e) { - javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString()); - illegalStateException.initCause(e); - throw illegalStateException; - } - } - } - } - - @Override - public void commit() throws JMSException { - getInternalSession().commit(); - } - - @Override - public BytesMessage createBytesMessage() throws JMSException { - return getInternalSession().createBytesMessage(); - } - - @Override - public MapMessage createMapMessage() throws JMSException { - return getInternalSession().createMapMessage(); - } - - @Override - public Message createMessage() throws JMSException { - return getInternalSession().createMessage(); - } - - @Override - public ObjectMessage createObjectMessage() throws JMSException { - return getInternalSession().createObjectMessage(); - } - - @Override - public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { - return getInternalSession().createObjectMessage(serializable); - } - - @Override - public Queue createQueue(String s) throws JMSException { - return getInternalSession().createQueue(s); - } - - @Override - public StreamMessage createStreamMessage() throws JMSException { - return getInternalSession().createStreamMessage(); - } - - @Override - public TemporaryQueue createTemporaryQueue() throws JMSException { - TemporaryQueue result; - - result = getInternalSession().createTemporaryQueue(); - - // Notify all of the listeners of the created temporary Queue. 
- for (PooledSessionEventListener listener : this.sessionEventListeners) { - listener.onTemporaryQueueCreate(result); - } - - return result; - } - - @Override - public TemporaryTopic createTemporaryTopic() throws JMSException { - TemporaryTopic result; - - result = getInternalSession().createTemporaryTopic(); - - // Notify all of the listeners of the created temporary Topic. - for (PooledSessionEventListener listener : this.sessionEventListeners) { - listener.onTemporaryTopicCreate(result); - } - - return result; - } - - @Override - public void unsubscribe(String s) throws JMSException { - getInternalSession().unsubscribe(s); - } - - @Override - public TextMessage createTextMessage() throws JMSException { - return getInternalSession().createTextMessage(); - } - - @Override - public TextMessage createTextMessage(String s) throws JMSException { - return getInternalSession().createTextMessage(s); - } - - @Override - public Topic createTopic(String s) throws JMSException { - return getInternalSession().createTopic(s); - } - - @Override - public int getAcknowledgeMode() throws JMSException { - return getInternalSession().getAcknowledgeMode(); - } - - @Override - public boolean getTransacted() throws JMSException { - return getInternalSession().getTransacted(); - } - - @Override - public void recover() throws JMSException { - getInternalSession().recover(); - } - - @Override - public void rollback() throws JMSException { - getInternalSession().rollback(); - } - - @Override - public XAResource getXAResource() { - if (session instanceof XASession) { - return ((XASession) session).getXAResource(); - } - return null; - } - - @Override - public Session getSession() { - return this; - } - - @Override - public void run() { - if (session != null) { - session.run(); - } - } - - // Consumer related methods - // ------------------------------------------------------------------------- - @Override - public QueueBrowser createBrowser(Queue queue) throws JMSException { - return 
addQueueBrowser(getInternalSession().createBrowser(queue)); - } - - @Override - public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException { - return addQueueBrowser(getInternalSession().createBrowser(queue, selector)); - } - - @Override - public MessageConsumer createConsumer(Destination destination) throws JMSException { - return addConsumer(getInternalSession().createConsumer(destination)); - } - - @Override - public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException { - return addConsumer(getInternalSession().createConsumer(destination, selector)); - } - - @Override - public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException { - return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal)); - } - - @Override - public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException { - return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector)); - } - - @Override - public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException { - return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal)); - } - - @Override - public MessageListener getMessageListener() throws JMSException { - return getInternalSession().getMessageListener(); - } - - @Override - public void setMessageListener(MessageListener messageListener) throws JMSException { - getInternalSession().setMessageListener(messageListener); - } - - @Override - public TopicSubscriber createSubscriber(Topic topic) throws JMSException { - return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic)); - } - - @Override - public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException { - return addTopicSubscriber(((TopicSession) 
getInternalSession()).createSubscriber(topic, selector, local)); - } - - @Override - public QueueReceiver createReceiver(Queue queue) throws JMSException { - return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue)); - } - - @Override - public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException { - return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue, selector)); - } - - // Producer related methods - // ------------------------------------------------------------------------- - @Override - public MessageProducer createProducer(Destination destination) throws JMSException { - return new PooledProducer(getMessageProducer(destination), destination); - } - - @Override - public QueueSender createSender(Queue queue) throws JMSException { - return new PooledQueueSender(getQueueSender(queue), queue); - } - - @Override - public TopicPublisher createPublisher(Topic topic) throws JMSException { - return new PooledTopicPublisher(getTopicPublisher(topic), topic); - } - - public Session getInternalSession() throws IllegalStateException { - if (session == null) { - throw new IllegalStateException("The session has already been closed"); - } - return session; - } - - public MessageProducer getMessageProducer() throws JMSException { - return getMessageProducer(null); - } - - public MessageProducer getMessageProducer(Destination destination) throws JMSException { - MessageProducer result = null; - - if (useAnonymousProducers) { - if (producer == null) { - // Don't allow for duplicate anonymous producers. 
- synchronized (this) { - if (producer == null) { - producer = getInternalSession().createProducer(null); - } - } - } - - result = producer; - } else { - result = getInternalSession().createProducer(destination); - } - - return result; - } - - public QueueSender getQueueSender() throws JMSException { - return getQueueSender(null); - } - - public QueueSender getQueueSender(Queue destination) throws JMSException { - QueueSender result = null; - - if (useAnonymousProducers) { - if (sender == null) { - // Don't allow for duplicate anonymous producers. - synchronized (this) { - if (sender == null) { - sender = ((QueueSession) getInternalSession()).createSender(null); - } - } - } - - result = sender; - } else { - result = ((QueueSession) getInternalSession()).createSender(destination); - } - - return result; - } - - public TopicPublisher getTopicPublisher() throws JMSException { - return getTopicPublisher(null); - } - - public TopicPublisher getTopicPublisher(Topic destination) throws JMSException { - TopicPublisher result = null; - - if (useAnonymousProducers) { - if (publisher == null) { - // Don't allow for duplicate anonymous producers. 
- synchronized (this) { - if (publisher == null) { - publisher = ((TopicSession) getInternalSession()).createPublisher(null); - } - } - } - - result = publisher; - } else { - result = ((TopicSession) getInternalSession()).createPublisher(destination); - } - - return result; - } - - private QueueBrowser addQueueBrowser(QueueBrowser browser) { - browsers.add(browser); - return browser; - } - - private MessageConsumer addConsumer(MessageConsumer consumer) { - consumers.add(consumer); - // must wrap in PooledMessageConsumer to ensure the onConsumerClose - // method is invoked when the returned consumer is closed, to avoid memory - // leak in this session class in case many consumers is created - return new PooledMessageConsumer(this, consumer); - } - - private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) { - consumers.add(subscriber); - return subscriber; - } - - private QueueReceiver addQueueReceiver(QueueReceiver receiver) { - consumers.add(receiver); - return receiver; - } - - public void setIsXa(boolean isXa) { - this.isXa = isXa; - } - - @Override - public String toString() { - return "PooledSession { " + session + " }"; - } - - /** - * Callback invoked when the consumer is closed. - * <p/> - * This is used to keep track of an explicit closed consumer created by this - * session, by which we know do not need to keep track of the consumer, as - * its already closed. 
- * - * @param consumer - * the consumer which is being closed - */ - protected void onConsumerClose(MessageConsumer consumer) { - consumers.remove(consumer); - } -} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSessionEventListener.java ---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSessionEventListener.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSessionEventListener.java deleted file mode 100644 index b9a5dd3..0000000 --- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledSessionEventListener.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.karaf.jms.pool.internal; - -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; - -interface PooledSessionEventListener { - - /** - * Called on successful creation of a new TemporaryQueue. - * - * @param tempQueue - * The TemporaryQueue just created. - */ - void onTemporaryQueueCreate(TemporaryQueue tempQueue); - - /** - * Called on successful creation of a new TemporaryTopic. 
- * - * @param tempTopic - * The TemporaryTopic just created. - */ - void onTemporaryTopicCreate(TemporaryTopic tempTopic); - - /** - * Called when the PooledSession is closed. - * - * @param session - * The PooledSession that has been closed. - */ - void onSessionClosed(PooledSession session); - -} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledTopicPublisher.java ---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledTopicPublisher.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledTopicPublisher.java deleted file mode 100644 index b466151..0000000 --- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/PooledTopicPublisher.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.karaf.jms.pool.internal; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Topic; -import javax.jms.TopicPublisher; - -/** - * - */ -public class PooledTopicPublisher extends PooledProducer implements TopicPublisher { - - public PooledTopicPublisher(TopicPublisher messageProducer, Destination destination) throws JMSException { - super(messageProducer, destination); - } - - public Topic getTopic() throws JMSException { - return getTopicPublisher().getTopic(); - } - - public void publish(Message message) throws JMSException { - getTopicPublisher().publish((Topic) getDestination(), message); - } - - public void publish(Message message, int i, int i1, long l) throws JMSException { - getTopicPublisher().publish((Topic) getDestination(), message, i, i1, l); - } - - public void publish(Topic topic, Message message) throws JMSException { - getTopicPublisher().publish(topic, message); - } - - public void publish(Topic topic, Message message, int i, int i1, long l) throws JMSException { - getTopicPublisher().publish(topic, message, i, i1, l); - } - - protected TopicPublisher getTopicPublisher() { - return (TopicPublisher) getMessageProducer(); - } -} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/SessionKey.java ---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/SessionKey.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/SessionKey.java deleted file mode 100644 index 389438a..0000000 --- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/SessionKey.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.karaf.jms.pool.internal; - -/** - * A cache key for the session details - * - * - */ -public class SessionKey { - - private boolean transacted; - private int ackMode; - - private int hash; - - public SessionKey(boolean transacted, int ackMode) { - this.transacted = transacted; - this.ackMode = ackMode; - hash = ackMode; - if (transacted) { - hash = 31 * hash + 1; - } - } - - public int hashCode() { - return hash; - } - - public boolean equals(Object that) { - if (this == that) { - return true; - } - if (that instanceof SessionKey) { - return equals((SessionKey) that); - } - return false; - } - - public boolean equals(SessionKey that) { - return this.transacted == that.transacted && this.ackMode == that.ackMode; - } - - public boolean isTransacted() { - return transacted; - } - - public int getAckMode() { - return ackMode; - } -} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/osgi/Activator.java ---------------------------------------------------------------------- diff --git a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/osgi/Activator.java b/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/osgi/Activator.java deleted file mode 100644 index dfecd1c..0000000 --- a/jms/pool/src/main/java/org/apache/karaf/jms/pool/internal/osgi/Activator.java +++ /dev/null @@ -1,180 +0,0 @@ 
-/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.karaf.jms.pool.internal.osgi; - -import org.apache.karaf.jms.pool.internal.PooledConnectionFactory; -import org.osgi.framework.BundleActivator; -import org.osgi.framework.BundleContext; -import org.osgi.framework.Constants; -import org.osgi.framework.ServiceReference; -import org.osgi.framework.ServiceRegistration; -import org.osgi.util.tracker.ServiceTracker; -import org.osgi.util.tracker.ServiceTrackerCustomizer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.ConnectionFactory; -import java.util.Hashtable; -import java.util.function.Consumer; -import java.util.function.Function; - -public class Activator implements BundleActivator, ServiceTrackerCustomizer<ConnectionFactory, Activator.ConnectionFactoryData> { - - public static final String PROP_PREFIX = "karaf.jms."; - - public static final String PROP_OPT_IN = PROP_PREFIX + "wrap"; - - public static final String PROP_POOL = PROP_PREFIX + "pool."; - - private static final transient Logger LOG = LoggerFactory.getLogger(Activator.class); - - private BundleContext context; - private ServiceTracker<ConnectionFactory, ConnectionFactoryData> cfTracker; - - @Override - 
public void start(BundleContext context) throws Exception { - this.context = context; - cfTracker = new ServiceTracker<>( - context, - context.createFilter("(&(objectClass=javax.jms.ConnectionFactory)(" + PROP_OPT_IN + "=*))"), - this); - cfTracker.open(); - } - - @Override - public void stop(BundleContext context) throws Exception { - cfTracker.close(); - } - - @Override - public ConnectionFactoryData addingService(ServiceReference<ConnectionFactory> reference) { - ConnectionFactoryData data = new ConnectionFactoryData(context, reference); - try { - data.init(); - return data; - } catch (Throwable t) { - LOG.warn("Error creating pooled JMS ConnectionFactory", t); - data.destroy(); - return null; - } - } - - @Override - public void modifiedService(ServiceReference<ConnectionFactory> reference, ConnectionFactoryData service) { - } - - @Override - public void removedService(ServiceReference<ConnectionFactory> reference, ConnectionFactoryData service) { - service.destroy(); - } - - class ConnectionFactoryData { - - private final BundleContext context; - private final ServiceReference<ConnectionFactory> reference; - private ConnectionFactory connectionFactory; - private PooledConnectionFactory pooledConnectionFactory; - private ServiceRegistration<ConnectionFactory> registration; - - ConnectionFactoryData(BundleContext context, ServiceReference<ConnectionFactory> reference) { - this.context = context; - this.reference = reference; - } - - void init() throws Exception { - connectionFactory = context.getService(reference); - PooledConnectionFactory pcf = new PooledConnectionFactory(connectionFactory); - populate(pcf); - register(pcf); - } - - void destroy() { - unregister(); - if (connectionFactory != null) { - try { - context.ungetService(reference); - } catch (Exception e) { - // Ignore - } finally { - connectionFactory = null; - } - } - } - - void populate(PooledConnectionFactory pcf) { - setObject(PROP_POOL + "maxConnections", Integer::parseInt, 
pcf::setMaxConnections); - setObject(PROP_POOL + "maximumActiveSessionPerConnection", Integer::parseInt, pcf::setMaximumActiveSessionPerConnection); - setObject(PROP_POOL + "idleTimeout", Integer::parseInt, pcf::setIdleTimeout); - setObject(PROP_POOL + "blockIfSessionPoolIsFull", Boolean::parseBoolean, pcf::setBlockIfSessionPoolIsFull); - setObject(PROP_POOL + "blockIfSessionPoolIsFullTimeout", Long::parseLong, pcf::setBlockIfSessionPoolIsFullTimeout); - setObject(PROP_POOL + "expiryTimeout", Long::parseLong, pcf::setExpiryTimeout); - setObject(PROP_POOL + "createConnectionOnStartup", Boolean::parseBoolean, pcf::setCreateConnectionOnStartup); - setObject(PROP_POOL + "useAnonymousProducers", Boolean::parseBoolean, pcf::setUseAnonymousProducers); - } - - <T> void setObject(String name, Function<String, T> parser, Consumer<T> setter) { - Object o = reference.getProperty(name); - if (o != null) { - setter.accept(parser.apply(o.toString())); - } - } - - void register(PooledConnectionFactory pcf) { - this.pooledConnectionFactory = pcf; - Hashtable<String, Object> props = new Hashtable<>(); - int ranking = 0; - for (String key : reference.getPropertyKeys()) { - Object value = reference.getProperty(key); - if (Constants.SERVICE_RANKING.equals(key)) { - if (value instanceof Integer) { - ranking = (Integer) value; - } - } else if (!key.startsWith("service.") - && !key.startsWith(PROP_PREFIX)) { - props.put(key, value); - } - } - props.put(Constants.SERVICE_RANKING, ranking + 1); - pcf.start(); - BundleContext context = reference.getBundle().getBundleContext(); - registration = context.registerService(ConnectionFactory.class, pooledConnectionFactory, props); - } - - void unregister() { - if (registration != null) { - try { - registration.unregister(); - } catch (Exception e) { - // Ignore - } finally { - registration = null; - } - } - if (pooledConnectionFactory != null) { - try { - pooledConnectionFactory.stop(); - } catch (Exception e) { - // Ignore - } finally { - 
pooledConnectionFactory = null; - } - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/JmsMBean.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/JmsMBean.java b/jms/src/main/java/org/apache/karaf/jms/JmsMBean.java new file mode 100644 index 0000000..d62a9ba --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/JmsMBean.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms; + +import javax.management.MBeanException; +import javax.management.openmbean.TabularData; +import java.util.List; +import java.util.Map; + +/** + * JMS MBean. + */ +public interface JmsMBean { + + /** + * List the JMS connection factories. + * + * @return The {@link List} of the JMS connection factories name. + * @throws MBeanException If the MBean fails. + */ + List<String> getConnectionfactories() throws MBeanException; + + /** + * Create a JMS connection factory. + * + * @param name The JMS connection factory name. + * @param type The JMS connection factory type (ActiveMQ or WebsphereMQ). + * @param url The JMS connection factory URL. 
NB: when type is WebsphereMQ, the URL has the format host/port/queuemanager/channel. + * @throws MBeanException If the MBean fails. + */ + void create(String name, String type, String url) throws MBeanException; + + /** + * Create a JMS connection factory. + * + * @param name The JMS connection factory name. + * @param type The JMS connection factory type (ActiveMQ or WebsphereMQ). + * @param url The JMS connection factory URL. NB: when type is WebsphereMQ, the URL has the format host/port/queuemanager/channel. + * @param username The JMS connection factory authentication username. + * @param password The JMS connection factory authentication password. + * @throws MBeanException If the MBean fails. + */ + void create(String name, String type, String url, String username, String password) throws MBeanException; + + /** + * Delete a JMS connection factory. + * + * @param name The JMS connection factory name. + * @throws MBeanException If the MBean fails. + */ + void delete(String name) throws MBeanException; + + /** + * Get details about a JMS connection factory. + * + * @param connectionFactory The JMS connection factory name. + * @param username The (optional) username to connect to the JMS broker. + * @param password The (optional) password to connect to the JMS broker. + * @return A {@link Map} (property/value) containing details. + * @throws MBeanException If the MBean fails. + */ + Map<String, String> info(String connectionFactory, String username, String password) throws MBeanException; + + /** + * Count the messages on a given JMS queue. + * + * @param connectionFactory The JMS connection factory name. + * @param queue The JMS queue name. + * @param username The (optional) username to connect to the JMS broker. + * @param password The (optional) password to connect to the JMS broker. + * @return The number of messages in the queue. + * @throws MBeanException If the MBean fails. 
+ */ + int count(String connectionFactory, String queue, String username, String password) throws MBeanException; + + /** + * List the JMS queues. + * + * @param connectionFactory The JMS connection factory name. + * @param username The (optional) username to connect to the JMS broker. + * @param password The (optional) password to connect to the JMS broker. + * @return The {@link List} of JMS queues. + * @throws MBeanException If the MBean fails. + */ + List<String> queues(String connectionFactory, String username, String password) throws MBeanException; + + /** + * List the JMS topics. + * + * @param connectionFactory The JMS connection factory name. + * @param username The (optional) username to connect to the JMS broker. + * @param password The (optional) password to connect to the JMS broker. + * @return The {@link List} of JMS topics. + * @throws MBeanException If the MBean fails. + */ + List<String> topics(String connectionFactory, String username, String password) throws MBeanException; + + /** + * Browse the messages in a JMS queue. + * + * @param connectionFactory The JMS connection factory name. + * @param queue The JMS queue name. + * @param selector A selector to use to browse only certain messages. + * @param username The (optional) username to connect to the JMS broker. + * @param password The (optional) password to connect to the JMS broker. + * @return A {@link TabularData} containing messages details. + * @throws MBeanException If the MBean fails. + */ + TabularData browse(String connectionFactory, String queue, String selector, String username, String password) throws MBeanException; + + /** + * Send a JMS message to given queue. + * + * @param connectionFactory The JMS connection factory name. + * @param queue The JMS queue name. + * @param content The message content. + * @param replyTo The message ReplyTo. + * @param username The (optional) username to connect to the JMS broker. 
+ * @param password The (optional) password to connect to the JMS broker. + * @throws MBeanException If the MBean fails. + */ + void send(String connectionFactory, String queue, String content, String replyTo, String username, String password) throws MBeanException; + + /** + * Consume JMS messages from a given queue. + * + * @param connectionFactory The JMS connection factory name. + * @param queue The JMS queue name. + * @param selector A selector to use to consume only certain messages. + * @param username The (optional) username to connect to the JMS broker. + * @param password The (optional) password to connect to the JMS broker. + * @return The number of messages consumed. + * @throws MBeanException If the MBean fails. + */ + int consume(String connectionFactory, String queue, String selector, String username, String password) throws MBeanException; + + /** + * Move JMS messages from one queue to another. + * + * @param connectionFactory The JMS connection factory name. + * @param source The source JMS queue name. + * @param destination The destination JMS queue name. + * @param selector A selector to move only certain messages. + * @param username The (optional) username to connect to the JMS broker. + * @param password The (optional) password to connect to the JMS broker. + * @return The number of messages moved. + * @throws MBeanException If the MBean fails. 
+ */ + int move(String connectionFactory, String source, String destination, String selector, String username, String password) throws MBeanException; + +} http://git-wip-us.apache.org/repos/asf/karaf/blob/7a84233c/jms/src/main/java/org/apache/karaf/jms/JmsMessage.java ---------------------------------------------------------------------- diff --git a/jms/src/main/java/org/apache/karaf/jms/JmsMessage.java b/jms/src/main/java/org/apache/karaf/jms/JmsMessage.java new file mode 100644 index 0000000..acf13bf --- /dev/null +++ b/jms/src/main/java/org/apache/karaf/jms/JmsMessage.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.jms; + +import javax.jms.*; +import java.io.UnsupportedEncodingException; +import java.util.Date; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; + +/** + * Describe a JMS message in a more human-readable way. 
+ */ +public class JmsMessage { + + private Map<String, Object> properties = new HashMap<>(); + + private String content; + private String charset = "UTF-8"; + private String correlationID; + private String deliveryMode; + private String destination; + private String expiration; + private String messageId; + private int priority; + private boolean redelivered; + private String replyTo; + private String timestamp; + private String type; + + public JmsMessage(Message message) { + try { + initFromMessage(message); + } catch (JMSException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + public void initFromMessage(Message message) throws JMSException { + @SuppressWarnings("unchecked") + Enumeration<String> names = message.getPropertyNames(); + while (names.hasMoreElements()) { + String key = names.nextElement(); + Object value = message.getObjectProperty(key); + properties.put(key, value); + } + + correlationID = message.getJMSCorrelationID(); + if (message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT) { + deliveryMode = "Non Persistent"; + } else { + deliveryMode = "Persistent"; + } + Destination destinationDest = message.getJMSDestination(); + if (destinationDest != null) { + destination = destinationDest.toString(); + } + if (message.getJMSExpiration() > 0) { + expiration = new Date(message.getJMSExpiration()).toString(); + } else { + expiration = "Never"; + } + messageId = message.getJMSMessageID(); + priority = message.getJMSPriority(); + redelivered = message.getJMSRedelivered(); + Destination replyToDest = message.getJMSReplyTo(); + if (replyToDest != null) { + replyTo = replyToDest.toString(); + } + if (message.getJMSTimestamp() > 0) { + timestamp = new Date(message.getJMSTimestamp()).toString(); + } else { + timestamp = ""; + } + type = message.getJMSType(); + content = getMessageContent(message); + } + + + private String getMessageContent(Message message) throws JMSException { + if (message instanceof TextMessage) { + return 
((TextMessage) message).getText(); + } else if (message instanceof BytesMessage) { + BytesMessage bMessage = (BytesMessage) message; + long length = bMessage.getBodyLength(); + byte[] content = new byte[(int) length]; + bMessage.readBytes(content); + try { + return new String(content, charset); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + return ""; + } + + public Map<String, Object> getProperties() { + return properties; + } + + public String getContent() { + return content; + } + + public String getCharset() { + return charset; + } + + public String getCorrelationID() { + return correlationID; + } + + public String getDeliveryMode() { + return deliveryMode; + } + + public String getDestination() { + return destination; + } + + public String getExpiration() { + return expiration; + } + + public String getMessageId() { + return messageId; + } + + public int getPriority() { + return priority; + } + + public boolean isRedelivered() { + return redelivered; + } + + public String getReplyTo() { + return replyTo; + } + + public String getTimestamp() { + return timestamp; + } + + public String getType() { + return type; + } + +} \ No newline at end of file
