http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java index 478417d..747b72a 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java @@ -18,154 +18,155 @@ import org.hornetq.api.jms.JMSFactoryType; import org.hornetq.core.journal.EncodingSupport; /** - * A ConnectionFactoryConfiguration for {@link ConnectionFactory} objects. + * A ConnectionFactoryConfiguration for {@link javax.jms.ConnectionFactory} objects. * * @author <a href="mailto:[email protected]">Jeff Mesnil</a> */ public interface ConnectionFactoryConfiguration extends EncodingSupport { - boolean isPersisted(); String getName(); + ConnectionFactoryConfiguration setName(String name); + String[] getBindings(); - void setBindings(String[] bindings); + ConnectionFactoryConfiguration setBindings(String... bindings); String getDiscoveryGroupName(); - void setDiscoveryGroupName(String discoveryGroupName); + ConnectionFactoryConfiguration setDiscoveryGroupName(String discoveryGroupName); List<String> getConnectorNames(); - void setConnectorNames(List<String> connectorNames); + ConnectionFactoryConfiguration setConnectorNames(List<String> connectorNames); boolean isHA(); - void setHA(boolean ha); + ConnectionFactoryConfiguration setHA(boolean ha); String getClientID(); - void setClientID(String clientID); + ConnectionFactoryConfiguration setClientID(String clientID); long getClientFailureCheckPeriod(); - void setClientFailureCheckPeriod(long clientFailureCheckPeriod); + ConnectionFactoryConfiguration setClientFailureCheckPeriod(long clientFailureCheckPeriod); long getConnectionTTL(); - void setConnectionTTL(long connectionTTL); + ConnectionFactoryConfiguration setConnectionTTL(long connectionTTL); long getCallTimeout(); - void setCallTimeout(long callTimeout); + ConnectionFactoryConfiguration setCallTimeout(long callTimeout); long getCallFailoverTimeout(); - void setCallFailoverTimeout(long callFailoverTimeout); + ConnectionFactoryConfiguration setCallFailoverTimeout(long callFailoverTimeout); boolean isCacheLargeMessagesClient(); - void setCacheLargeMessagesClient(boolean cacheLargeMessagesClient); + ConnectionFactoryConfiguration setCacheLargeMessagesClient(boolean cacheLargeMessagesClient); int getMinLargeMessageSize(); - void setMinLargeMessageSize(int minLargeMessageSize); + ConnectionFactoryConfiguration setMinLargeMessageSize(int minLargeMessageSize); boolean isCompressLargeMessages(); - void setCompressLargeMessages(boolean avoidLargeMessages); + ConnectionFactoryConfiguration setCompressLargeMessages(boolean avoidLargeMessages); int getConsumerWindowSize(); - void setConsumerWindowSize(int consumerWindowSize); + ConnectionFactoryConfiguration setConsumerWindowSize(int consumerWindowSize); int getConsumerMaxRate(); - void setConsumerMaxRate(int consumerMaxRate); + ConnectionFactoryConfiguration setConsumerMaxRate(int consumerMaxRate); int getConfirmationWindowSize(); - void setConfirmationWindowSize(int confirmationWindowSize); + ConnectionFactoryConfiguration setConfirmationWindowSize(int confirmationWindowSize); int getProducerWindowSize(); - void setProducerWindowSize(int producerWindowSize); + ConnectionFactoryConfiguration setProducerWindowSize(int producerWindowSize); int getProducerMaxRate(); - void setProducerMaxRate(int producerMaxRate); + ConnectionFactoryConfiguration setProducerMaxRate(int producerMaxRate); boolean isBlockOnAcknowledge(); - void setBlockOnAcknowledge(boolean blockOnAcknowledge); + ConnectionFactoryConfiguration setBlockOnAcknowledge(boolean blockOnAcknowledge); boolean isBlockOnDurableSend(); - void setBlockOnDurableSend(boolean blockOnDurableSend); + ConnectionFactoryConfiguration setBlockOnDurableSend(boolean blockOnDurableSend); boolean isBlockOnNonDurableSend(); - void setBlockOnNonDurableSend(boolean blockOnNonDurableSend); + ConnectionFactoryConfiguration setBlockOnNonDurableSend(boolean blockOnNonDurableSend); boolean isAutoGroup(); - void setAutoGroup(boolean autoGroup); + ConnectionFactoryConfiguration setAutoGroup(boolean autoGroup); boolean isPreAcknowledge(); - void setPreAcknowledge(boolean preAcknowledge); + ConnectionFactoryConfiguration setPreAcknowledge(boolean preAcknowledge); String getLoadBalancingPolicyClassName(); - void setLoadBalancingPolicyClassName(String loadBalancingPolicyClassName); + ConnectionFactoryConfiguration setLoadBalancingPolicyClassName(String loadBalancingPolicyClassName); int getTransactionBatchSize(); - void setTransactionBatchSize(int transactionBatchSize); + ConnectionFactoryConfiguration setTransactionBatchSize(int transactionBatchSize); int getDupsOKBatchSize(); - void setDupsOKBatchSize(int dupsOKBatchSize); + ConnectionFactoryConfiguration setDupsOKBatchSize(int dupsOKBatchSize); boolean isUseGlobalPools(); - void setUseGlobalPools(boolean useGlobalPools); + ConnectionFactoryConfiguration setUseGlobalPools(boolean useGlobalPools); int getScheduledThreadPoolMaxSize(); - void setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize); + ConnectionFactoryConfiguration setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize); int getThreadPoolMaxSize(); - void setThreadPoolMaxSize(int threadPoolMaxSize); + ConnectionFactoryConfiguration setThreadPoolMaxSize(int threadPoolMaxSize); long getRetryInterval(); - void setRetryInterval(long retryInterval); + ConnectionFactoryConfiguration setRetryInterval(long retryInterval); double getRetryIntervalMultiplier(); - void setRetryIntervalMultiplier(double retryIntervalMultiplier); + ConnectionFactoryConfiguration setRetryIntervalMultiplier(double retryIntervalMultiplier); long getMaxRetryInterval(); - void setMaxRetryInterval(long maxRetryInterval); + ConnectionFactoryConfiguration setMaxRetryInterval(long maxRetryInterval); int getReconnectAttempts(); - void setReconnectAttempts(int reconnectAttempts); + ConnectionFactoryConfiguration setReconnectAttempts(int reconnectAttempts); boolean isFailoverOnInitialConnection(); - void setFailoverOnInitialConnection(boolean failover); + ConnectionFactoryConfiguration setFailoverOnInitialConnection(boolean failover); String getGroupID(); - void setGroupID(String groupID); + ConnectionFactoryConfiguration setGroupID(String groupID); - void setFactoryType(JMSFactoryType factType); + ConnectionFactoryConfiguration setFactoryType(JMSFactoryType factType); JMSFactoryType getFactoryType(); }
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/JMSConfiguration.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/JMSConfiguration.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/JMSConfiguration.java index 06ba294..a7f332a 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/JMSConfiguration.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/JMSConfiguration.java @@ -25,15 +25,23 @@ import javax.naming.Context; */ public interface JMSConfiguration { - void setContext(Context context); + JMSConfiguration setContext(Context context); Context getContext(); List<JMSQueueConfiguration> getQueueConfigurations(); + JMSConfiguration setQueueConfigurations(List<JMSQueueConfiguration> queueConfigurations); + List<TopicConfiguration> getTopicConfigurations(); + JMSConfiguration setTopicConfigurations(List<TopicConfiguration> topicConfigurations); + List<ConnectionFactoryConfiguration> getConnectionFactoryConfigurations(); + JMSConfiguration setConnectionFactoryConfigurations(List<ConnectionFactoryConfiguration> connectionFactoryConfigurations); + String getDomain(); + + JMSConfiguration setDomain(String domain); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/JMSQueueConfiguration.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/JMSQueueConfiguration.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/JMSQueueConfiguration.java index 8d4579b..1f810b1 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/JMSQueueConfiguration.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/JMSQueueConfiguration.java @@ -23,9 +23,17 @@ public interface JMSQueueConfiguration { String getName(); + JMSQueueConfiguration setName(String name); + String getSelector(); + JMSQueueConfiguration setSelector(String selector); + boolean isDurable(); + JMSQueueConfiguration setDurable(boolean durable); + String[] getBindings(); + + JMSQueueConfiguration setBindings(String[] bindings); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/TopicConfiguration.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/TopicConfiguration.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/TopicConfiguration.java index 93ea4de..0675262 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/TopicConfiguration.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/TopicConfiguration.java @@ -21,5 +21,9 @@ public interface TopicConfiguration { String getName(); + TopicConfiguration setName(String name); + String[] getBindings(); + + TopicConfiguration setBindings(String... bindings); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java index 9aef0d7..08fc42f 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java @@ -38,15 +38,15 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf // Attributes ---------------------------------------------------- - private String name; + private String name = null; - private boolean persisted; + private boolean persisted = false; - private String[] bindings; + private String[] bindings = null; - private List<String> connectorNames; + private List<String> connectorNames = null; - private String discoveryGroupName; + private String discoveryGroupName = null; private String clientID = null; @@ -118,30 +118,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf // Constructors -------------------------------------------------- - /** - * To be used on persistence only - */ public ConnectionFactoryConfigurationImpl() { } - public ConnectionFactoryConfigurationImpl(final String name, - final boolean ha, - final List<String> connectorNames, - final String... bindings) - { - this(name, ha, bindings); - this.connectorNames = connectorNames; - } - - public ConnectionFactoryConfigurationImpl(final String name, final boolean ha, final String... bindings) - { - this.name = name; - this.ha = ha; - this.bindings = new String[bindings.length]; - System.arraycopy(bindings, 0, this.bindings, 0, bindings.length); - } - // ConnectionFactoryConfiguration implementation ----------------- public String[] getBindings() @@ -149,9 +129,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return bindings; } - public void setBindings(final String[] bindings) + public ConnectionFactoryConfiguration setBindings(final String... bindings) { this.bindings = bindings; + return this; } public String getName() @@ -159,6 +140,12 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return name; } + public ConnectionFactoryConfiguration setName(String name) + { + this.name = name; + return this; + } + public boolean isPersisted() { return persisted; @@ -175,9 +162,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf /** * @param discoveryGroupName the discoveryGroupName to set */ - public void setDiscoveryGroupName(String discoveryGroupName) + public ConnectionFactoryConfiguration setDiscoveryGroupName(String discoveryGroupName) { this.discoveryGroupName = discoveryGroupName; + return this; } public List<String> getConnectorNames() @@ -185,9 +173,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return connectorNames; } - public void setConnectorNames(final List<String> connectorNames) + public ConnectionFactoryConfiguration setConnectorNames(final List<String> connectorNames) { this.connectorNames = connectorNames; + return this; } public boolean isHA() @@ -195,9 +184,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return ha; } - public void setHA(final boolean ha) + public ConnectionFactoryConfiguration setHA(final boolean ha) { this.ha = ha; + return this; } public String getClientID() @@ -205,9 +195,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return clientID; } - public void setClientID(final String clientID) + public ConnectionFactoryConfiguration setClientID(final String clientID) { this.clientID = clientID; + return this; } public long getClientFailureCheckPeriod() @@ -215,9 +206,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return clientFailureCheckPeriod; } - public void setClientFailureCheckPeriod(final long clientFailureCheckPeriod) + public ConnectionFactoryConfiguration setClientFailureCheckPeriod(final long clientFailureCheckPeriod) { this.clientFailureCheckPeriod = clientFailureCheckPeriod; + return this; } public long getConnectionTTL() @@ -225,9 +217,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return connectionTTL; } - public void setConnectionTTL(final long connectionTTL) + public ConnectionFactoryConfiguration setConnectionTTL(final long connectionTTL) { this.connectionTTL = connectionTTL; + return this; } public long getCallTimeout() @@ -235,9 +228,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return callTimeout; } - public void setCallTimeout(final long callTimeout) + public ConnectionFactoryConfiguration setCallTimeout(final long callTimeout) { this.callTimeout = callTimeout; + return this; } public long getCallFailoverTimeout() @@ -245,9 +239,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return callFailoverTimeout; } - public void setCallFailoverTimeout(long callFailoverTimeout) + public ConnectionFactoryConfiguration setCallFailoverTimeout(long callFailoverTimeout) { this.callFailoverTimeout = callFailoverTimeout; + return this; } public boolean isCacheLargeMessagesClient() @@ -255,9 +250,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return cacheLargeMessagesClient; } - public void setCacheLargeMessagesClient(final boolean cacheLargeMessagesClient) + public ConnectionFactoryConfiguration setCacheLargeMessagesClient(final boolean cacheLargeMessagesClient) { this.cacheLargeMessagesClient = cacheLargeMessagesClient; + return this; } public int getMinLargeMessageSize() @@ -265,9 +261,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return minLargeMessageSize; } - public void setMinLargeMessageSize(final int minLargeMessageSize) + public ConnectionFactoryConfiguration setMinLargeMessageSize(final int minLargeMessageSize) { this.minLargeMessageSize = minLargeMessageSize; + return this; } public int getConsumerWindowSize() @@ -275,9 +272,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return consumerWindowSize; } - public void setConsumerWindowSize(final int consumerWindowSize) + public ConnectionFactoryConfiguration setConsumerWindowSize(final int consumerWindowSize) { this.consumerWindowSize = consumerWindowSize; + return this; } public int getConsumerMaxRate() @@ -285,9 +283,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return consumerMaxRate; } - public void setConsumerMaxRate(final int consumerMaxRate) + public ConnectionFactoryConfiguration setConsumerMaxRate(final int consumerMaxRate) { this.consumerMaxRate = consumerMaxRate; + return this; } public int getConfirmationWindowSize() @@ -295,9 +294,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return confirmationWindowSize; } - public void setConfirmationWindowSize(final int confirmationWindowSize) + public ConnectionFactoryConfiguration setConfirmationWindowSize(final int confirmationWindowSize) { this.confirmationWindowSize = confirmationWindowSize; + return this; } public int getProducerMaxRate() @@ -305,9 +305,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return producerMaxRate; } - public void setProducerMaxRate(final int producerMaxRate) + public ConnectionFactoryConfiguration setProducerMaxRate(final int producerMaxRate) { this.producerMaxRate = producerMaxRate; + return this; } public int getProducerWindowSize() @@ -315,9 +316,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return producerWindowSize; } - public void setProducerWindowSize(final int producerWindowSize) + public ConnectionFactoryConfiguration setProducerWindowSize(final int producerWindowSize) { this.producerWindowSize = producerWindowSize; + return this; } public boolean isBlockOnAcknowledge() @@ -325,9 +327,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return blockOnAcknowledge; } - public void setBlockOnAcknowledge(final boolean blockOnAcknowledge) + public ConnectionFactoryConfiguration setBlockOnAcknowledge(final boolean blockOnAcknowledge) { this.blockOnAcknowledge = blockOnAcknowledge; + return this; } public boolean isBlockOnDurableSend() @@ -335,9 +338,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return blockOnDurableSend; } - public void setBlockOnDurableSend(final boolean blockOnDurableSend) + public ConnectionFactoryConfiguration setBlockOnDurableSend(final boolean blockOnDurableSend) { this.blockOnDurableSend = blockOnDurableSend; + return this; } public boolean isBlockOnNonDurableSend() @@ -345,9 +349,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return blockOnNonDurableSend; } - public void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend) + public ConnectionFactoryConfiguration setBlockOnNonDurableSend(final boolean blockOnNonDurableSend) { this.blockOnNonDurableSend = blockOnNonDurableSend; + return this; } public boolean isAutoGroup() @@ -355,9 +360,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return autoGroup; } - public void setAutoGroup(final boolean autoGroup) + public ConnectionFactoryConfiguration setAutoGroup(final boolean autoGroup) { this.autoGroup = autoGroup; + return this; } public boolean isPreAcknowledge() @@ -365,9 +371,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return preAcknowledge; } - public void setPreAcknowledge(final boolean preAcknowledge) + public ConnectionFactoryConfiguration setPreAcknowledge(final boolean preAcknowledge) { this.preAcknowledge = preAcknowledge; + return this; } public String getLoadBalancingPolicyClassName() @@ -375,9 +382,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return loadBalancingPolicyClassName; } - public void setLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName) + public ConnectionFactoryConfiguration setLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName) { this.loadBalancingPolicyClassName = loadBalancingPolicyClassName; + return this; } public int getTransactionBatchSize() @@ -385,9 +393,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return transactionBatchSize; } - public void setTransactionBatchSize(final int transactionBatchSize) + public ConnectionFactoryConfiguration setTransactionBatchSize(final int transactionBatchSize) { this.transactionBatchSize = transactionBatchSize; + return this; } public int getDupsOKBatchSize() @@ -395,9 +404,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return dupsOKBatchSize; } - public void setDupsOKBatchSize(final int dupsOKBatchSize) + public ConnectionFactoryConfiguration setDupsOKBatchSize(final int dupsOKBatchSize) { this.dupsOKBatchSize = dupsOKBatchSize; + return this; } public long getInitialWaitTimeout() @@ -405,9 +415,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return initialWaitTimeout; } - public void setInitialWaitTimeout(final long initialWaitTimeout) + public ConnectionFactoryConfiguration setInitialWaitTimeout(final long initialWaitTimeout) { this.initialWaitTimeout = initialWaitTimeout; + return this; } public boolean isUseGlobalPools() @@ -415,9 +426,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return useGlobalPools; } - public void setUseGlobalPools(final boolean useGlobalPools) + public ConnectionFactoryConfiguration setUseGlobalPools(final boolean useGlobalPools) { this.useGlobalPools = useGlobalPools; + return this; } public int getScheduledThreadPoolMaxSize() @@ -425,9 +437,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return scheduledThreadPoolMaxSize; } - public void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize) + public ConnectionFactoryConfiguration setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize) { this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize; + return this; } public int getThreadPoolMaxSize() @@ -435,9 +448,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return threadPoolMaxSize; } - public void setThreadPoolMaxSize(final int threadPoolMaxSize) + public ConnectionFactoryConfiguration setThreadPoolMaxSize(final int threadPoolMaxSize) { this.threadPoolMaxSize = threadPoolMaxSize; + return this; } public long getRetryInterval() @@ -445,9 +459,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return retryInterval; } - public void setRetryInterval(final long retryInterval) + public ConnectionFactoryConfiguration setRetryInterval(final long retryInterval) { this.retryInterval = retryInterval; + return this; } public double getRetryIntervalMultiplier() @@ -455,9 +470,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return retryIntervalMultiplier; } - public void setRetryIntervalMultiplier(final double retryIntervalMultiplier) + public ConnectionFactoryConfiguration setRetryIntervalMultiplier(final double retryIntervalMultiplier) { this.retryIntervalMultiplier = retryIntervalMultiplier; + return this; } public long getMaxRetryInterval() @@ -465,9 +481,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return maxRetryInterval; } - public void setMaxRetryInterval(final long maxRetryInterval) + public ConnectionFactoryConfiguration setMaxRetryInterval(final long maxRetryInterval) { this.maxRetryInterval = maxRetryInterval; + return this; } public int getReconnectAttempts() @@ -475,9 +492,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return reconnectAttempts; } - public void setReconnectAttempts(final int reconnectAttempts) + public ConnectionFactoryConfiguration setReconnectAttempts(final int reconnectAttempts) { this.reconnectAttempts = reconnectAttempts; + return this; } public boolean isFailoverOnInitialConnection() @@ -485,9 +503,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return failoverOnInitialConnection; } - public void setFailoverOnInitialConnection(final boolean failover) + public ConnectionFactoryConfiguration setFailoverOnInitialConnection(final boolean failover) { failoverOnInitialConnection = failover; + return this; } public String getGroupID() @@ -495,9 +514,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return groupID; } - public void setGroupID(final String groupID) + public ConnectionFactoryConfiguration setGroupID(final String groupID) { this.groupID = groupID; + return this; } // Encoding Support Implementation -------------------------------------------------------------- @@ -790,9 +810,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf return size; } - public void setFactoryType(final JMSFactoryType factoryType) + public ConnectionFactoryConfiguration setFactoryType(final JMSFactoryType factoryType) { this.factoryType = factoryType; + return this; } public JMSFactoryType getFactoryType() @@ -801,9 +822,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf } @Override - public void setCompressLargeMessages(boolean compressLargeMessage) + public ConnectionFactoryConfiguration setCompressLargeMessages(boolean compressLargeMessage) { this.compressLargeMessage = compressLargeMessage; + return this; } @Override http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/JMSConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/JMSConfigurationImpl.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/JMSConfigurationImpl.java index 002cdd7..eaa8761 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/JMSConfigurationImpl.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/JMSConfigurationImpl.java @@ -30,38 +30,31 @@ import org.hornetq.jms.server.config.TopicConfiguration; */ public class JMSConfigurationImpl implements JMSConfiguration { + private List<ConnectionFactoryConfiguration> connectionFactoryConfigurations = new ArrayList<ConnectionFactoryConfiguration>(); - private final List<ConnectionFactoryConfiguration> connectionFactoryConfigurations = new ArrayList<ConnectionFactoryConfiguration>(); + private List<JMSQueueConfiguration> queueConfigurations = new ArrayList<JMSQueueConfiguration>(); - private final List<JMSQueueConfiguration> queueConfigurations = new ArrayList<JMSQueueConfiguration>(); + private List<TopicConfiguration> topicConfigurations = new ArrayList<TopicConfiguration>(); - private final List<TopicConfiguration> topicConfigurations = new ArrayList<TopicConfiguration>(); - - private final String domain; + private String domain = HornetQDefaultConfiguration.getDefaultJmxDomain(); private Context context = null; + // JMSConfiguration implementation ------------------------------- + public JMSConfigurationImpl() { - domain = HornetQDefaultConfiguration.getDefaultJmxDomain(); } - public JMSConfigurationImpl(final List<ConnectionFactoryConfiguration> connectionFactoryConfigurations, - final List<JMSQueueConfiguration> queueConfigurations, - final List<TopicConfiguration> topicConfigurations, - final String domain) + public List<ConnectionFactoryConfiguration> getConnectionFactoryConfigurations() { - this.connectionFactoryConfigurations.addAll(connectionFactoryConfigurations); - this.queueConfigurations.addAll(queueConfigurations); - this.topicConfigurations.addAll(topicConfigurations); - this.domain = domain != null ? domain : HornetQDefaultConfiguration.getDefaultJmxDomain(); + return connectionFactoryConfigurations; } - // JMSConfiguration implementation ------------------------------- - - public List<ConnectionFactoryConfiguration> getConnectionFactoryConfigurations() + public JMSConfigurationImpl setConnectionFactoryConfigurations(List<ConnectionFactoryConfiguration> connectionFactoryConfigurations) { - return connectionFactoryConfigurations; + this.connectionFactoryConfigurations = connectionFactoryConfigurations; + return this; } public List<JMSQueueConfiguration> getQueueConfigurations() @@ -69,23 +62,42 @@ public class JMSConfigurationImpl implements JMSConfiguration return queueConfigurations; } + public JMSConfigurationImpl setQueueConfigurations(List<JMSQueueConfiguration> queueConfigurations) + { + this.queueConfigurations = queueConfigurations; + return this; + } + public List<TopicConfiguration> getTopicConfigurations() { return topicConfigurations; } + public JMSConfigurationImpl setTopicConfigurations(List<TopicConfiguration> topicConfigurations) + { + this.topicConfigurations = topicConfigurations; + return this; + } + public Context getContext() { return context; } - public void setContext(final Context context) + public JMSConfigurationImpl setContext(final Context context) { this.context = context; + return this; } public String getDomain() { return domain; } + + public JMSConfigurationImpl setDomain(final String domain) + { + this.domain = domain; + return this; + } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/JMSQueueConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/JMSQueueConfigurationImpl.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/JMSQueueConfigurationImpl.java index f3ced90..2fccc6f 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/JMSQueueConfigurationImpl.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/JMSQueueConfigurationImpl.java @@ -29,28 +29,20 @@ public class JMSQueueConfigurationImpl implements JMSQueueConfiguration // Attributes ---------------------------------------------------- - private final String name; + private String name = null; - private final String selector; + private String selector = null; - private final boolean durable; + private boolean durable = true; - private final String[] bindings; + private String[] bindings = null; // Static -------------------------------------------------------- // Constructors -------------------------------------------------- - public JMSQueueConfigurationImpl(final String name, - final String selector, - final boolean durable, - final String... bindings) + public JMSQueueConfigurationImpl() { - this.name = name; - this.selector = selector; - this.durable = durable; - this.bindings = new String[bindings.length]; - System.arraycopy(bindings, 0, this.bindings, 0, bindings.length); } // QueueConfiguration implementation ----------------------------- @@ -60,21 +52,45 @@ public class JMSQueueConfigurationImpl implements JMSQueueConfiguration return bindings; } + public JMSQueueConfigurationImpl setBindings(String... bindings) + { + this.bindings = bindings; + return this; + } + public String getName() { return name; } + public JMSQueueConfigurationImpl setName(String name) + { + this.name = name; + return this; + } + public String getSelector() { return selector; } + public JMSQueueConfigurationImpl setSelector(String selector) + { + this.selector = selector; + return this; + } + public boolean isDurable() { return durable; } + public JMSQueueConfigurationImpl setDurable(boolean durable) + { + this.durable = durable; + return this; + } + // Public -------------------------------------------------------- // Package protected --------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/TopicConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/TopicConfigurationImpl.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/TopicConfigurationImpl.java index b6a3727..e105437 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/TopicConfigurationImpl.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/config/impl/TopicConfigurationImpl.java @@ -29,19 +29,16 @@ public class TopicConfigurationImpl implements TopicConfiguration // Attributes ---------------------------------------------------- - private final String name; + private String name; - private final String[] bindings; + private String[] bindings; // Static -------------------------------------------------------- // Constructors -------------------------------------------------- - public TopicConfigurationImpl(final String name, final String... bindings) + public TopicConfigurationImpl() { - this.name = name; - this.bindings = new String[bindings.length]; - System.arraycopy(bindings, 0, this.bindings, 0, bindings.length); } // TopicConfiguration implementation ----------------------------- @@ -51,11 +48,23 @@ public class TopicConfigurationImpl implements TopicConfiguration return bindings; } + public TopicConfigurationImpl setBindings(String... bindings) + { + this.bindings = bindings; + return this; + } + public String getName() { return name; } + public TopicConfigurationImpl setName(String name) + { + this.name = name; + return this; + } + // Public -------------------------------------------------------- // Package protected --------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java index 5380bdc..c0b9583 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java @@ -369,10 +369,8 @@ public final class JMSServerConfigParserImpl implements JMSServerConfigParser if (discoveryGroupName != null) { - cfConfig = new ConnectionFactoryConfigurationImpl(name, - ha, - strbindings); - cfConfig.setDiscoveryGroupName(discoveryGroupName); + cfConfig = new ConnectionFactoryConfigurationImpl() + .setDiscoveryGroupName(discoveryGroupName); } else { @@ -381,40 +379,46 @@ public final class JMSServerConfigParserImpl implements JMSServerConfigParser { connectors.add(connectorName); } - cfConfig = new ConnectionFactoryConfigurationImpl(name, ha, connectors, strbindings); + cfConfig = new ConnectionFactoryConfigurationImpl() + .setConnectorNames(connectors); } - cfConfig.setFactoryType(factType); - cfConfig.setClientID(clientID); - cfConfig.setClientFailureCheckPeriod(clientFailureCheckPeriod); - cfConfig.setConnectionTTL(connectionTTL); - cfConfig.setCallTimeout(callTimeout); - cfConfig.setCallFailoverTimeout(callFailoverTimeout); - cfConfig.setCacheLargeMessagesClient(cacheLargeMessagesClient); - cfConfig.setMinLargeMessageSize(minLargeMessageSize); - cfConfig.setCompressLargeMessages(compressLargeMessages); - cfConfig.setConsumerWindowSize(consumerWindowSize); - cfConfig.setConsumerMaxRate(consumerMaxRate); - cfConfig.setConfirmationWindowSize(confirmationWindowSize); - cfConfig.setProducerWindowSize(producerWindowSize); - cfConfig.setProducerMaxRate(producerMaxRate); - cfConfig.setBlockOnAcknowledge(blockOnAcknowledge); - cfConfig.setBlockOnDurableSend(blockOnDurableSend); - cfConfig.setBlockOnNonDurableSend(blockOnNonDurableSend); - cfConfig.setAutoGroup(autoGroup); - cfConfig.setPreAcknowledge(preAcknowledge); - cfConfig.setLoadBalancingPolicyClassName(connectionLoadBalancingPolicyClassName); - cfConfig.setTransactionBatchSize(transactionBatchSize); - cfConfig.setDupsOKBatchSize(dupsOKBatchSize); - cfConfig.setUseGlobalPools(useGlobalPools); - cfConfig.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize); - cfConfig.setThreadPoolMaxSize(threadPoolMaxSize); - cfConfig.setRetryInterval(retryInterval); - cfConfig.setRetryIntervalMultiplier(retryIntervalMultiplier); - cfConfig.setMaxRetryInterval(maxRetryInterval); - cfConfig.setReconnectAttempts(reconnectAttempts); - cfConfig.setFailoverOnInitialConnection(failoverOnInitialConnection); - cfConfig.setGroupID(groupid); + cfConfig + .setName(name) + .setHA(ha) + .setBindings(strbindings) + .setFactoryType(factType) + .setClientID(clientID) + .setClientFailureCheckPeriod(clientFailureCheckPeriod) + .setConnectionTTL(connectionTTL) + .setCallTimeout(callTimeout) + .setCallFailoverTimeout(callFailoverTimeout) + .setCacheLargeMessagesClient(cacheLargeMessagesClient) + .setMinLargeMessageSize(minLargeMessageSize) + .setCompressLargeMessages(compressLargeMessages) + .setConsumerWindowSize(consumerWindowSize) + .setConsumerMaxRate(consumerMaxRate) + .setConfirmationWindowSize(confirmationWindowSize) + .setProducerWindowSize(producerWindowSize) + .setProducerMaxRate(producerMaxRate) + .setBlockOnAcknowledge(blockOnAcknowledge) + .setBlockOnDurableSend(blockOnDurableSend) + .setBlockOnNonDurableSend(blockOnNonDurableSend) + .setAutoGroup(autoGroup) + .setPreAcknowledge(preAcknowledge) + .setLoadBalancingPolicyClassName(connectionLoadBalancingPolicyClassName) + .setTransactionBatchSize(transactionBatchSize) + .setDupsOKBatchSize(dupsOKBatchSize) + .setUseGlobalPools(useGlobalPools) + .setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize) + .setThreadPoolMaxSize(threadPoolMaxSize) + .setRetryInterval(retryInterval) + .setRetryIntervalMultiplier(retryIntervalMultiplier) + .setMaxRetryInterval(maxRetryInterval) + .setReconnectAttempts(reconnectAttempts) + .setFailoverOnInitialConnection(failoverOnInitialConnection) + .setGroupID(groupid); + return cfConfig; } @@ -466,7 +470,9 @@ public final class JMSServerConfigParserImpl implements JMSServerConfigParser */ protected TopicConfiguration newTopic(final String topicName, final String[] strBindings) { - return new TopicConfigurationImpl(topicName, strBindings); + return new TopicConfigurationImpl() + .setName(topicName) + .setBindings(strBindings); } /** @@ -483,7 +489,11 @@ public final class JMSServerConfigParserImpl implements JMSServerConfigParser final boolean durable, final String[] jndiArray) { - return new JMSQueueConfigurationImpl(queueName, selectorString, durable, jndiArray); + return new JMSQueueConfigurationImpl(). + setName(queueName). + setSelector(selectorString). + setDurable(durable). + setBindings(jndiArray); } /** @@ -499,7 +509,10 @@ public final class JMSServerConfigParserImpl implements JMSServerConfigParser final ArrayList<TopicConfiguration> topics, final ArrayList<ConnectionFactoryConfiguration> cfs, String domain) { - JMSConfiguration value = new JMSConfigurationImpl(cfs, queues, topics, domain); - return value; + return new JMSConfigurationImpl() + .setConnectionFactoryConfigurations(cfs) + .setQueueConfigurations(queues) + .setTopicConfigurations(topics) + .setDomain(domain); } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/impl/JMSServerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/impl/JMSServerManagerImpl.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/impl/JMSServerManagerImpl.java index 592a31a..08c6a27 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/impl/JMSServerManagerImpl.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/impl/JMSServerManagerImpl.java @@ -16,6 +16,7 @@ import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import javax.transaction.xa.Xid; + import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -52,6 +53,7 @@ import org.hornetq.core.server.ActivateCallback; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.Queue; import org.hornetq.core.server.impl.HornetQServerImpl; +import org.hornetq.core.server.management.Notification; import org.hornetq.core.settings.impl.AddressSettings; import org.hornetq.core.transaction.ResourceManager; import org.hornetq.core.transaction.Transaction; @@ -77,10 +79,12 @@ import org.hornetq.jms.server.config.JMSQueueConfiguration; import org.hornetq.jms.server.config.TopicConfiguration; import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl; import org.hornetq.jms.server.management.JMSManagementService; +import org.hornetq.jms.server.management.JMSNotificationType; import org.hornetq.jms.server.management.impl.JMSManagementServiceImpl; import org.hornetq.jms.transaction.JMSTransactionDetail; import org.hornetq.spi.core.naming.BindingRegistry; import org.hornetq.utils.TimeAndCounterIDGenerator; +import org.hornetq.utils.TypedProperties; import org.hornetq.utils.json.JSONArray; import org.hornetq.utils.json.JSONObject; @@ -338,6 +342,12 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } } + @Override + public void activationComplete() + { + + } + public void recoverJndiBindings(String name, PersistedType type) throws NamingException { List<String> bindings = unRecoveredJndi.get(name); @@ -628,6 +638,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } }); + sendNotification(JMSNotificationType.QUEUE_CREATED, queueName); return true; } @@ -683,6 +694,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } }); + sendNotification(JMSNotificationType.TOPIC_CREATED, topicName); return true; } @@ -901,18 +913,28 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback { checkInitialised(); - removeFromJNDI(queues, queueJNDI, name); + server.destroyQueue(HornetQDestination.createQueueAddressFromName(name), null, !removeConsumers, removeConsumers); - queues.remove(name); - queueJNDI.remove(name); + // if the queue has consumers and 'removeConsumers' is false then the queue won't actually be removed + // therefore only remove the queue from JNDI, etc. if the queue is actually removed + if (this.server.getPostOffice().getBinding(HornetQDestination.createQueueAddressFromName(name)) == null) + { + removeFromJNDI(queues, queueJNDI, name); - jmsManagementService.unregisterQueue(name); + queues.remove(name); + queueJNDI.remove(name); - server.destroyQueue(HornetQDestination.createQueueAddressFromName(name), null, false, removeConsumers); + jmsManagementService.unregisterQueue(name); - storage.deleteDestination(PersistedType.Queue, name); + storage.deleteDestination(PersistedType.Queue, name); - return true; + sendNotification(JMSNotificationType.QUEUE_DESTROYED, name); + return true; + } + else + { + return false; + } } public synchronized boolean destroyTopic(final String name) throws Exception @@ -923,14 +945,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback public synchronized boolean destroyTopic(final String name, final boolean removeConsumers) throws Exception { checkInitialised(); - - removeFromJNDI(topics, topicJNDI, name); - - topics.remove(name); - topicJNDI.remove(name); - - jmsManagementService.unregisterTopic(name); - AddressControl addressControl = (AddressControl) server.getManagementService() .getResource(ResourceNames.CORE_ADDRESS + HornetQDestination.createTopicAddressFromName(name)); if (addressControl != null) @@ -950,9 +964,30 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback server.destroyQueue(SimpleString.toSimpleString(queueName), null, !removeConsumers, removeConsumers); } } + + if (addressControl.getQueueNames().length == 0) + { + removeFromJNDI(topics, topicJNDI, name); + + topics.remove(name); + topicJNDI.remove(name); + + jmsManagementService.unregisterTopic(name); + + storage.deleteDestination(PersistedType.Topic, name); + + sendNotification(JMSNotificationType.TOPIC_DESTROYED, name); + return true; + } + else + { + return false; + } + } + else + { + return false; } - storage.deleteDestination(PersistedType.Topic, name); - return true; } public synchronized void createConnectionFactory(final String name, @@ -965,8 +1000,12 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback HornetQConnectionFactory cf = connectionFactories.get(name); if (cf == null) { - ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, ha, connectorNames); - configuration.setFactoryType(cfType); + ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl() + .setName(name) + .setHA(ha) + .setConnectorNames(connectorNames) + .setFactoryType(cfType); + createConnectionFactory(true, configuration, jndiBindings); } } @@ -1011,37 +1050,41 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback HornetQConnectionFactory cf = connectionFactories.get(name); if (cf == null) { - ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, ha, connectorNames); - configuration.setClientID(clientID); - configuration.setClientFailureCheckPeriod(clientFailureCheckPeriod); - configuration.setConnectionTTL(connectionTTL); - configuration.setFactoryType(cfType); - configuration.setCallTimeout(callTimeout); - configuration.setCallFailoverTimeout(callFailoverTimeout); - configuration.setCacheLargeMessagesClient(cacheLargeMessagesClient); - configuration.setMinLargeMessageSize(minLargeMessageSize); - configuration.setConsumerWindowSize(consumerWindowSize); - configuration.setConsumerMaxRate(consumerMaxRate); - configuration.setConfirmationWindowSize(confirmationWindowSize); - configuration.setProducerWindowSize(producerWindowSize); - configuration.setProducerMaxRate(producerMaxRate); - configuration.setBlockOnAcknowledge(blockOnAcknowledge); - configuration.setBlockOnDurableSend(blockOnDurableSend); - configuration.setBlockOnNonDurableSend(blockOnNonDurableSend); - configuration.setAutoGroup(autoGroup); - configuration.setPreAcknowledge(preAcknowledge); - configuration.setLoadBalancingPolicyClassName(loadBalancingPolicyClassName); - configuration.setTransactionBatchSize(transactionBatchSize); - configuration.setDupsOKBatchSize(dupsOKBatchSize); - configuration.setUseGlobalPools(useGlobalPools); - configuration.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize); - configuration.setThreadPoolMaxSize(threadPoolMaxSize); - configuration.setRetryInterval(retryInterval); - configuration.setRetryIntervalMultiplier(retryIntervalMultiplier); - configuration.setMaxRetryInterval(maxRetryInterval); - configuration.setReconnectAttempts(reconnectAttempts); - configuration.setFailoverOnInitialConnection(failoverOnInitialConnection); - configuration.setGroupID(groupId); + ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl() + .setName(name) + .setHA(ha) + .setConnectorNames(connectorNames) + .setClientID(clientID) + .setClientFailureCheckPeriod(clientFailureCheckPeriod) + .setConnectionTTL(connectionTTL) + .setFactoryType(cfType) + .setCallTimeout(callTimeout) + .setCallFailoverTimeout(callFailoverTimeout) + .setCacheLargeMessagesClient(cacheLargeMessagesClient) + .setMinLargeMessageSize(minLargeMessageSize) + .setConsumerWindowSize(consumerWindowSize) + .setConsumerMaxRate(consumerMaxRate) + .setConfirmationWindowSize(confirmationWindowSize) + .setProducerWindowSize(producerWindowSize) + .setProducerMaxRate(producerMaxRate) + .setBlockOnAcknowledge(blockOnAcknowledge) + .setBlockOnDurableSend(blockOnDurableSend) + .setBlockOnNonDurableSend(blockOnNonDurableSend) + .setAutoGroup(autoGroup) + .setPreAcknowledge(preAcknowledge) + .setLoadBalancingPolicyClassName(loadBalancingPolicyClassName) + .setTransactionBatchSize(transactionBatchSize) + .setDupsOKBatchSize(dupsOKBatchSize) + .setUseGlobalPools(useGlobalPools) + .setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize) + .setThreadPoolMaxSize(threadPoolMaxSize) + .setRetryInterval(retryInterval) + .setRetryIntervalMultiplier(retryIntervalMultiplier) + .setMaxRetryInterval(maxRetryInterval) + .setReconnectAttempts(reconnectAttempts) + .setFailoverOnInitialConnection(failoverOnInitialConnection) + .setGroupID(groupId); + createConnectionFactory(true, configuration, jndiBindings); } } @@ -1086,38 +1129,41 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback HornetQConnectionFactory cf = connectionFactories.get(name); if (cf == null) { - ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, ha, jndiBindings); - configuration.setDiscoveryGroupName(discoveryGroupName); - configuration.setFactoryType(cfType); - configuration.setClientID(clientID); - configuration.setClientFailureCheckPeriod(clientFailureCheckPeriod); - configuration.setConnectionTTL(connectionTTL); - configuration.setCallTimeout(callTimeout); - configuration.setCallFailoverTimeout(callFailoverTimeout); - configuration.setCacheLargeMessagesClient(cacheLargeMessagesClient); - configuration.setMinLargeMessageSize(minLargeMessageSize); - configuration.setCompressLargeMessages(compressLargeMessages); - configuration.setConsumerWindowSize(consumerWindowSize); - configuration.setConsumerMaxRate(consumerMaxRate); - configuration.setConfirmationWindowSize(confirmationWindowSize); - configuration.setProducerWindowSize(producerWindowSize); - configuration.setProducerMaxRate(producerMaxRate); - configuration.setBlockOnAcknowledge(blockOnAcknowledge); - configuration.setBlockOnDurableSend(blockOnDurableSend); - configuration.setBlockOnNonDurableSend(blockOnNonDurableSend); - configuration.setAutoGroup(autoGroup); - configuration.setPreAcknowledge(preAcknowledge); - configuration.setLoadBalancingPolicyClassName(loadBalancingPolicyClassName); - configuration.setTransactionBatchSize(transactionBatchSize); - configuration.setDupsOKBatchSize(dupsOKBatchSize); - configuration.setUseGlobalPools(useGlobalPools); - configuration.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize); - configuration.setThreadPoolMaxSize(threadPoolMaxSize); - configuration.setRetryInterval(retryInterval); - configuration.setRetryIntervalMultiplier(retryIntervalMultiplier); - configuration.setMaxRetryInterval(maxRetryInterval); - configuration.setReconnectAttempts(reconnectAttempts); - configuration.setFailoverOnInitialConnection(failoverOnInitialConnection); + ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl() + .setName(name) + .setHA(ha) + .setBindings(jndiBindings) + .setDiscoveryGroupName(discoveryGroupName) + .setFactoryType(cfType) + .setClientID(clientID) + .setClientFailureCheckPeriod(clientFailureCheckPeriod) + .setConnectionTTL(connectionTTL) + .setCallTimeout(callTimeout) + .setCallFailoverTimeout(callFailoverTimeout) + .setCacheLargeMessagesClient(cacheLargeMessagesClient) + .setMinLargeMessageSize(minLargeMessageSize) + .setCompressLargeMessages(compressLargeMessages) + .setConsumerWindowSize(consumerWindowSize) + .setConsumerMaxRate(consumerMaxRate) + .setConfirmationWindowSize(confirmationWindowSize) + .setProducerWindowSize(producerWindowSize) + .setProducerMaxRate(producerMaxRate) + .setBlockOnAcknowledge(blockOnAcknowledge) + .setBlockOnDurableSend(blockOnDurableSend) + .setBlockOnNonDurableSend(blockOnNonDurableSend) + .setAutoGroup(autoGroup) + .setPreAcknowledge(preAcknowledge) + .setLoadBalancingPolicyClassName(loadBalancingPolicyClassName) + .setTransactionBatchSize(transactionBatchSize) + .setDupsOKBatchSize(dupsOKBatchSize) + .setUseGlobalPools(useGlobalPools) + .setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize) + .setThreadPoolMaxSize(threadPoolMaxSize) + .setRetryInterval(retryInterval) + .setRetryIntervalMultiplier(retryIntervalMultiplier) + .setMaxRetryInterval(maxRetryInterval) + .setReconnectAttempts(reconnectAttempts) + .setFailoverOnInitialConnection(failoverOnInitialConnection); createConnectionFactory(true, configuration, jndiBindings); } } @@ -1132,8 +1178,11 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback HornetQConnectionFactory cf = connectionFactories.get(name); if (cf == null) { - ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, ha, jndiBindings); - configuration.setDiscoveryGroupName(discoveryGroupName); + ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl() + .setName(name) + .setHA(ha) + .setBindings(jndiBindings) + .setDiscoveryGroupName(discoveryGroupName); createConnectionFactory(true, configuration, jndiBindings); } } @@ -1205,10 +1254,26 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } JMSServerManagerImpl.this.recoverJndiBindings(cfConfig.getName(), PersistedType.ConnectionFactory); + sendNotification(JMSNotificationType.CONNECTION_FACTORY_CREATED, cfConfig.getName()); } }); } + private void sendNotification(JMSNotificationType type, String message) + { + TypedProperties prop = new TypedProperties(); + prop.putSimpleStringProperty(JMSNotificationType.MESSAGE, SimpleString.toSimpleString(message)); + Notification notif = new Notification(null, type, prop); + try + { + server.getManagementService().sendNotification(notif); + } + catch (Exception e) + { + HornetQJMSServerLogger.LOGGER.warn("Failed to send notification : " + notif); + } + } + public JMSStorageManager getJMSStorageManager() { return storage; @@ -1446,6 +1511,11 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback } }); + if (valueReturn.get()) + { + sendNotification(JMSNotificationType.CONNECTION_FACTORY_DESTROYED, name); + } + return valueReturn.get(); } @@ -1457,10 +1527,6 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback { checkInitialised(); List<String> jndiBindings = connectionFactoryJNDI.get(name); - if (jndiBindings == null || jndiBindings.size() == 0) - { - return false; - } if (registry != null) { @@ -1496,6 +1562,18 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback return server.getHornetQServerControl().closeConnectionsForAddress(ipAddress); } + public boolean closeConsumerConnectionsForAddress(final String address) throws Exception + { + checkInitialised(); + return server.getHornetQServerControl().closeConsumerConnectionsForAddress(address); + } + + public boolean closeConnectionsForUser(final String userName) throws Exception + { + checkInitialised(); + return server.getHornetQServerControl().closeConnectionsForUser(userName); + } + public String[] listConnectionIDs() throws Exception { return server.getHornetQServerControl().listConnectionIDs(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/impl/StandaloneNamingServer.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/impl/StandaloneNamingServer.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/impl/StandaloneNamingServer.java index 5f2def5..ad4aa5a 100644 --- a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/impl/StandaloneNamingServer.java +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/impl/StandaloneNamingServer.java @@ -157,5 +157,11 @@ public class StandaloneNamingServer implements HornetQComponent } activated = false; } + + @Override + public void activationComplete() + { + + } } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-server/src/main/java/org/hornetq/jms/server/management/JMSNotificationType.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-server/src/main/java/org/hornetq/jms/server/management/JMSNotificationType.java b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/management/JMSNotificationType.java new file mode 100644 index 0000000..0476265 --- /dev/null +++ b/hornetq-jms-server/src/main/java/org/hornetq/jms/server/management/JMSNotificationType.java @@ -0,0 +1,41 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.jms.server.management; + +import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.management.NotificationType; + +public enum JMSNotificationType implements NotificationType +{ + QUEUE_CREATED(0), + QUEUE_DESTROYED(1), + TOPIC_CREATED(2), + TOPIC_DESTROYED(3), + CONNECTION_FACTORY_CREATED(4), + CONNECTION_FACTORY_DESTROYED(5); + + public static final SimpleString MESSAGE = new SimpleString("message"); + + private int type; + + private JMSNotificationType(int type) + { + this.type = type; + } + + @Override + public int getType() + { + return type; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java b/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java index 8c5851f..97948fb 100644 --- a/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java +++ b/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java @@ -226,9 +226,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor private volatile long bufferReuseLastTime = System.currentTimeMillis(); /** - * This queue is fed by {@link JournalImpl.ReuseBuffersController.LocalBufferCallback}} which is called directly by NIO or NIO. - * On the case of the AIO this is almost called by the native layer as soon as the buffer is not being used any more - * and ready to be reused or GCed + * This queue is fed by {@link org.hornetq.core.journal.impl.AIOSequentialFileFactory.ReuseBuffersController.LocalBufferCallback} + * which is called directly by NIO or NIO. On the case of the AIO this is almost called by the native layer as + * soon as the buffer is not being used any more and ready to be reused or GCed */ private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java ---------------------------------------------------------------------- diff --git a/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java b/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java index dda3f25..7541417 100644 --- a/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java +++ b/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java @@ -24,6 +24,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.hornetq.core.journal.SequentialFile; @@ -79,6 +80,8 @@ public class JournalFilesRepository private final int userVersion; + private final AtomicInteger freeFilesCount = new AtomicInteger(0); + private Executor openFilesExecutor; private final Runnable pushOpenRunnable = new Runnable() @@ -140,6 +143,8 @@ public class JournalFilesRepository freeFiles.clear(); + freeFilesCount.set(0); + for (JournalFile file : openedFiles) { try @@ -207,8 +212,7 @@ public class JournalFilesRepository public void ensureMinFiles() throws Exception { - // FIXME - size() involves a scan - int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size()); + int filesToCreate = minFiles - (dataFiles.size() + freeFilesCount.get()); if (filesToCreate > 0) { @@ -216,6 +220,7 @@ public class JournalFilesRepository { // Keeping all files opened can be very costly (mainly on AIO) freeFiles.add(createFile(false, false, true, false, -1)); + freeFilesCount.getAndIncrement(); } } @@ -368,7 +373,7 @@ public class JournalFilesRepository public int getFreeFilesCount() { - return freeFiles.size(); + return freeFilesCount.get(); } /** @@ -405,8 +410,7 @@ public class JournalFilesRepository file.getFile().delete(); } else - // FIXME - size() involves a scan!!! - if (!checkDelete || (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)) + if (!checkDelete || (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size() < minFiles)) { // Re-initialise it @@ -423,6 +427,7 @@ public class JournalFilesRepository } freeFiles.add(jf); + freeFilesCount.getAndIncrement(); } else { @@ -431,10 +436,10 @@ public class JournalFilesRepository HornetQJournalLogger.LOGGER.trace("DataFiles.size() = " + dataFiles.size()); HornetQJournalLogger.LOGGER.trace("openedFiles.size() = " + openedFiles.size()); HornetQJournalLogger.LOGGER.trace("minfiles = " + minFiles); - HornetQJournalLogger.LOGGER.trace("Free Files = " + freeFiles.size()); + HornetQJournalLogger.LOGGER.trace("Free Files = " + freeFilesCount.get()); HornetQJournalLogger.LOGGER.trace("File " + file + " being deleted as freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() (" + - (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size()) + + (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size()) + ") < minFiles (" + minFiles + ")"); } file.getFile().delete(); @@ -453,7 +458,9 @@ public class JournalFilesRepository public JournalFile getFreeFile() { - return freeFiles.remove(); + JournalFile file = freeFiles.remove(); + freeFilesCount.getAndDecrement(); + return file; } // Opened files operations ======================================= @@ -544,6 +551,11 @@ public class JournalFilesRepository nextFile = freeFiles.poll(); + if (nextFile != null) + { + freeFilesCount.getAndDecrement(); + } + if (nextFile == null) { nextFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension, -1); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/pom.xml b/hornetq-protocols/hornetq-amqp-protocol/pom.xml index 9c6fe46..b1079d3 100644 --- a/hornetq-protocols/hornetq-amqp-protocol/pom.xml +++ b/hornetq-protocols/hornetq-amqp-protocol/pom.xml @@ -14,6 +14,17 @@ </properties> <dependencies> + <!-- JMS Client because of some Convertions that are done --> + <dependency> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-jms-client</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-core-client</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.jboss.logging</groupId> <artifactId>jboss-logging-processor</artifactId> @@ -32,14 +43,34 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-proton-plug</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.qpid</groupId> - <artifactId>proton-api</artifactId> + <artifactId>proton-j</artifactId> </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>proton-jms</artifactId> </dependency> + + + + <dependency> + <groupId>org.jboss.spec.javax.jms</groupId> + <artifactId>jboss-jms-api_2.0_spec</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> -</project> \ No newline at end of file +</project> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/HornetQAMQPProtocolMessageBundle.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/HornetQAMQPProtocolMessageBundle.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/HornetQAMQPProtocolMessageBundle.java deleted file mode 100644 index ed020b7..0000000 --- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/HornetQAMQPProtocolMessageBundle.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.hornetq.core.protocol.proton; - -import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPIllegalStateException; -import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPInternalErrorException; -import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPInvalidFieldException; -import org.jboss.logging.annotations.Message; -import org.jboss.logging.annotations.MessageBundle; -import org.jboss.logging.Messages; - -/** - * Logger Code 11 - * <p> - * Each message id must be 6 digits long starting with 10, the 3rd digit should be 9. So the range - * is from 219000 to 119999. - * <p> - * Once released, methods should not be deleted as they may be referenced by knowledge base - * articles. Unused methods should be marked as deprecated. - * @author <a href="mailto:[email protected]">Andy Taylor</a> - */ -@MessageBundle(projectCode = "HQ") -public interface HornetQAMQPProtocolMessageBundle -{ - HornetQAMQPProtocolMessageBundle BUNDLE = Messages.getBundle(HornetQAMQPProtocolMessageBundle.class); - - - @Message(id = 219000, value = "target address not set", format = Message.Format.MESSAGE_FORMAT) - HornetQAMQPInvalidFieldException targetAddressNotSet(); - - @Message(id = 219001, value = "error creating temporary queue, {0}", format = Message.Format.MESSAGE_FORMAT) - HornetQAMQPInternalErrorException errorCreatingTemporaryQueue(String message); - - @Message(id = 219002, value = "target address does not exist", format = Message.Format.MESSAGE_FORMAT) - HornetQAMQPIllegalStateException addressDoesntExist(); - - @Message(id = 219003, value = "error finding temporary queue, {0}", format = Message.Format.MESSAGE_FORMAT) - HornetQAMQPInternalErrorException errorFindingTemporaryQueue(String message); - - @Message(id = 219004, value = "error creating HornetQ Session, {0}", format = Message.Format.MESSAGE_FORMAT) - HornetQAMQPInternalErrorException errorCreatingHornetQSession(String message); - - @Message(id = 219005, value = "error creating HornetQ Consumer, {0}", format = Message.Format.MESSAGE_FORMAT) - HornetQAMQPInternalErrorException errorCreatingHornetQConsumer(String message); - - @Message(id = 219006, value = "error starting HornetQ Consumer, {0}", format = Message.Format.MESSAGE_FORMAT) - HornetQAMQPIllegalStateException errorStartingConsumer(String message); - - @Message(id = 219007, value = "error acknowledging message {0}, {1}", format = Message.Format.MESSAGE_FORMAT) - HornetQAMQPIllegalStateException errorAcknowledgingMessage(long messageID, String message); - - @Message(id = 219008, value = "error cancelling message {0}, {1}", format = Message.Format.MESSAGE_FORMAT) - HornetQAMQPIllegalStateException errorCancellingMessage(long messageID, String message); - - @Message(id = 219009, value = "error closing consumer {0}, {1}", format = Message.Format.MESSAGE_FORMAT) - HornetQAMQPIllegalStateException errorClosingConsumer(long consumerID, String message); - - @Message(id = 219010, value = "source address does not exist", format = Message.Format.MESSAGE_FORMAT) - HornetQAMQPInvalidFieldException sourceAddressDoesntExist(); - - @Message(id = 219011, value = "source address not set", format = Message.Format.MESSAGE_FORMAT) - HornetQAMQPInvalidFieldException sourceAddressNotSet(); - - @Message(id = 219012, value = "error rolling back coordinator: {0}", format = Message.Format.MESSAGE_FORMAT) - HornetQAMQPIllegalStateException errorRollingbackCoordinator(String message); - - @Message(id = 219013, value = "error committing coordinator: {0}", format = Message.Format.MESSAGE_FORMAT) - HornetQAMQPIllegalStateException errorCommittingCoordinator(String message); - - @Message(id = 219015, value = "error decoding AMQP frame", format = Message.Format.MESSAGE_FORMAT) - String decodeError(); -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/HornetQProtonRemotingConnection.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/HornetQProtonRemotingConnection.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/HornetQProtonRemotingConnection.java new file mode 100644 index 0000000..8c6f778 --- /dev/null +++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/HornetQProtonRemotingConnection.java @@ -0,0 +1,146 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.core.protocol.proton; + +import java.util.concurrent.Executor; + +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.api.core.HornetQException; +import org.hornetq.core.client.HornetQClientLogger; +import org.hornetq.spi.core.protocol.AbstractRemotingConnection; +import org.hornetq.spi.core.remoting.Connection; +import org.proton.plug.AMQPConnectionContext; + +/** + * + * This is a Server's Connection representation used by HornetQ. + * @author Clebert Suconic + */ + +public class HornetQProtonRemotingConnection extends AbstractRemotingConnection +{ + private final AMQPConnectionContext amqpConnection; + + private boolean destroyed = false; + + private final ProtonProtocolManager manager; + + + public HornetQProtonRemotingConnection(ProtonProtocolManager manager, AMQPConnectionContext amqpConnection, Connection transportConnection, Executor executor) + { + super(transportConnection, executor); + this.manager = manager; + this.amqpConnection = amqpConnection; + } + + public ProtonProtocolManager getManager() + { + return manager; + } + + /* + * This can be called concurrently by more than one thread so needs to be locked + */ + public void fail(final HornetQException me, String scaleDownTargetNodeID) + { + if (destroyed) + { + return; + } + + destroyed = true; + + HornetQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); + + // Then call the listeners + callFailureListeners(me, scaleDownTargetNodeID); + + callClosingListeners(); + + internalClose(); + } + + + @Override + public void destroy() + { + synchronized (this) + { + if (destroyed) + { + return; + } + + destroyed = true; + } + + + callClosingListeners(); + + internalClose(); + + } + + @Override + public boolean isClient() + { + return false; + } + + @Override + public boolean isDestroyed() + { + return destroyed; + } + + @Override + public void disconnect(boolean criticalError) + { + getTransportConnection().close(); + } + + /** + * Disconnect the connection, closing all channels + */ + @Override + public void disconnect(String scaleDownNodeID, boolean criticalError) + { + getTransportConnection().close(); + } + + @Override + public boolean checkDataReceived() + { + return amqpConnection.checkDataReceived(); + } + + @Override + public void flush() + { + amqpConnection.flush(); + } + + @Override + public void bufferReceived(Object connectionID, HornetQBuffer buffer) + { + amqpConnection.inputBuffer(buffer.byteBuf()); + super.bufferReceived(connectionID, buffer); + } + + private void internalClose() + { + // We close the underlying transport connection + getTransportConnection().close(); + } +}
