http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/Interceptor.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/Interceptor.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/Interceptor.java index 6f56d00..8b23cea 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/api/core/Interceptor.java +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/Interceptor.java @@ -20,7 +20,7 @@ import org.hornetq.spi.core.protocol.RemotingConnection; * <p> * To add an interceptor to HornetQ server, you have to modify the server configuration file * {@literal hornetq-configuration.xml}.<br> - * To add it to a client, use {@link ServerLocator#addIncomingInterceptor(Interceptor)} + * To add it to a client, use {@link org.hornetq.api.core.client.ServerLocator#addIncomingInterceptor(Interceptor)} * * @author [email protected] * @author <a href="mailto:[email protected]">Tim Fox</a>
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/JGroupsBroadcastGroupConfiguration.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/JGroupsBroadcastGroupConfiguration.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/JGroupsBroadcastGroupConfiguration.java index 208c7b7..6c55c38 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/api/core/JGroupsBroadcastGroupConfiguration.java +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/JGroupsBroadcastGroupConfiguration.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import org.jgroups.JChannel; import org.jgroups.Message; import org.jgroups.ReceiverAdapter; +import org.jgroups.conf.PlainConfigurator; /** * The configuration for creating broadcasting/discovery groups using JGroups channels @@ -55,6 +56,8 @@ public final class JGroupsBroadcastGroupConfiguration implements BroadcastEndpoi { factory = new BroadcastEndpointFactory() { + private static final long serialVersionUID = 1047956472941098435L; + @Override public BroadcastEndpoint createBroadcastEndpoint() throws Exception { @@ -69,6 +72,8 @@ public final class JGroupsBroadcastGroupConfiguration implements BroadcastEndpoi { factory = new BroadcastEndpointFactory() { + private static final long serialVersionUID = 5110372849181145377L; + @Override public BroadcastEndpoint createBroadcastEndpoint() throws Exception { @@ -182,12 +187,23 @@ public final class JGroupsBroadcastGroupConfiguration implements BroadcastEndpoi broadcastOpened = true; } - private void initChannel(final String fileName, final String channelName) throws Exception + private void initChannel(final String jgroupsConfig, final String channelName) throws Exception { - URL configURL = Thread.currentThread().getContextClassLoader().getResource(fileName); + PlainConfigurator configurator = new 
PlainConfigurator(jgroupsConfig); + try + { + this.channel = JChannelManager.getJChannel(channelName, configurator); + return; + } + catch (Exception e) + { + this.channel = null; + } + URL configURL = Thread.currentThread().getContextClassLoader().getResource(jgroupsConfig); + if (configURL == null) { - throw new RuntimeException("couldn't find JGroups configuration " + fileName); + throw new RuntimeException("couldn't find JGroups configuration " + jgroupsConfig); } this.channel = JChannelManager.getJChannel(channelName, configURL); } @@ -242,45 +258,6 @@ public final class JGroupsBroadcastGroupConfiguration implements BroadcastEndpoi } /** - * This is used for identifying a unique JChannel instance. - * Because we have two ways to get a JChannel (by configuration - * or by passing in a JChannel instance), the key needs to take - * this into consideration when doing comparison. - * - * @param <T> : either being a JChannel or a URL representing the JGroups - * configuration file. - */ - private static class ChannelKey<T> - { - private final String name; - private final T channelSource; - - public ChannelKey(String name, T t) - { - this.name = name; - this.channelSource = t; - } - - @Override - public int hashCode() - { - return name.hashCode(); - } - - @Override - public boolean equals(Object t) - { - if (t == null || (!(t instanceof ChannelKey))) - { - return false; - } - - ChannelKey<?> key = (ChannelKey<?>) t; - return (name.equals(key.name) && channelSource.equals(key.channelSource)); - } - } - - /** * This class wraps a JChannel with a reference counter. The reference counter * controls the life of the JChannel. When reference count is zero, the channel * will be disconnected. 
@@ -292,7 +269,6 @@ public final class JGroupsBroadcastGroupConfiguration implements BroadcastEndpoi int refCount = 1; JChannel channel; String channelName; - T source; List<JGroupsReceiver> receivers = new ArrayList<JGroupsReceiver>(); public JChannelWrapper(String channelName, T t) throws Exception @@ -307,11 +283,14 @@ public final class JGroupsBroadcastGroupConfiguration implements BroadcastEndpoi { this.channel = (JChannel) t; } + else if (t instanceof PlainConfigurator) + { + this.channel = new JChannel((PlainConfigurator)t); + } else { throw new IllegalArgumentException("Unsupported type " + t); } - this.source = t; } public synchronized void close() @@ -319,7 +298,7 @@ public final class JGroupsBroadcastGroupConfiguration implements BroadcastEndpoi refCount--; if (refCount == 0) { - JChannelManager.closeChannel(new ChannelKey<T>(this.channelName, source), this.channelName, channel); + JChannelManager.closeChannel(this.channelName, channel); } } @@ -387,31 +366,30 @@ public final class JGroupsBroadcastGroupConfiguration implements BroadcastEndpoi */ private static class JChannelManager { - private static Map<ChannelKey<?>, JChannelWrapper<?>> channels; + private static Map<String, JChannelWrapper<?>> channels; public static synchronized <T> JChannelWrapper<?> getJChannel(String channelName, T t) throws Exception { if (channels == null) { - channels = new HashMap<ChannelKey<?>, JChannelWrapper<?>>(); + channels = new HashMap<String, JChannelWrapper<?>>(); } - ChannelKey<T> key = new ChannelKey<T>(channelName, t); - JChannelWrapper<?> wrapper = channels.get(key); + JChannelWrapper<?> wrapper = channels.get(channelName); if (wrapper == null) { wrapper = new JChannelWrapper<T>(channelName, t); - channels.put(key, wrapper); + channels.put(channelName, wrapper); return wrapper; } return wrapper.addRef(); } - public static synchronized void closeChannel(ChannelKey<?> key, String channelName, JChannel channel) + public static synchronized void closeChannel(String 
channelName, JChannel channel) { channel.setReceiver(null); channel.disconnect(); channel.close(); - JChannelWrapper<?> wrapper = channels.remove(key); + JChannelWrapper<?> wrapper = channels.remove(channelName); if (wrapper == null) { throw new IllegalStateException("Did not find channel " + channelName); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/Message.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/Message.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/Message.java index 52867f8..2f93e25 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/api/core/Message.java +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/Message.java @@ -44,7 +44,7 @@ import org.hornetq.utils.UUID; * </pre> * <p> * If conversion is not allowed (for example calling {@code getFloatProperty} on a property set a - * {@code boolean}), a {@link PropertyConversionException} will be thrown. + * {@code boolean}), a {@link HornetQPropertyConversionException} will be thrown. * * @author <a href="mailto:[email protected]">Tim Fox</a> * @author <a href="mailto:[email protected]">ClebertSuconic</a> @@ -104,7 +104,7 @@ public interface Message * * @param userID */ - void setUserID(UUID userID); + Message setUserID(UUID userID); /** * Returns the address this message is sent to. @@ -135,7 +135,7 @@ public interface Message * * @param durable {@code true} to flag this message as durable, {@code false} else */ - void setDurable(boolean durable); + Message setDurable(boolean durable); /** * Returns the expiration time of this message. @@ -152,7 +152,7 @@ public interface Message * * @param expiration expiration time */ - void setExpiration(long expiration); + Message setExpiration(long expiration); /** * Returns the message timestamp. 
@@ -167,7 +167,7 @@ public interface Message * * @param timestamp timestamp */ - void setTimestamp(long timestamp); + Message setTimestamp(long timestamp); /** * Returns the message priority. @@ -183,7 +183,7 @@ public interface Message * * @param priority the new message priority */ - void setPriority(byte priority); + Message setPriority(byte priority); /** * Returns the size of the <em>encoded</em> message. @@ -201,6 +201,16 @@ public interface Message HornetQBuffer getBodyBuffer(); /** + * Writes the input byte array to the message body HornetQBuffer + */ + Message writeBodyBufferBytes(byte[] bytes); + + /** + * Writes the input String to the message body HornetQBuffer + */ + Message writeBodyBufferString(String string); + + /** * Returns a <em>copy</em> of the message body as a HornetQBuffer. Any modification * of this buffer should not impact the underlying buffer. */ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfiguration.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfiguration.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfiguration.java index 86d2c4f..2f06fb8 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfiguration.java +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfiguration.java @@ -17,6 +17,7 @@ import java.util.HashMap; import java.util.Map; import org.hornetq.core.client.HornetQClientMessageBundle; +import org.hornetq.core.remoting.impl.TransportConfigurationUtil; import org.hornetq.core.remoting.impl.netty.TransportConstants; import org.hornetq.utils.UUIDGenerator; @@ -81,6 +82,7 @@ public class TransportConfiguration implements Serializable */ public TransportConfiguration() { + this.params = new HashMap<>(); } /** @@ -95,7 +97,14 @@ public class TransportConfiguration 
implements Serializable { factoryClassName = className; - this.params = params; + if (params == null || params.isEmpty()) + { + this.params = TransportConfigurationUtil.getDefaults(className); + } + else + { + this.params = params; + } this.name = name; } @@ -402,4 +411,4 @@ public class TransportConfiguration implements Serializable { return str.replace('.', '-'); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfigurationHelper.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfigurationHelper.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfigurationHelper.java new file mode 100644 index 0000000..9a9565b --- /dev/null +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfigurationHelper.java @@ -0,0 +1,26 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.api.core; + +import java.util.Map; + +/** + * Helper interface for specifying default parameters on Transport Configurations. 
+ * + * @author <a href="mailto:[email protected]">Martyn Taylor</a> + */ +public interface TransportConfigurationHelper +{ + Map<String, Object> getDefaults(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/UDPBroadcastGroupConfiguration.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/UDPBroadcastGroupConfiguration.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/UDPBroadcastGroupConfiguration.java index 8ddd627..73deb9a 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/api/core/UDPBroadcastGroupConfiguration.java +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/UDPBroadcastGroupConfiguration.java @@ -20,7 +20,6 @@ import java.net.Inet4Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.MulticastSocket; -import java.net.UnknownHostException; import java.util.concurrent.TimeUnit; import org.hornetq.core.client.HornetQClientLogger; @@ -37,23 +36,16 @@ public final class UDPBroadcastGroupConfiguration implements BroadcastEndpointFa { private static final long serialVersionUID = 1052413739064253955L; - private final transient String localBindAddress; + private transient String localBindAddress = null; - private final transient int localBindPort; + private transient int localBindPort = -1; - private final String groupAddress; + private String groupAddress = null; - private final int groupPort; + private int groupPort = -1; - public UDPBroadcastGroupConfiguration(final String groupAddress, - final int groupPort, - final String localBindAddress, - final int localBindPort) + public UDPBroadcastGroupConfiguration() { - this.groupAddress = groupAddress; - this.groupPort = groupPort; - this.localBindAddress = localBindAddress; - this.localBindPort = localBindPort; } public BroadcastEndpointFactory createBroadcastEndpointFactory() @@ 
-63,10 +55,11 @@ public final class UDPBroadcastGroupConfiguration implements BroadcastEndpointFa @Override public BroadcastEndpoint createBroadcastEndpoint() throws Exception { - return new UDPBroadcastEndpoint(groupAddress != null ? InetAddress.getByName(groupAddress) : null, - groupPort, - localBindAddress != null ? InetAddress.getByName(localBindAddress) : null, - localBindPort); + return new UDPBroadcastEndpoint() + .setGroupAddress(groupAddress != null ? InetAddress.getByName(groupAddress) : null) + .setGroupPort(groupPort) + .setLocalBindAddress(localBindAddress != null ? InetAddress.getByName(localBindAddress) : null) + .setLocalBindPort(localBindPort); } }; } @@ -76,21 +69,45 @@ public final class UDPBroadcastGroupConfiguration implements BroadcastEndpointFa return groupAddress; } + public UDPBroadcastGroupConfiguration setGroupAddress(String groupAddress) + { + this.groupAddress = groupAddress; + return this; + } + public int getGroupPort() { return groupPort; } + public UDPBroadcastGroupConfiguration setGroupPort(int groupPort) + { + this.groupPort = groupPort; + return this; + } + public int getLocalBindPort() { return localBindPort; } + public UDPBroadcastGroupConfiguration setLocalBindPort(int localBindPort) + { + this.localBindPort = localBindPort; + return this; + } + public String getLocalBindAddress() { return localBindAddress; } + public UDPBroadcastGroupConfiguration setLocalBindAddress(String localBindAddress) + { + this.localBindAddress = localBindAddress; + return this; + } + /** * <p> This is the member discovery implementation using direct UDP. 
It was extracted as a refactoring from * {@link org.hornetq.core.cluster.DiscoveryGroup}</p> @@ -103,13 +120,13 @@ public final class UDPBroadcastGroupConfiguration implements BroadcastEndpointFa { private static final int SOCKET_TIMEOUT = 500; - private final InetAddress localAddress; + private InetAddress localAddress; - private final int localBindPort; + private int localBindPort; - private final InetAddress groupAddress; + private InetAddress groupAddress; - private final int groupPort; + private int groupPort; private DatagramSocket broadcastingSocket; @@ -117,15 +134,32 @@ public final class UDPBroadcastGroupConfiguration implements BroadcastEndpointFa private volatile boolean open; - public UDPBroadcastEndpoint(final InetAddress groupAddress, - final int groupPort, - final InetAddress localBindAddress, - final int localBindPort) throws UnknownHostException + public UDPBroadcastEndpoint() + { + } + + public UDPBroadcastEndpoint setGroupAddress(InetAddress groupAddress) { this.groupAddress = groupAddress; + return this; + } + + public UDPBroadcastEndpoint setGroupPort(int groupPort) + { this.groupPort = groupPort; - this.localAddress = localBindAddress; + return this; + } + + public UDPBroadcastEndpoint setLocalBindAddress(InetAddress localAddress) + { + this.localAddress = localAddress; + return this; + } + + public UDPBroadcastEndpoint setLocalBindPort(int localBindPort) + { this.localBindPort = localBindPort; + return this; } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientConsumer.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientConsumer.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientConsumer.java index 3f60863..06e3f56 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientConsumer.java +++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientConsumer.java @@ -13,6 +13,7 @@ package org.hornetq.api.core.client; import org.hornetq.api.core.HornetQException; +import org.hornetq.spi.core.remoting.ConsumerContext; /** * A ClientConsumer receives messages from HornetQ queues. @@ -38,7 +39,7 @@ public interface ClientConsumer extends AutoCloseable * HornetQ implements this as a long but this could be protocol dependent. * @return */ - Object getId(); + ConsumerContext getConsumerContext(); /** * Receives a message from a queue. @@ -95,7 +96,7 @@ public interface ClientConsumer extends AutoCloseable * @param handler a MessageHandler * @throws HornetQException if an exception occurs while setting the MessageHandler */ - void setMessageHandler(MessageHandler handler) throws HornetQException; + ClientConsumer setMessageHandler(MessageHandler handler) throws HornetQException; /** * Closes the consumer. http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientMessage.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientMessage.java index 7903e0e..1a23af3 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientMessage.java +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientMessage.java @@ -17,6 +17,7 @@ import java.io.OutputStream; import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.Message; +import org.hornetq.api.core.SimpleString; /** * @@ -38,8 +39,9 @@ public interface ClientMessage extends Message * <p> * This method is not meant to be called by HornetQ clients. 
* @param deliveryCount message delivery count + * @return this ClientMessage */ - void setDeliveryCount(int deliveryCount); + ClientMessage setDeliveryCount(int deliveryCount); /** * Acknowledges reception of this message. @@ -50,7 +52,7 @@ public interface ClientMessage extends Message * @throws HornetQException if an error occurred while acknowledging the message. * @see ClientSession#isAutoCommitAcks() */ - void acknowledge() throws HornetQException; + ClientMessage acknowledge() throws HornetQException; /** * Acknowledges reception of a single message. @@ -61,7 +63,7 @@ public interface ClientMessage extends Message * @throws HornetQException if an error occurred while acknowledging the message. * @see ClientSession#isAutoCommitAcks() */ - void individualAcknowledge() throws HornetQException; + ClientMessage individualAcknowledge() throws HornetQException; /** * This can be optionally used to verify if the entire message has been received. @@ -84,8 +86,9 @@ public interface ClientMessage extends Message * This method is used when consuming large messages * * @throws HornetQException + * @return this ClientMessage */ - void setOutputStream(OutputStream out) throws HornetQException; + ClientMessage setOutputStream(OutputStream out) throws HornetQException; /** * Saves the content of the message to the OutputStream. @@ -111,7 +114,119 @@ public interface ClientMessage extends Message * Sets the body's IntputStream. 
* <br> * This method is used when sending large messages + * @return this ClientMessage */ - void setBodyInputStream(InputStream bodyInputStream); + ClientMessage setBodyInputStream(InputStream bodyInputStream); + + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putBooleanProperty(SimpleString key, boolean value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putBooleanProperty(String key, boolean value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putByteProperty(SimpleString key, byte value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putByteProperty(String key, byte value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putBytesProperty(SimpleString key, byte[] value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putBytesProperty(String key, byte[] value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putShortProperty(SimpleString key, short value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putShortProperty(String key, short value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putCharProperty(SimpleString key, char value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putCharProperty(String key, char value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putIntProperty(SimpleString key, int value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + 
ClientMessage putIntProperty(String key, int value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putLongProperty(SimpleString key, long value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putLongProperty(String key, long value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putFloatProperty(SimpleString key, float value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putFloatProperty(String key, float value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putDoubleProperty(SimpleString key, double value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putDoubleProperty(String key, double value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putStringProperty(SimpleString key, SimpleString value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage putStringProperty(String key, String value); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage writeBodyBufferBytes(byte[] bytes); + + /** + * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API + */ + ClientMessage writeBodyBufferString(String string); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSession.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSession.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSession.java index cb56d0a..b55c9e1 100644 
--- a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSession.java +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSession.java @@ -113,7 +113,7 @@ public interface ClientSession extends XAResource, AutoCloseable * * @throws HornetQException if an exception occurs while starting the session */ - void start() throws HornetQException; + ClientSession start() throws HornetQException; /** * Stops the session. @@ -642,8 +642,9 @@ public interface ClientSession extends XAResource, AutoCloseable * Sets a <code>SendAcknowledgementHandler</code> for this session. * * @param handler a SendAcknowledgementHandler + * @return this ClientSession */ - void setSendAcknowledgementHandler(SendAcknowledgementHandler handler); + ClientSession setSendAcknowledgementHandler(SendAcknowledgementHandler handler); /** * Attach any metadata to the session. http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSessionFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSessionFactory.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSessionFactory.java index f6936b5..4c2d98c 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSessionFactory.java +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSessionFactory.java @@ -144,8 +144,9 @@ public interface ClientSessionFactory extends AutoCloseable * Adds a FailoverEventListener to the session which is notified if a failover event occurs on the session. * * @param listener the listener to add + * @return this ClientSessionFactory */ - void addFailoverListener(FailoverEventListener listener); + ClientSessionFactory addFailoverListener(FailoverEventListener listener); /** * Removes a FailoverEventListener to the session. 
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ServerLocator.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ServerLocator.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ServerLocator.java index 01a830a..eb2efa8 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ServerLocator.java +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ServerLocator.java @@ -16,6 +16,7 @@ import org.hornetq.api.core.DiscoveryGroupConfiguration; import org.hornetq.api.core.Interceptor; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.core.client.impl.Topology; +import org.hornetq.spi.core.remoting.ClientProtocolManagerFactory; /** * The serverLocator locates a server, but beyond that it locates a server based on a list. @@ -24,7 +25,7 @@ import org.hornetq.core.client.impl.Topology; * HA, the locator will always get an updated list of members to the server, the server will send * the updated list to the client. * <p> - * If you use UDP or JGroups (exclusively JGropus or UDP), the initial discovery is done by the + * If you use UDP or JGroups (exclusively JGroups or UDP), the initial discovery is done by the * grouping finder, after the initial connection is made the server will always send updates to the * client. But the listeners will listen for updates on grouping. * @@ -114,8 +115,9 @@ public interface ServerLocator extends AutoCloseable * Value must be -1 (to disable) or greater than 0. 
* * @param clientFailureCheckPeriod the period to check failure + * @return this ServerLocator */ - void setClientFailureCheckPeriod(long clientFailureCheckPeriod); + ServerLocator setClientFailureCheckPeriod(long clientFailureCheckPeriod); /** * When <code>true</code>, consumers created through this factory will create temporary files to @@ -134,8 +136,9 @@ public interface ServerLocator extends AutoCloseable * Sets whether large messages received by consumers created through this factory will be cached in temporary files or not. * * @param cached <code>true</code> to cache large messages in temporary files, <code>false</code> else + * @return this ServerLocator */ - void setCacheLargeMessagesClient(boolean cached); + ServerLocator setCacheLargeMessagesClient(boolean cached); /** * Returns the connection <em>time-to-live</em>. @@ -154,14 +157,15 @@ public interface ServerLocator extends AutoCloseable * Value must be -1 (to disable) or greater or equals to 0. * * @param connectionTTL period in milliseconds + * @return this ServerLocator */ - void setConnectionTTL(long connectionTTL); + ServerLocator setConnectionTTL(long connectionTTL); /** * Returns the blocking calls timeout. * <p> * If client's blocking calls to the server take more than this timeout, the call will throw a - * {@link HornetQException} with the code {@link HornetQExceptionType#CONNECTION_TIMEDOUT}. Value + * {@link org.hornetq.api.core.HornetQException} with the code {@link org.hornetq.api.core.HornetQExceptionType#CONNECTION_TIMEDOUT}. Value * is in milliseconds, default value is {@link HornetQClient#DEFAULT_CALL_TIMEOUT}. 
* * @return the blocking calls timeout @@ -174,8 +178,9 @@ public interface ServerLocator extends AutoCloseable * Value must be greater or equals to 0 * * @param callTimeout blocking call timeout in milliseconds + * @return this ServerLocator */ - void setCallTimeout(long callTimeout); + ServerLocator setCallTimeout(long callTimeout); /** @@ -197,8 +202,9 @@ public interface ServerLocator extends AutoCloseable * Value must be greater or equals to -1, -1 means forever * * @param callFailoverTimeout blocking call timeout in milliseconds + * @return this ServerLocator */ - void setCallFailoverTimeout(long callFailoverTimeout); + ServerLocator setCallFailoverTimeout(long callFailoverTimeout); /** * Returns the large message size threshold. @@ -216,8 +222,9 @@ public interface ServerLocator extends AutoCloseable * Value must be greater than 0. * * @param minLargeMessageSize large message size threshold in bytes + * @return this ServerLocator */ - void setMinLargeMessageSize(int minLargeMessageSize); + ServerLocator setMinLargeMessageSize(int minLargeMessageSize); /** * Returns the window size for flow control of the consumers created through this factory. @@ -235,8 +242,9 @@ public interface ServerLocator extends AutoCloseable * (to set the maximum size of the buffer) * * @param consumerWindowSize window size (in bytes) used for consumer flow control + * @return this ServerLocator */ - void setConsumerWindowSize(int consumerWindowSize); + ServerLocator setConsumerWindowSize(int consumerWindowSize); /** * Returns the maximum rate of message consumption for consumers created through this factory. @@ -256,8 +264,9 @@ public interface ServerLocator extends AutoCloseable * Value must -1 (to disable) or a positive integer corresponding to the maximum desired message consumption rate specified in units of messages per second. 
* * @param consumerMaxRate maximum rate of message consumption (in messages per seconds) + * @return this ServerLocator */ - void setConsumerMaxRate(int consumerMaxRate); + ServerLocator setConsumerMaxRate(int consumerMaxRate); /** * Returns the size for the confirmation window of clients using this factory. @@ -275,8 +284,9 @@ public interface ServerLocator extends AutoCloseable * Value must be -1 (to disable the window) or greater than 0. * * @param confirmationWindowSize size of the confirmation window (in bytes) + * @return this ServerLocator */ - void setConfirmationWindowSize(int confirmationWindowSize); + ServerLocator setConfirmationWindowSize(int confirmationWindowSize); /** * Returns the window size for flow control of the producers created through this factory. @@ -294,8 +304,9 @@ public interface ServerLocator extends AutoCloseable * Value must be -1 (to disable flow control) or greater than 0. * * @param producerWindowSize window size (in bytest) for flow control of the producers created through this factory. + * @return this ServerLocator */ - void setProducerWindowSize(int producerWindowSize); + ServerLocator setProducerWindowSize(int producerWindowSize); /** * Returns the maximum rate of message production for producers created through this factory. @@ -315,8 +326,9 @@ public interface ServerLocator extends AutoCloseable * Value must -1 (to disable) or a positive integer corresponding to the maximum desired message production rate specified in units of messages per second. 
* * @param producerMaxRate maximum rate of message production (in messages per seconds) + * @return this ServerLocator */ - void setProducerMaxRate(int producerMaxRate); + ServerLocator setProducerMaxRate(int producerMaxRate); /** * Returns whether consumers created through this factory will block while @@ -336,8 +348,9 @@ public interface ServerLocator extends AutoCloseable * @param blockOnAcknowledge <code>true</code> to block when sending message * acknowledgments or <code>false</code> to send them * asynchronously + * @return this ServerLocator */ - void setBlockOnAcknowledge(boolean blockOnAcknowledge); + ServerLocator setBlockOnAcknowledge(boolean blockOnAcknowledge); /** * Returns whether producers created through this factory will block while sending <em>durable</em> messages or do it asynchronously. @@ -355,8 +368,9 @@ public interface ServerLocator extends AutoCloseable * Sets whether producers created through this factory will block while sending <em>durable</em> messages or do it asynchronously. * * @param blockOnDurableSend <code>true</code> to block when sending durable messages or <code>false</code> to send them asynchronously + * @return this ServerLocator */ - void setBlockOnDurableSend(boolean blockOnDurableSend); + ServerLocator setBlockOnDurableSend(boolean blockOnDurableSend); /** * Returns whether producers created through this factory will block while sending <em>non-durable</em> messages or do it asynchronously. @@ -374,8 +388,9 @@ public interface ServerLocator extends AutoCloseable * Sets whether producers created through this factory will block while sending <em>non-durable</em> messages or do it asynchronously. 
* * @param blockOnNonDurableSend <code>true</code> to block when sending non-durable messages or <code>false</code> to send them asynchronously + * @return this ServerLocator */ - void setBlockOnNonDurableSend(boolean blockOnNonDurableSend); + ServerLocator setBlockOnNonDurableSend(boolean blockOnNonDurableSend); /** * Returns whether producers created through this factory will automatically @@ -394,8 +409,9 @@ public interface ServerLocator extends AutoCloseable * assign a group ID to the messages they sent. * * @param autoGroup <code>true</code> to automatically assign a group ID to each messages sent through this factory, <code>false</code> else + * @return this ServerLocator */ - void setAutoGroup(boolean autoGroup); + ServerLocator setAutoGroup(boolean autoGroup); /** * Returns the group ID that will be eventually set on each message for the property {@link org.hornetq.api.core.Message#HDR_GROUP_ID}. @@ -410,8 +426,9 @@ public interface ServerLocator extends AutoCloseable * Sets the group ID that will be set on each message sent through this factory. * * @param groupID the group ID to use + * @return this ServerLocator */ - void setGroupID(String groupID); + ServerLocator setGroupID(String groupID); /** * Returns whether messages will pre-acknowledged on the server before they are sent to the consumers or not. @@ -427,8 +444,9 @@ public interface ServerLocator extends AutoCloseable * * @param preAcknowledge <code>true</code> to enable pre-acknowledgment, * <code>false</code> else + * @return this ServerLocator */ - void setPreAcknowledge(boolean preAcknowledge); + ServerLocator setPreAcknowledge(boolean preAcknowledge); /** * Returns the acknowledgments batch size. @@ -445,8 +463,9 @@ public interface ServerLocator extends AutoCloseable * Value must be equal or greater than 0. 
* * @param ackBatchSize acknowledgments batch size + * @return this ServerLocator */ - void setAckBatchSize(int ackBatchSize); + ServerLocator setAckBatchSize(int ackBatchSize); /** * Returns an array of TransportConfigurations representing the static list of live servers used @@ -476,8 +495,9 @@ public interface ServerLocator extends AutoCloseable * or its own pools. * * @param useGlobalPools <code>true</code> to let this factory uses global thread pools, <code>false</code> else + * @return this ServerLocator */ - void setUseGlobalPools(boolean useGlobalPools); + ServerLocator setUseGlobalPools(boolean useGlobalPools); /** * Returns the maximum size of the scheduled thread pool. @@ -495,8 +515,9 @@ public interface ServerLocator extends AutoCloseable * Value must be greater than 0. * * @param scheduledThreadPoolMaxSize maximum size of the scheduled thread pool. + * @return this ServerLocator */ - void setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize); + ServerLocator setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize); /** * Returns the maximum size of the thread pool. @@ -514,8 +535,9 @@ public interface ServerLocator extends AutoCloseable * Value must be -1 (for unlimited thread pool) or greater than 0. * * @param threadPoolMaxSize maximum size of the thread pool. + * @return this ServerLocator */ - void setThreadPoolMaxSize(int threadPoolMaxSize); + ServerLocator setThreadPoolMaxSize(int threadPoolMaxSize); /** * Returns the time to retry connections created by this factory after failure. @@ -532,8 +554,9 @@ public interface ServerLocator extends AutoCloseable * Value must be greater than 0. * * @param retryInterval time (in milliseconds) to retry connections created by this factory after failure + * @return this ServerLocator */ - void setRetryInterval(long retryInterval); + ServerLocator setRetryInterval(long retryInterval); /** * Returns the multiplier to apply to successive retry intervals. 
@@ -550,8 +573,9 @@ public interface ServerLocator extends AutoCloseable * Value must be positive. * * @param retryIntervalMultiplier multiplier to apply to successive retry intervals + * @return this ServerLocator */ - void setRetryIntervalMultiplier(double retryIntervalMultiplier); + ServerLocator setRetryIntervalMultiplier(double retryIntervalMultiplier); /** * Returns the maximum retry interval (in the case a retry interval multiplier has been specified). @@ -569,8 +593,9 @@ public interface ServerLocator extends AutoCloseable * * @param maxRetryInterval maximum retry interval to apply in the case a retry interval multiplier * has been specified + * @return this ServerLocator */ - void setMaxRetryInterval(long maxRetryInterval); + ServerLocator setMaxRetryInterval(long maxRetryInterval); /** * Returns the maximum number of attempts to retry connection in case of failure. @@ -587,8 +612,9 @@ public interface ServerLocator extends AutoCloseable * Value must be -1 (to retry infinitely), 0 (to never retry connection) or greater than 0. * * @param reconnectAttempts maximum number of attempts to retry connection in case of failure + * @return this ServerLocator */ - void setReconnectAttempts(int reconnectAttempts); + ServerLocator setReconnectAttempts(int reconnectAttempts); /** * Sets the maximum number of attempts to establish an initial connection. @@ -596,8 +622,9 @@ public interface ServerLocator extends AutoCloseable * Value must be -1 (to retry infinitely), 0 (to never retry connection) or greater than 0. * * @param reconnectAttempts maximum number of attempts for the initial connection + * @return this ServerLocator */ - void setInitialConnectAttempts(int reconnectAttempts); + ServerLocator setInitialConnectAttempts(int reconnectAttempts); /** * @return the number of attempts to be made for first initial connection. 
@@ -616,8 +643,9 @@ public interface ServerLocator extends AutoCloseable * Sets the value for FailoverOnInitialReconnection * * @param failover + * @return this ServerLocator */ - void setFailoverOnInitialConnection(boolean failover); + ServerLocator setFailoverOnInitialConnection(boolean failover); /** * Returns the class name of the connection load balancing policy. @@ -631,11 +659,12 @@ public interface ServerLocator extends AutoCloseable /** * Sets the class name of the connection load balancing policy. * <p> - * Value must be the name of a class implementing {@link ConnectionLoadBalancingPolicy}. + * Value must be the name of a class implementing {@link org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy}. * * @param loadBalancingPolicyClassName class name of the connection load balancing policy + * @return this ServerLocator */ - void setConnectionLoadBalancingPolicyClassName(String loadBalancingPolicyClassName); + ServerLocator setConnectionLoadBalancingPolicyClassName(String loadBalancingPolicyClassName); /** * Returns the initial size of messages created through this factory. @@ -652,8 +681,9 @@ public interface ServerLocator extends AutoCloseable * Value must be greater than 0. * * @param size initial size of messages created through this factory. + * @return this ServerLocator */ - void setInitialMessagePacketSize(int size); + ServerLocator setInitialMessagePacketSize(int size); /** * Adds an interceptor which will be executed <em>after packets are received from the server</em>. Invoking this @@ -671,15 +701,17 @@ public interface ServerLocator extends AutoCloseable * Adds an interceptor which will be executed <em>after packets are received from the server</em>. 
* * @param interceptor an Interceptor + * @return this ServerLocator */ - void addIncomingInterceptor(Interceptor interceptor); + ServerLocator addIncomingInterceptor(Interceptor interceptor); /** * Adds an interceptor which will be executed <em>before packets are sent to the server</em>. * * @param interceptor an Interceptor + * @return this ServerLocator */ - void addOutgoingInterceptor(Interceptor interceptor); + ServerLocator addOutgoingInterceptor(Interceptor interceptor); /** * Removes an interceptor. Invoking this method is the same as invoking @@ -741,12 +773,18 @@ public interface ServerLocator extends AutoCloseable * Sets whether to compress or not large messages. * * @param compressLargeMessages + * @return this ServerLocator */ - void setCompressLargeMessage(boolean compressLargeMessages); + ServerLocator setCompressLargeMessage(boolean compressLargeMessages); // XXX No javadocs - void addClusterTopologyListener(ClusterTopologyListener listener); + ServerLocator addClusterTopologyListener(ClusterTopologyListener listener); // XXX No javadocs void removeClusterTopologyListener(ClusterTopologyListener listener); + + ClientProtocolManagerFactory getProtocolManagerFactory(); + + void setProtocolManagerFactory(ClientProtocolManagerFactory protocolManager); + } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/management/AddressSettingsInfo.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/AddressSettingsInfo.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/AddressSettingsInfo.java index 13cf953..cd06495 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/AddressSettingsInfo.java +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/AddressSettingsInfo.java @@ -50,6 +50,12 @@ public final class AddressSettingsInfo 
private final boolean sendToDLAOnNoRoute; + private final long slowConsumerThreshold; + + private final long slowConsumerCheckPeriod; + + private final String slowConsumerPolicy; + // Static -------------------------------------------------------- public static AddressSettingsInfo from(final String jsonString) throws Exception @@ -67,7 +73,10 @@ public final class AddressSettingsInfo object.getString("expiryAddress"), object.getBoolean("lastValueQueue"), object.getLong("redistributionDelay"), - object.getBoolean("sendToDLAOnNoRoute")); + object.getBoolean("sendToDLAOnNoRoute"), + object.getLong("slowConsumerThreshold"), + object.getLong("slowConsumerCheckPeriod"), + object.getString("slowConsumerPolicy")); } // Constructors -------------------------------------------------- @@ -84,7 +93,10 @@ public final class AddressSettingsInfo String expiryAddress, boolean lastValueQueue, long redistributionDelay, - boolean sendToDLAOnNoRoute) + boolean sendToDLAOnNoRoute, + long slowConsumerThreshold, + long slowConsumerCheckPeriod, + String slowConsumerPolicy) { this.addressFullMessagePolicy = addressFullMessagePolicy; this.maxSizeBytes = maxSizeBytes; @@ -99,6 +111,9 @@ public final class AddressSettingsInfo this.lastValueQueue = lastValueQueue; this.redistributionDelay = redistributionDelay; this.sendToDLAOnNoRoute = sendToDLAOnNoRoute; + this.slowConsumerThreshold = slowConsumerThreshold; + this.slowConsumerCheckPeriod = slowConsumerCheckPeriod; + this.slowConsumerPolicy = slowConsumerPolicy; } // Public -------------------------------------------------------- @@ -172,5 +187,20 @@ public final class AddressSettingsInfo { return maxRedeliveryDelay; } + + public long getSlowConsumerThreshold() + { + return slowConsumerThreshold; + } + + public long getSlowConsumerCheckPeriod() + { + return slowConsumerCheckPeriod; + } + + public String getSlowConsumerPolicy() + { + return slowConsumerPolicy; + } } 
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/management/CoreNotificationType.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/CoreNotificationType.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/CoreNotificationType.java new file mode 100644 index 0000000..d792a9a --- /dev/null +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/CoreNotificationType.java @@ -0,0 +1,54 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.api.core.management; + +/** + * This enum defines all core notification types + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + */ +public enum CoreNotificationType implements NotificationType +{ + BINDING_ADDED(0), + BINDING_REMOVED(1), + CONSUMER_CREATED(2), + CONSUMER_CLOSED(3), + SECURITY_AUTHENTICATION_VIOLATION(6), + SECURITY_PERMISSION_VIOLATION(7), + DISCOVERY_GROUP_STARTED(8), + DISCOVERY_GROUP_STOPPED(9), + BROADCAST_GROUP_STARTED(10), + BROADCAST_GROUP_STOPPED(11), + BRIDGE_STARTED(12), + BRIDGE_STOPPED(13), + CLUSTER_CONNECTION_STARTED(14), + CLUSTER_CONNECTION_STOPPED(15), + ACCEPTOR_STARTED(16), + ACCEPTOR_STOPPED(17), + PROPOSAL(18), + PROPOSAL_RESPONSE(19), + UNPROPOSAL(20), + CONSUMER_SLOW(21); + + private final int value; + + private CoreNotificationType(final int value) + { + this.value = value; + } + + public int getType() + { + return value; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/management/HornetQServerControl.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/HornetQServerControl.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/HornetQServerControl.java index 0d7deca..ca566f7 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/HornetQServerControl.java +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/HornetQServerControl.java @@ -38,7 +38,7 @@ public interface HornetQServerControl * Returns the list of interceptors used by this server. 
Invoking this method is the same as invoking * <code>getIncomingInterceptorClassNames().</code> * - * @see Interceptor + * @see org.hornetq.api.core.Interceptor * @deprecated As of HornetQ 2.3.0.Final, replaced by * {@link #getIncomingInterceptorClassNames()} and * {@link #getOutgoingInterceptorClassNames()} @@ -49,14 +49,14 @@ public interface HornetQServerControl /** * Returns the list of interceptors used by this server for incoming messages. * - * @see Interceptor + * @see org.hornetq.api.core.Interceptor */ String[] getIncomingInterceptorClassNames(); /** * Returns the list of interceptors used by this server for outgoing messages. * - * @see Interceptor + * @see org.hornetq.api.core.Interceptor */ String[] getOutgoingInterceptorClassNames(); @@ -490,6 +490,18 @@ public interface HornetQServerControl boolean closeConnectionsForAddress(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress) throws Exception; /** + * Closes all the consumer connections for the given HornetQ address. + */ + @Operation(desc = "Closes all the consumer connections for the given HornetQ address", impact = MBeanOperationInfo.INFO) + boolean closeConsumerConnectionsForAddress(@Parameter(desc = "a HornetQ address", name = "address") String address) throws Exception; + + /** + * Closes all the connections of sessions with a matching user name. + */ + @Operation(desc = "Closes all the connections for sessions with the given user name", impact = MBeanOperationInfo.INFO) + boolean closeConnectionsForUser(@Parameter(desc = "a user name", name = "userName") String address) throws Exception; + + /** * Lists all the IDs of the connections connected to this server.
*/ @Operation(desc = "List all the connection IDs", impact = MBeanOperationInfo.INFO) @@ -546,7 +558,10 @@ public interface HornetQServerControl @Parameter(desc = "the maximum redelivery delay", name = "maxRedeliveryDelay") long maxRedeliveryDelay, @Parameter(desc = "the redistribution delay", name = "redistributionDelay") long redistributionDelay, @Parameter(desc = "do we send to the DLA when there is no where to route the message", name = "sendToDLAOnNoRoute") boolean sendToDLAOnNoRoute, - @Parameter(desc = "the ploicy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy) throws Exception; + @Parameter(desc = "the policy to use when the address is full", name = "addressFullMessagePolicy") String addressFullMessagePolicy, + @Parameter(desc = "when a consumer falls below this threshold in terms of messages consumed per second it will be considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold, + @Parameter(desc = "how often (in seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long slowConsumerCheckPeriod, + @Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy) throws Exception; void removeAddressSettings(String addressMatch) throws Exception; @@ -599,5 +614,8 @@ public interface HornetQServerControl void forceFailover() throws Exception; void updateDuplicateIdCache(String address, Object[] ids) throws Exception; + + @Operation(desc = "force the server to stop and to scale down to another server", impact = MBeanOperationInfo.UNKNOWN) + void scaleDown(@Parameter(name = "name", desc = "The connector to use to scale down, if not provided the first appropriate connector will be used")String connector) throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ManagementHelper.java 
---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ManagementHelper.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ManagementHelper.java index ffda2df..bacae4d 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ManagementHelper.java +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ManagementHelper.java @@ -75,6 +75,10 @@ public final class ManagementHelper public static final SimpleString HDR_PROPOSAL_ALT_VALUE = new SimpleString("_JBM_ProposalAltValue"); + public static final SimpleString HDR_CONSUMER_NAME = new SimpleString("_HQ_ConsumerName"); + + public static final SimpleString HDR_CONNECTION_NAME = new SimpleString("_HQ_ConnectionName"); + // Attributes ---------------------------------------------------- // Static -------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/management/NotificationType.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/NotificationType.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/NotificationType.java index 2ec5dd3..ecabae7 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/NotificationType.java +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/NotificationType.java @@ -24,37 +24,7 @@ package org.hornetq.api.core.management; * @see the HornetQ user manual section on "Management Notifications" * @author <a href="mailto:[email protected]">Jeff Mesnil</a> */ -public enum NotificationType +public interface NotificationType { - BINDING_ADDED(0), - BINDING_REMOVED(1), - CONSUMER_CREATED(2), - CONSUMER_CLOSED(3), - SECURITY_AUTHENTICATION_VIOLATION(6), - 
SECURITY_PERMISSION_VIOLATION(7), - DISCOVERY_GROUP_STARTED(8), - DISCOVERY_GROUP_STOPPED(9), - BROADCAST_GROUP_STARTED(10), - BROADCAST_GROUP_STOPPED(11), - BRIDGE_STARTED(12), - BRIDGE_STOPPED(13), - CLUSTER_CONNECTION_STARTED(14), - CLUSTER_CONNECTION_STOPPED(15), - ACCEPTOR_STARTED(16), - ACCEPTOR_STOPPED(17), - PROPOSAL(18), - PROPOSAL_RESPONSE(19), - UNPROPOSAL(20); - - private final int value; - - private NotificationType(final int value) - { - this.value = value; - } - - public int intValue() - { - return value; - } + int getType(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ObjectNameBuilder.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ObjectNameBuilder.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ObjectNameBuilder.java index 0c43dad..4994902 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ObjectNameBuilder.java +++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ObjectNameBuilder.java @@ -23,6 +23,7 @@ import org.hornetq.api.core.SimpleString; */ public final class ObjectNameBuilder { + // Constants ----------------------------------------------------- /** @@ -30,14 +31,13 @@ public final class ObjectNameBuilder */ public static final ObjectNameBuilder DEFAULT = new ObjectNameBuilder(HornetQDefaultConfiguration.getDefaultJmxDomain()); - static final String DEFAULT_SUBSYSTEM = "messaging"; - static final String DEFAULT_SERVER = "default"; + static final String JMS_MODULE = "JMS"; + + static final String CORE_MODULE = "Core"; // Attributes ---------------------------------------------------- private final String domain; - private final String subsystem; - private final String serverName; // Static -------------------------------------------------------- @@ 
-58,8 +58,6 @@ public final class ObjectNameBuilder private ObjectNameBuilder(final String domain) { this.domain = domain; - this.subsystem = DEFAULT_SUBSYSTEM; - this.serverName = DEFAULT_SERVER; } // Public -------------------------------------------------------- @@ -69,9 +67,7 @@ public final class ObjectNameBuilder */ public ObjectName getHornetQServerObjectName() throws Exception { - String value = String.format("%s:subsystem=%s,hornetq-server=%s", - domain, subsystem, serverName); - return ObjectName.getInstance(value); + return ObjectName.getInstance(domain + ":module=Core,type=Server"); } /** @@ -81,7 +77,7 @@ public final class ObjectNameBuilder */ public ObjectName getAddressObjectName(final SimpleString address) throws Exception { - return createObjectName("core-address", address.toString()); + return createObjectName(ObjectNameBuilder.CORE_MODULE, "Address", address.toString()); } /** @@ -91,11 +87,12 @@ public final class ObjectNameBuilder */ public ObjectName getQueueObjectName(final SimpleString address, final SimpleString name) throws Exception { - ObjectProperty[] props = new ObjectProperty[] { - new ObjectProperty("address", address.toString()), - new ObjectProperty("runtime-queue", name.toString()) - }; - return this.createObjectName(props); + return ObjectName.getInstance(String.format("%s:module=%s,type=%s,address=%s,name=%s", + domain, + ObjectNameBuilder.CORE_MODULE, + "Queue", + ObjectName.quote(address.toString()), + ObjectName.quote(name.toString()))); } /** @@ -105,7 +102,7 @@ public final class ObjectNameBuilder */ public ObjectName getDivertObjectName(final String name) throws Exception { - return createObjectName("divert", name.toString()); + return createObjectName(ObjectNameBuilder.CORE_MODULE, "Divert", name.toString()); } /** @@ -115,7 +112,7 @@ public final class ObjectNameBuilder */ public ObjectName getAcceptorObjectName(final String name) throws Exception { - return createObjectName("remote-acceptor", name); + return 
createObjectName(ObjectNameBuilder.CORE_MODULE, "Acceptor", name); } /** @@ -125,7 +122,7 @@ public final class ObjectNameBuilder */ public ObjectName getBroadcastGroupObjectName(final String name) throws Exception { - return createObjectName("broadcast-group", name); + return createObjectName(ObjectNameBuilder.CORE_MODULE, "BroadcastGroup", name); } /** @@ -135,7 +132,7 @@ public final class ObjectNameBuilder */ public ObjectName getBridgeObjectName(final String name) throws Exception { - return createObjectName("bridge", name); + return createObjectName(ObjectNameBuilder.CORE_MODULE, "Bridge", name); } /** @@ -145,7 +142,7 @@ public final class ObjectNameBuilder */ public ObjectName getClusterConnectionObjectName(final String name) throws Exception { - return createObjectName("cluster-connection", name); + return createObjectName(ObjectNameBuilder.CORE_MODULE, "ClusterConnection", name); } /** @@ -155,7 +152,7 @@ public final class ObjectNameBuilder */ public ObjectName getDiscoveryGroupObjectName(final String name) throws Exception { - return createObjectName("discovery-group", name); + return createObjectName(ObjectNameBuilder.CORE_MODULE, "DiscoveryGroup", name); } /** @@ -164,9 +161,7 @@ public final class ObjectNameBuilder */ public ObjectName getJMSServerObjectName() throws Exception { - String value = String.format("%s:subsystem=%s,hornetq-server=%s", - domain, subsystem, serverName); - return ObjectName.getInstance(value); + return ObjectName.getInstance(domain + ":module=JMS,type=Server"); } /** @@ -175,7 +170,7 @@ public final class ObjectNameBuilder */ public ObjectName getJMSQueueObjectName(final String name) throws Exception { - return createObjectName("jms-queue", name); + return createObjectName(ObjectNameBuilder.JMS_MODULE, "Queue", name); } /** @@ -185,7 +180,7 @@ public final class ObjectNameBuilder */ public ObjectName getJMSTopicObjectName(final String name) throws Exception { - return createObjectName("jms-topic", name); + return 
createObjectName(ObjectNameBuilder.JMS_MODULE, "Topic", name); } /** @@ -194,38 +189,15 @@ public final class ObjectNameBuilder */ public ObjectName getConnectionFactoryObjectName(final String name) throws Exception { - return createObjectName("connection-factory", name); + return createObjectName(ObjectNameBuilder.JMS_MODULE, "ConnectionFactory", name); } - private ObjectName createObjectName(String beanType, String beanName) throws Exception + private ObjectName createObjectName(final String module, final String type, final String name) throws Exception { - return this.createObjectName(new ObjectProperty[] {new ObjectProperty(beanType, beanName)}); - } - - private ObjectName createObjectName(ObjectProperty[] props) throws Exception - { - String baseValue = String.format("%s:subsystem=%s,hornetq-server=%s", - domain, subsystem, serverName); - StringBuilder builder = new StringBuilder(baseValue); - for (ObjectProperty p : props) - { - builder.append(","); - builder.append(p.key); - builder.append("="); - builder.append(ObjectName.quote(p.val)); - } - return ObjectName.getInstance(builder.toString()); - } - - private static class ObjectProperty - { - String key; - String val; - - public ObjectProperty(String k, String v) - { - this.key = k; - this.val = v; - } + return ObjectName.getInstance(String.format("%s:module=%s,type=%s,name=%s", + domain, + module, + type, + ObjectName.quote(name))); } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/management/QueueControl.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/QueueControl.java b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/QueueControl.java index f2c6895..d63ea15 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/QueueControl.java +++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/QueueControl.java @@ -81,6 +81,11 @@ public interface QueueControl long getMessagesAdded(); /** + * Returns the number of messages acknowledged from this queue since it was created. + */ + long getMessagesAcknowledged(); + + /** * Returns the first message on the queue as JSON */ String getFirstMessageAsJSON() throws Exception; @@ -356,4 +361,19 @@ public interface QueueControl */ @Operation(desc = "Resets the MessagesAdded property", impact = MBeanOperationInfo.ACTION) void resetMessagesAdded() throws Exception; + + /** + * Resets the MessagesAcknowledged property + */ + @Operation(desc = "Resets the MessagesAcknowledged property", impact = MBeanOperationInfo.ACTION) + void resetMessagesAcknowledged() throws Exception; + + /** + * it will flush one cycle on internal executors, so you would be sure that any pending tasks are done before you call + * any other measure. + * It is useful if you need the exact number of counts on a message + * @throws Exception + */ + void flushExecutor(); + } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java b/hornetq-core-client/src/main/java/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java index a279096..9eae62e 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java @@ -28,7 +28,17 @@ public final class ResetLimitWrappedHornetQBuffer extends ChannelBufferWrapper { private final int limit; - private final MessageInternal message; + private MessageInternal message; + + /** + * We need to turn off notifications of
body changes on reset on the server side when dealing with AMQP conversions, + * for that reason this method will set the message to null here + * @param message + */ + public void setMessage(MessageInternal message) + { + this.message = message; + } public ResetLimitWrappedHornetQBuffer(final int limit, final HornetQBuffer buffer, final MessageInternal message) { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientLogger.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientLogger.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientLogger.java index a1f053a..ca990e6 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientLogger.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientLogger.java @@ -408,4 +408,8 @@ public interface HornetQClientLogger extends BasicLogger @LogMessage(level = Logger.Level.ERROR) @Message(id = 214023, value = "HTTP Handshake failed, the received accept value %s does not match the expected response %s") void httpHandshakeFailed(String response, String expectedResponse); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 214024, value = "HTTP upgrade not supported by remote acceptor") + void httpUpgradeNotSupportedByRemoteAcceptor(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientMessageBundle.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientMessageBundle.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientMessageBundle.java index 77bada8..e426a63 100644 --- 
a/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientMessageBundle.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientMessageBundle.java @@ -102,7 +102,7 @@ public interface HornetQClientMessageBundle @Message(id = 119016, value = "Connection failure detected. Unblocking a blocking call that will never get a resp" + "onse", format = Message.Format.MESSAGE_FORMAT) - HornetQUnBlockedException unblockingACall(); + HornetQUnBlockedException unblockingACall(@Cause Throwable t); @Message(id = 119017, value = "Consumer is closed", format = Message.Format.MESSAGE_FORMAT) HornetQObjectClosedException consumerClosed(); @@ -241,4 +241,7 @@ public interface HornetQClientMessageBundle , format = Message.Format.MESSAGE_FORMAT) HornetQLargeMessageInterruptedException largeMessageInterrupted(); + @Message(id = 119061, value = "error decoding AMQP frame", format = Message.Format.MESSAGE_FORMAT) + String decodeError(); + } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java index ac719a8..7933245 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java @@ -32,6 +32,7 @@ import org.hornetq.api.core.client.MessageHandler; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.client.HornetQClientLogger; import org.hornetq.core.client.HornetQClientMessageBundle; +import org.hornetq.spi.core.remoting.ConsumerContext; import org.hornetq.spi.core.remoting.SessionContext; import org.hornetq.utils.FutureLatch; import 
org.hornetq.utils.PriorityLinkedList; @@ -67,7 +68,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal private final SessionContext sessionContext; - private final long id; + private final ConsumerContext consumerContext; private final SimpleString filterString; @@ -137,7 +138,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal // --------------------------------------------------------------------------------- public ClientConsumerImpl(final ClientSessionInternal session, - final long id, + final ConsumerContext consumerContext, final SimpleString queueName, final SimpleString filterString, final boolean browseOnly, @@ -150,7 +151,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal final ClientSession.QueueQuery queueInfo, final ClassLoader contextClassLoader) { - this.id = id; + this.consumerContext = consumerContext; this.queueName = queueName; @@ -180,9 +181,9 @@ public final class ClientConsumerImpl implements ClientConsumerInternal // ClientConsumer implementation // ----------------------------------------------------------------- - public Object getId() + public ConsumerContext getConsumerContext() { - return id; + return consumerContext; } private ClientMessage receive(final long timeout, final boolean forcingDelivery) throws HornetQException @@ -421,7 +422,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal // Must be synchronized since messages may be arriving while handler is being set and might otherwise end // up not queueing enough executors - so messages get stranded - public synchronized void setMessageHandler(final MessageHandler theHandler) throws HornetQException + public synchronized ClientConsumerImpl setMessageHandler(final MessageHandler theHandler) throws HornetQException { checkClosed(); @@ -449,6 +450,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { waitForOnMessageToComplete(true); } + + return this; } public 
void close() throws HornetQException @@ -457,29 +460,28 @@ public final class ClientConsumerImpl implements ClientConsumerInternal } /** - * To be used by MDBs + * To be used by MDBs to stop any more handling of messages. * * @throws HornetQException + * @param future the future to run once the onMessage Thread has completed */ - public void interruptHandlers() throws HornetQException + public Thread prepareForClose(final FutureLatch future) throws HornetQException { closing = true; resetLargeMessageController(); - Thread onThread = onMessageThread; - if (onThread != null) + //execute the future after the last onMessage call + sessionExecutor.execute(new Runnable() { - try - { - // just trying to interrupt any ongoing messages - onThread.interrupt(); - } - catch (Throwable ignored) + @Override + public void run() { - // security exception probably.. we just ignore it, not big deal! + future.run(); } - } + }); + + return onMessageThread; } public void cleanUp() @@ -558,11 +560,6 @@ public final class ClientConsumerImpl implements ClientConsumerInternal return queueInfo; } - public long getID() - { - return id; - } - public SimpleString getFilterString() { return filterString; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerInternal.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerInternal.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerInternal.java index c0b4c1e..5b198f5 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerInternal.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerInternal.java @@ -17,6 +17,7 @@ import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.client.ClientConsumer; import 
org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.ClientSession; +import org.hornetq.utils.FutureLatch; /** * A ClientConsumerInternal @@ -25,8 +26,6 @@ import org.hornetq.api.core.client.ClientSession; */ public interface ClientConsumerInternal extends ClientConsumer { - long getID(); - SimpleString getQueueName(); SimpleString getFilterString(); @@ -47,8 +46,9 @@ public interface ClientConsumerInternal extends ClientConsumer * To be called by things like MDBs during shutdown of the server * * @throws HornetQException + * @param future */ - void interruptHandlers() throws HornetQException; + Thread prepareForClose(FutureLatch future) throws HornetQException; void clearAtFailover(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java index 65adfbc..09a7503 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java @@ -131,7 +131,7 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C } @Override - public void setOutputStream(final OutputStream out) throws HornetQException + public ClientLargeMessageImpl setOutputStream(final OutputStream out) throws HornetQException { if (bodyBuffer != null) { @@ -141,6 +141,8 @@ public final class ClientLargeMessageImpl extends ClientMessageImpl implements C { largeMessageController.setOutputStream(out); } + + return this; } @Override
