QPIDJMS-267 Failover discovery via connection properties Add support for discovering new failover URIs bsaed on redirect style Maps in a list provided in the Open frame of a remote peer. Allow for addition, replacement or ignore of those discovered hosts by the failover transport and provide an event point for connection listeners to see the discovered resources.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/6295f7e6 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/6295f7e6 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/6295f7e6 Branch: refs/heads/master Commit: 6295f7e6b8fd3ef6ddf4e2cb28d81fd493956282 Parents: 1b8f246 Author: Timothy Bish <[email protected]> Authored: Fri Feb 24 17:21:03 2017 -0500 Committer: Timothy Bish <[email protected]> Committed: Fri Feb 24 17:27:11 2017 -0500 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 21 + .../apache/qpid/jms/JmsConnectionListener.java | 9 + .../jms/provider/DefaultProviderListener.java | 5 + .../qpid/jms/provider/ProviderFactory.java | 26 +- .../qpid/jms/provider/ProviderListener.java | 12 + .../provider/ProviderRedirectedException.java | 49 +- .../qpid/jms/provider/ProviderWrapper.java | 6 + .../jms/provider/amqp/AmqpAbstractResource.java | 2 +- .../qpid/jms/provider/amqp/AmqpConnection.java | 2 +- .../provider/amqp/AmqpConnectionProperties.java | 35 +- .../jms/provider/amqp/AmqpFixedProducer.java | 2 +- .../qpid/jms/provider/amqp/AmqpProvider.java | 52 +- .../jms/provider/amqp/AmqpProviderFactory.java | 41 +- .../qpid/jms/provider/amqp/AmqpRedirect.java | 217 ++++ .../qpid/jms/provider/amqp/AmqpSupport.java | 43 +- .../amqp/AmqpTransactionCoordinator.java | 3 +- .../amqp/builders/AmqpConnectionBuilder.java | 21 + .../amqp/builders/AmqpResourceBuilder.java | 2 +- .../jms/provider/failover/FailoverProvider.java | 79 +- .../jms/provider/failover/FailoverUriPool.java | 57 +- .../qpid/jms/transports/TransportFactory.java | 7 + .../netty/NettySslTransportFactory.java | 5 + .../netty/NettyWssTransportFactory.java | 5 + .../org/apache/qpid/jms/util/PropertyUtil.java | 9 +- .../org/apache/qpid/jms/util/URISupport.java | 22 +- .../services/org/apache/qpid/jms/provider/amqp | 3 +- .../services/org/apache/qpid/jms/provider/amqps | 3 +- .../org/apache/qpid/jms/provider/amqpws | 3 +- .../org/apache/qpid/jms/provider/amqpwss | 3 +- .../org/apache/qpid/jms/provider/redirects/ws | 19 + .../org/apache/qpid/jms/provider/redirects/wss | 19 + .../qpid/jms/JmsDefaultConnectionListener.java | 5 + .../integration/ConnectionIntegrationTest.java | 9 +- .../FailedConnectionsIntegrationTest.java | 12 +- .../jms/provider/amqp/AmqpProviderTest.java | 58 +- .../qpid/jms/provider/amqp/AmqpSupportTest.java | 52 +- .../provider/failover/FailoverProviderTest.java | 27 + .../provider/failover/FailoverRedirectTest.java | 37 +- .../provider/failover/FailoverUriPoolTest.java | 40 + ...qpOpenProvidedServerListIntegrationTest.java | 1142 ++++++++++++++++++ .../qpid/jms/test/testpeer/TestAmqpPeer.java | 7 +- .../jms/test/testpeer/TestAmqpPeerRunner.java | 4 + .../netty/NettyTcpToMockServerTest.java | 4 +- .../apache/qpid/jms/util/URISupportTest.java | 19 +- qpid-jms-docs/Configuration.md | 2 + .../JmsConsumerPriorityDispatchTest.java | 9 + .../jms/discovery/FileWatcherDiscoveryTest.java | 5 + .../jms/discovery/JmsAmqpDiscoveryTest.java | 5 + .../transactions/JmsTransactedConsumerTest.java | 4 + 49 files changed, 2019 insertions(+), 204 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 3c0f918..8294570 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -18,6 +18,7 @@ package org.apache.qpid.jms; import java.io.IOException; import java.net.URI; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -1369,6 +1370,26 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection onAsyncException(cause); } + @Override + public void onRemoteDiscovery(final List<URI> remotes) { + for (URI remote : remotes) { + LOG.trace("Discovered new remote at: {}", remote); + } + + // Give listeners a chance to know what we've discovered. + if (!connectionListeners.isEmpty()) { + for (final JmsConnectionListener listener : connectionListeners) { + executor.submit(new Runnable() { + + @Override + public void run() { + listener.onRemoteDiscovery(remotes); + } + }); + } + } + } + /** * Handles any asynchronous errors that occur from the JMS framework classes. * http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java index f9389f4..b25a2b1 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java @@ -17,6 +17,7 @@ package org.apache.qpid.jms; import java.net.URI; +import java.util.List; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -104,4 +105,12 @@ public interface JmsConnectionListener { */ void onProducerClosed(MessageProducer producer, Throwable cause); + /** + * Called when additional remote peers are discovered by this connection. + * + * @param remotes + * A list of remote peers that have been discovered. + */ + void onRemoteDiscovery(List<URI> remotes); + } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java index 1d98e01..6fe2334 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java @@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider; import java.io.IOException; import java.net.URI; +import java.util.List; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; @@ -71,4 +72,8 @@ public class DefaultProviderListener implements ProviderListener { @Override public void onProviderException(Exception cause) { } + + @Override + public void onRemoteDiscovery(List<URI> remotes) { + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java index c18b336..19a41db 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java @@ -48,7 +48,7 @@ public abstract class ProviderFactory { public abstract Provider createProvider(URI remoteURI) throws Exception; /** - * @return the name of this JMS Provider. + * @return the name of this Provider. */ public abstract String getName(); @@ -92,8 +92,28 @@ public abstract class ProviderFactory { * @throws IOException if an error occurs while locating the factory. */ public static ProviderFactory findProviderFactory(URI location) throws IOException { - String scheme = location.getScheme(); - if (scheme == null) { + if (location == null) { + throw new IOException("No Provider location specified."); + } + + return findProviderFactory(location.getScheme()); + } + + /** + * Searches for a ProviderFactory by using the scheme given. + * + * The search first checks the local cache of provider factories before moving on + * to search in the class path. + * + * @param scheme + * The URI scheme that describes the desired factory. + * + * @return a provider factory instance matching the scheme. + * + * @throws IOException if an error occurs while locating the factory. + */ + public static ProviderFactory findProviderFactory(String scheme) throws IOException { + if (scheme == null || scheme.isEmpty()) { throw new IOException("No Provider scheme specified."); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java index a96de4f..412242e 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java @@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider; import java.io.IOException; import java.net.URI; +import java.util.List; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; @@ -155,4 +156,15 @@ public interface ProviderListener { */ void onProviderException(Exception cause); + /** + * Called when additional remote peers are discovered. + * <p> + * If new peers are discovered their URIs are provided to listeners to allow for + * failover or update of client connection information. + * + * @param remotes + * A list of remote peers that have been discovered. + */ + void onRemoteDiscovery(List<URI> remotes); + } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java index ff997a8..3ab6459 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java @@ -17,6 +17,7 @@ package org.apache.qpid.jms.provider; import java.io.IOException; +import java.net.URI; /** * {@link IOException} derivative that defines that the remote peer has requested that this @@ -26,54 +27,18 @@ public class ProviderRedirectedException extends IOException { private static final long serialVersionUID = 5872211116061710369L; - private final String hostname; - private final String networkHost; - private final String scheme; - private final String path; - private final int port; + private final URI redirect; - public ProviderRedirectedException(String reason, String scheme, String hostname, String networkHost, int port, String path) { + public ProviderRedirectedException(String reason, URI redirect) { super(reason); - this.scheme = scheme; - this.hostname = hostname; - this.networkHost = networkHost; - this.port = port; - this.path = path; + this.redirect = redirect; } /** - * @return the host name of the container being redirected to. + * @return the URI that represents the redirection. */ - public String getHostname() { - return hostname; - } - - /** - * @return the DNS host name or IP address of the peer this connection is being redirected to. - */ - public String getNetworkHost() { - return networkHost; - } - - /** - * @return the port number on the peer this connection is being redirected to. - */ - public int getPort() { - return port; - } - - /** - * @return the scheme that the remote indicated the redirect connection should use. - */ - public String getScheme() { - return scheme; - } - - /** - * @return the path that the remote indicated should be path of the redirect URI. - */ - public String getPath() { - return path; + public URI getRedirectionURI() { + return redirect; } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java index 74bfa16..2e3efc8 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java @@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider; import java.io.IOException; import java.net.URI; +import java.util.List; import javax.jms.JMSException; @@ -204,6 +205,11 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi listener.onProviderException(cause); } + @Override + public void onRemoteDiscovery(List<URI> remotes) { + listener.onRemoteDiscovery(remotes); + } + /** * @return the wrapped Provider. */ http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java index 3a8cf3d..11a14a0 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java @@ -259,7 +259,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp if (isAwaitingClose()) { closeResource(provider, null, true); // Close was expected so ignore any endpoint errors. } else { - closeResource(provider, AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition()), true); + closeResource(provider, AmqpSupport.convertToException(provider, getEndpoint(), getEndpoint().getRemoteCondition()), true); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java index 48271bf..c93107b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -66,7 +66,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn this.amqpMessageFactory = new AmqpJmsMessageFactory(this); // Create connection properties initialized with defaults from the JmsConnectionInfo - this.properties = new AmqpConnectionProperties(info); + this.properties = new AmqpConnectionProperties(info, provider); } public void createSession(JmsSessionInfo sessionInfo, AsyncResult request) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java index 815104a..99f0063 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java @@ -19,10 +19,12 @@ package org.apache.qpid.jms.provider.amqp; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.CONNECTION_OPEN_FAILED; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY; +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.FAILOVER_SERVER_LIST; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.QUEUE_PREFIX; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.TOPIC_PREFIX; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -42,11 +44,13 @@ public class AmqpConnectionProperties { private static final Logger LOG = LoggerFactory.getLogger(AmqpConnectionProperties.class); private final JmsConnectionInfo connectionInfo; + private final AmqpProvider provider; private boolean delayedDeliverySupported = false; private boolean anonymousRelaySupported = false; private boolean sharedSubsSupported = false; private boolean connectionOpenFailed = false; + private final List<AmqpRedirect> failoverServerList = new ArrayList<>(); /** * Creates a new instance of this class with default values read from the @@ -54,9 +58,12 @@ public class AmqpConnectionProperties { * * @param connectionInfo * the JmsConnectionInfo object used to populate defaults. + * @param provider + * the provider instance associated with this object */ - public AmqpConnectionProperties(JmsConnectionInfo connectionInfo) { + public AmqpConnectionProperties(JmsConnectionInfo connectionInfo, AmqpProvider provider) { this.connectionInfo = connectionInfo; + this.provider = provider; } /** @@ -92,6 +99,7 @@ public class AmqpConnectionProperties { } } + @SuppressWarnings("unchecked") protected void processProperties(Map<Symbol, Object> properties) { if (properties.containsKey(QUEUE_PREFIX)) { Object o = properties.get(QUEUE_PREFIX); @@ -113,6 +121,31 @@ public class AmqpConnectionProperties { LOG.trace("Remote sent Connection Establishment Failed marker."); connectionOpenFailed = true; } + + if (properties.containsKey(FAILOVER_SERVER_LIST)) { + LOG.trace("Remote sent Failover Server List."); + Object o = properties.get(FAILOVER_SERVER_LIST); + if (o instanceof List) { + for (Map<Symbol, Object> redirection : (List<Map<Symbol, Object>>) o) { + try { + failoverServerList.add(new AmqpRedirect(redirection, provider).validate()); + } catch (Exception ex) { + LOG.debug("Invalid redirection value given in failover server list: {}", ex.getMessage()); + } + } + + LOG.trace("Failover Server List: {}", failoverServerList); + } + } + } + + /** + * Get any advertised failover server list details. + * + * @return return the advertised failover server list details, list is empty if no server list given. + */ + public List<AmqpRedirect> getFailoverServerList() { + return failoverServerList; } /** http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index 9b5bc71..4acd419 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -237,7 +237,7 @@ public class AmqpFixedProducer extends AmqpProducer { remoteError = getEndpoint().getRemoteCondition(); } - deliveryError = AmqpSupport.convertToException(getEndpoint(), remoteError); + deliveryError = AmqpSupport.convertToException(getParent().getProvider(), getEndpoint(), remoteError); } else if (outcome instanceof Released) { LOG.trace("Outcome of delivery was released: {}", delivery); deliveryError = new JMSException("Delivery failed: released by receiver"); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 1c54ca6..d5d5ade 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -19,6 +19,7 @@ package org.apache.qpid.jms.provider.amqp; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; +import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -55,7 +56,7 @@ import org.apache.qpid.jms.provider.ProviderFuture; import org.apache.qpid.jms.provider.ProviderListener; import org.apache.qpid.jms.provider.amqp.builders.AmqpClosedConnectionBuilder; import org.apache.qpid.jms.provider.amqp.builders.AmqpConnectionBuilder; -import org.apache.qpid.jms.transports.TransportFactory; +import org.apache.qpid.jms.transports.Transport; import org.apache.qpid.jms.transports.TransportListener; import org.apache.qpid.jms.util.IOExceptionSupport; import org.apache.qpid.jms.util.ThreadPoolUtils; @@ -66,7 +67,6 @@ import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Event.Type; import org.apache.qpid.proton.engine.Sasl; -import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.impl.CollectorImpl; import org.apache.qpid.proton.engine.impl.ProtocolTracer; import org.apache.qpid.proton.engine.impl.TransportImpl; @@ -106,8 +106,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP private volatile ProviderListener listener; private AmqpConnection connection; private AmqpSaslAuthenticator authenticator; - private volatile org.apache.qpid.jms.transports.Transport transport; - private String transportType = AmqpProviderFactory.DEFAULT_TRANSPORT_TYPE; + private final Transport transport; private String vhost; private boolean traceFrames; private boolean traceBytes; @@ -117,13 +116,16 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP private int channelMax = DEFAULT_CHANNEL_MAX; private int idleTimeout = 60000; private int drainTimeout = 60000; - private long sessionOutoingWindow = -1; //Use proton default + private long sessionOutoingWindow = -1; // Use proton default private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE; + private boolean allowNonSecureRedirects; + private final URI remoteURI; private final AtomicBoolean closed = new AtomicBoolean(); private ScheduledThreadPoolExecutor serializer; - private final Transport protonTransport = Transport.Factory.create(); + private final org.apache.qpid.proton.engine.Transport protonTransport = + org.apache.qpid.proton.engine.Transport.Factory.create(); private final Collector protonCollector = new CollectorImpl(); private final Connection protonConnection = Connection.Factory.create(); @@ -135,9 +137,12 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP * * @param remoteURI * The URI of the AMQP broker this Provider instance will connect to. + * @param transport + * The underlying Transport that will be used for wire level communications. */ - public AmqpProvider(URI remoteURI) { + public AmqpProvider(URI remoteURI, Transport transport) { this.remoteURI = remoteURI; + this.transport = transport; serializer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @@ -184,7 +189,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP SSLContext sslContextOverride = connectionInfo.getSslContextOverride(); - transport = TransportFactory.create(getTransportType(), getRemoteURI()); transport.setTransportListener(AmqpProvider.this); transport.connect(sslContextOverride); @@ -1023,6 +1027,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP } } + public void fireRemotesDiscovered(List<URI> remotes) { + ProviderListener listener = this.listener; + if (listener != null) { + listener.onRemoteDiscovery(remotes); + } + } + @Override public void addChildResource(AmqpResource resource) { if (resource instanceof AmqpConnection) { @@ -1166,6 +1177,21 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP this.sessionOutoingWindow = sessionOutoingWindow; } + public boolean isAllowNonSecureRedirects() { + return allowNonSecureRedirects; + } + + /** + * Should the AMQP connection allow a redirect or failover server update that redirects + * from a secure connection to an non-secure one (SSL to TCP). + * + * @param allowNonSecureRedirects + * the allowNonSecureRedirects value to apply to this AMQP connection. + */ + public void setAllowNonSecureRedirects(boolean allowNonSecureRedirects) { + this.allowNonSecureRedirects = allowNonSecureRedirects; + } + public long getCloseTimeout() { return connectionInfo != null ? connectionInfo.getCloseTimeout() : JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT; } @@ -1195,12 +1221,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP this.channelMax = channelMax; } - String getTransportType() { - return transportType; - } - - void setTransportType(String transportType) { - this.transportType = transportType; + public Transport getTransport() { + return transport; } @Override @@ -1218,7 +1240,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP return remoteURI; } - public Transport getProtonTransport() { + public org.apache.qpid.proton.engine.Transport getProtonTransport() { return protonTransport; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java index 5f81c6f..5675277 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java @@ -19,8 +19,9 @@ package org.apache.qpid.jms.provider.amqp; import java.net.URI; import java.util.Map; -import org.apache.qpid.jms.provider.Provider; import org.apache.qpid.jms.provider.ProviderFactory; +import org.apache.qpid.jms.transports.Transport; +import org.apache.qpid.jms.transports.TransportFactory; import org.apache.qpid.jms.util.PropertyUtil; /** @@ -28,21 +29,22 @@ import org.apache.qpid.jms.util.PropertyUtil; */ public class AmqpProviderFactory extends ProviderFactory { - public static final String DEFAULT_TRANSPORT_TYPE = "tcp"; + public static final String DEFAULT_TRANSPORT_SCHEME = "tcp"; + public static final String DEFAULT_PROVIDER_SCHEME = "amqp"; - private String transportType = DEFAULT_TRANSPORT_TYPE; + private String transportScheme = DEFAULT_TRANSPORT_SCHEME; + private String providerScheme = DEFAULT_PROVIDER_SCHEME; @Override - public Provider createProvider(URI remoteURI) throws Exception { + public AmqpProvider createProvider(URI remoteURI) throws Exception { Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery()); Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "amqp."); - remoteURI = PropertyUtil.replaceQuery(remoteURI, map); + // Clear off any amqp.X values from the transport before creation. + Transport transport = TransportFactory.create(getTransportScheme(), PropertyUtil.replaceQuery(remoteURI, map)); - AmqpProvider result = new AmqpProvider(remoteURI); - - result.setTransportType(getTransportType()); + AmqpProvider result = new AmqpProvider(remoteURI, transport); Map<String, String> unused = PropertyUtil.setProperties(result, providerOptions); if (!unused.isEmpty()) { @@ -62,18 +64,27 @@ public class AmqpProviderFactory extends ProviderFactory { return "AMQP"; } + public String getTransportScheme() { + return transportScheme; + } + /** - * @return the transport type used for this provider factory such as 'tcp' or 'ssl' + * @param transportScheme + * the transport type name to use when creating a new provider. */ - public String getTransportType() { - return transportType; + public void setTransportScheme(String transportScheme) { + this.transportScheme = transportScheme; + } + + public String getProviderScheme() { + return providerScheme; } /** - * @param transportType - * the transport type name to use when creating a new provider. + * @param providerScheme + * the providerScheme to use to identify the AMQP provider */ - public void setTransportType(String transportType) { - this.transportType = transportType; + public void setProviderScheme(String providerScheme) { + this.providerScheme = providerScheme; } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpRedirect.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpRedirect.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpRedirect.java new file mode 100644 index 0000000..2e2588a --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpRedirect.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.NETWORK_HOST; +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.OPEN_HOSTNAME; +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PATH; +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PORT; +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SCHEME; + +import java.io.IOException; +import java.net.URI; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.qpid.jms.provider.ProviderFactory; +import org.apache.qpid.jms.transports.TransportFactory; +import org.apache.qpid.jms.util.FactoryFinder; +import org.apache.qpid.jms.util.PropertyUtil; +import org.apache.qpid.jms.util.URISupport; +import org.apache.qpid.proton.amqp.Symbol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encapsulates the AMQP Redirect Map + */ +public class AmqpRedirect { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpRedirect.class); + + private static final FactoryFinder<ProviderFactory> PROVIDER_FACTORY_FINDER = + new FactoryFinder<ProviderFactory>(ProviderFactory.class, + "META-INF/services/" + ProviderFactory.class.getPackage().getName().replace(".", "/") + "/redirects/"); + + private final Map<Symbol, Object> redirect; + private final AmqpProvider provider; + + public AmqpRedirect(Map<Symbol, Object> redirect, AmqpProvider provider) { + this.redirect = redirect; + this.provider = provider; + + if (provider == null) { + throw new IllegalArgumentException("A provider instance is required"); + } + + URI remoteURI = provider.getRemoteURI(); + if (remoteURI == null || remoteURI.getScheme() == null || remoteURI.getScheme().isEmpty()) { + throw new IllegalArgumentException("The provider instance must provide a valid scheme"); + } + } + + public AmqpRedirect validate() throws Exception { + String networkHost = (String) redirect.get(NETWORK_HOST); + if (networkHost == null || networkHost.isEmpty()) { + throw new IOException("Redirection information not set, missing network host."); + } + + try { + Integer.parseInt(redirect.get(PORT).toString()); + } catch (Exception ex) { + throw new IOException("Redirection information contained invalid port."); + } + + String sourceScheme = provider.getRemoteURI().getScheme(); + String scheme = (String) redirect.get(SCHEME); + if (scheme != null && !scheme.isEmpty() && !scheme.equals(sourceScheme)) { + + // Attempt to located a provider using normal scheme (amqp, amqps, etc...) + ProviderFactory factory = null; + try { + factory = ProviderFactory.findProviderFactory(scheme); + } catch (Throwable error) { + LOG.trace("Couldn't find AMQP prefixed Provider using scheme: {}", scheme); + } + + if (factory == null) { + // Attempt to located a transport level redirect (ws, wss, etc...) + try { + factory = findProviderFactoryByTransportScheme(scheme); + } catch (Throwable error) { + LOG.trace("Couldn't find Provider using transport scheme: {}", scheme); + } + } + + if (factory == null || !(factory instanceof AmqpProviderFactory)) { + throw new IOException("Redirect contained an unknown provider scheme: " + scheme); + } + + LOG.trace("Found provider: {} for redirect: {}", factory.getName(), scheme); + + AmqpProviderFactory amqpFactory = (AmqpProviderFactory) factory; + String transportType = amqpFactory.getTransportScheme(); + + if (transportType == null || transportType.isEmpty()) { + throw new IOException("Redirect contained an unknown provider scheme: " + scheme); + } + + TransportFactory transportFactory = TransportFactory.findTransportFactory(transportType); + if (transportFactory == null) { + throw new IOException("Redirect contained an unknown provider scheme: " + scheme); + } + + // Check for insecure redirect and whether it is allowed. + if (provider.getTransport().isSecure() && !transportFactory.isSecure() && !provider.isAllowNonSecureRedirects()) { + throw new IOException("Attempt to redirect to an insecure connection type: " + transportType); + } + + // Update the redirect information with the resolved target scheme used to create + // the provider for the redirection. + redirect.put(SCHEME, amqpFactory.getProviderScheme()); + } + + // Check it actually converts to URI since we require it do so later + toURI(); + + return this; + } + + /** + * @return the redirection map that backs this object + */ + public Map<Symbol, Object> getRedirectMap() { + return redirect; + } + + /** + * @return the host name of the container being redirected to. + */ + public String getHostname() { + return (String) redirect.get(OPEN_HOSTNAME); + } + + /** + * @return the DNS host name or IP address of the peer this connection is being redirected to. + */ + public String getNetworkHost() { + return (String) redirect.get(NETWORK_HOST); + } + + /** + * @return the port number on the peer this connection is being redirected to. + */ + public int getPort() { + return Integer.parseInt(redirect.get(PORT).toString()); + } + + /** + * @return the scheme that the remote indicated the redirect connection should use. + */ + public String getScheme() { + String scheme = (String) redirect.get(SCHEME); + if (scheme == null || scheme.isEmpty()) { + scheme = provider.getRemoteURI().getScheme(); + } + + return scheme; + } + + /** + * @return the path that the remote indicated should be path of the redirect URI. + */ + public String getPath() { + return (String) redirect.get(PATH); + } + + /** + * Construct a URI from the redirection information available. + * + * @return a URI that matches the redirection information provided. + * + * @throws Exception if an error occurs construct a URI from the redirection information. + */ + public URI toURI() throws Exception { + Map<String, String> queryOptions = PropertyUtil.parseQuery(provider.getRemoteURI()); + + URI result = new URI(getScheme(), null, getNetworkHost(), getPort(), getPath(), null, null); + + String hostname = getHostname(); + if (hostname != null && !hostname.isEmpty()) { + // Ensure we replace any existing vhost option with the redirect version. + queryOptions = new LinkedHashMap<>(queryOptions); + queryOptions.put("amqp.vhost", hostname); + } + + return URISupport.applyParameters(result, queryOptions); + } + + private static ProviderFactory findProviderFactoryByTransportScheme(String scheme) throws IOException { + if (scheme == null || scheme.isEmpty()) { + throw new IOException("No Transport scheme specified."); + } + + ProviderFactory factory = null; + try { + factory = PROVIDER_FACTORY_FINDER.newInstance(scheme); + } catch (Throwable e) { + throw new IOException("Provider NOT found using redirect scheme: [" + scheme + "]", e); + } + + return factory; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java index 7af1de4..62c5001 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java @@ -51,6 +51,9 @@ public class AmqpSupport { public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field"); public static final Symbol CONTAINER_ID = Symbol.valueOf("container-id"); + // Symbols used to announce failover server list (in addition to redirect symbols below) + public static final Symbol FAILOVER_SERVER_LIST = Symbol.valueOf("failover-server-list"); + // Symbols used to announce connection redirect ErrorCondition 'info' public static final Symbol PATH = Symbol.valueOf("path"); public static final Symbol SCHEME = Symbol.valueOf("scheme"); @@ -101,6 +104,8 @@ public class AmqpSupport { * Given an ErrorCondition instance create a new Exception that best matches * the error type. * + * @param provider + * the AMQP provider instance that originates this exception * @param endpoint * The target of the error. * @param errorCondition @@ -108,14 +113,16 @@ public class AmqpSupport { * * @return a new Exception instance that best matches the ErrorCondition value. */ - public static Exception convertToException(Endpoint endpoint, ErrorCondition errorCondition) { - return convertToException(endpoint, errorCondition, null); + public static Exception convertToException(AmqpProvider provider, Endpoint endpoint, ErrorCondition errorCondition) { + return convertToException(provider, endpoint, errorCondition, null); } /** * Given an ErrorCondition instance create a new Exception that best matches * the error type. * + * @param provider + * the AMQP provider instance that originates this exception * @param endpoint * The target of the error. * @param errorCondition @@ -125,7 +132,7 @@ public class AmqpSupport { * * @return a new Exception instance that best matches the ErrorCondition value. */ - public static Exception convertToException(Endpoint endpoint, ErrorCondition errorCondition, Exception defaultException) { + public static Exception convertToException(AmqpProvider provider, Endpoint endpoint, ErrorCondition errorCondition, Exception defaultException) { Exception remoteError = defaultException; if (errorCondition != null && errorCondition.getCondition() != null) { @@ -145,7 +152,7 @@ public class AmqpSupport { } else if (error.equals(TransactionErrors.TRANSACTION_ROLLBACK)) { remoteError = new TransactionRolledBackException(message); } else if (error.equals(ConnectionError.REDIRECT)) { - remoteError = createRedirectException(error, message, errorCondition); + remoteError = createRedirectException(provider, error, message, errorCondition); } else if (error.equals(AmqpError.INVALID_FIELD)) { Map<?, ?> info = errorCondition.getInfo(); if (info != null && CONTAINER_ID.equals(info.get(INVALID_FIELD))) { @@ -192,6 +199,8 @@ public class AmqpSupport { * When a redirect type exception is received this method is called to create the * appropriate redirect exception type containing the error details needed. * + * @param provider + * the AMQP provider instance that originates this exception * @param error * the Symbol that defines the redirection error type. * @param message @@ -201,32 +210,20 @@ public class AmqpSupport { * * @return an Exception that captures the details of the redirection error. */ - public static Exception createRedirectException(Symbol error, String message, ErrorCondition condition) { + public static Exception createRedirectException(AmqpProvider provider, Symbol error, String message, ErrorCondition condition) { Exception result = null; Map<?, ?> info = condition.getInfo(); if (info == null) { result = new IOException(message + " : Redirection information not set."); } else { - String hostname = (String) info.get(OPEN_HOSTNAME); - String path = (String) info.get(PATH); - String scheme = (String) info.get(SCHEME); - - String networkHost = (String) info.get(NETWORK_HOST); - int port = 0; - - if (networkHost == null || networkHost.isEmpty()) { - result = new IOException(message + " : Redirection information not set."); - } else { - try { - port = Integer.parseInt(info.get(PORT).toString()); - } catch (Exception ex) { - result = new IOException(message + " : Redirection information not set."); - } - } + @SuppressWarnings("unchecked") + AmqpRedirect redirect = new AmqpRedirect((Map<Symbol, Object>) info, provider); - if (result == null) { - result = new ProviderRedirectedException(message, scheme, hostname, networkHost, port, path); + try { + result = new ProviderRedirectedException(message, redirect.validate().toURI()); + } catch (Exception ex) { + result = new IOException(message + " : " + ex.getMessage()); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java index f567b09..f6a57cb 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java @@ -80,7 +80,8 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI } else if (state instanceof Rejected) { LOG.debug("Last TX request failed: {}", txId); Rejected rejected = (Rejected) state; - Exception cause = AmqpSupport.convertToException(getEndpoint(), rejected.getError()); + Exception cause = AmqpSupport.convertToException( + getParent().getProvider(), getEndpoint(), rejected.getError()); JMSException failureCause = null; if (txId.getProviderContext().equals(COMMIT_MARKER)) { failureCause = new TransactionRolledBackException(cause.getMessage()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java index 5792c09..98cabfc 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java @@ -18,7 +18,10 @@ package org.apache.qpid.jms.provider.amqp.builders; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SOLE_CONNECTION_CAPABILITY; +import java.net.URI; +import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import javax.jms.Session; @@ -28,6 +31,7 @@ import org.apache.qpid.jms.meta.JmsSessionInfo; import org.apache.qpid.jms.provider.AsyncResult; import org.apache.qpid.jms.provider.amqp.AmqpConnection; import org.apache.qpid.jms.provider.amqp.AmqpProvider; +import org.apache.qpid.jms.provider.amqp.AmqpRedirect; import org.apache.qpid.jms.provider.amqp.AmqpSupport; import org.apache.qpid.jms.util.MetaDataSupport; import org.apache.qpid.proton.amqp.Symbol; @@ -129,6 +133,23 @@ public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, A // be determined, this allows us to check for close pending. getResource().getProperties().initialize( getEndpoint().getRemoteOfferedCapabilities(), getEndpoint().getRemoteProperties()); + + // If there are failover servers in the open then we signal that to the listeners + List<AmqpRedirect> failoverList = getResource().getProperties().getFailoverServerList(); + if (!failoverList.isEmpty()) { + List<URI> failoverURIs = new ArrayList<>(); + for (AmqpRedirect redirect : failoverList) { + try { + failoverURIs.add(redirect.toURI()); + } catch (Exception ex) { + LOG.trace("Error while creating URI from failover server: {}", redirect); + } + } + + if (!failoverURIs.isEmpty()) { + getResource().getProvider().fireRemotesDiscovered(failoverURIs); + } + } } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java index e984e7b..69d5f04 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java @@ -169,7 +169,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex Throwable openError; if (hasRemoteError()) { - openError = AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition()); + openError = AmqpSupport.convertToException(parent.getProvider(), getEndpoint(), getEndpoint().getRemoteCondition()); } else if (cause != null) { openError = cause; } else { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java index ec3b1fb..45d6087 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -122,6 +123,8 @@ public class FailoverProvider extends DefaultProviderListener implements Provide private int startupMaxReconnectAttempts = DEFAULT_STARTUP_MAX_RECONNECT_ATTEMPTS; private int warnAfterReconnectAttempts = DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS; + private FailoverServerListBehaviour amqpOpenServerListBehaviour = FailoverServerListBehaviour.REPLACE; + public FailoverProvider(Map<String, String> nestedOptions) { this(null, nestedOptions); } @@ -556,7 +559,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide if (cause instanceof ProviderRedirectedException) { ProviderRedirectedException redirect = (ProviderRedirectedException) cause; try { - uris.addFirst(buildRedirectURI(failedURI, redirect)); + uris.addFirst(redirect.getRedirectionURI()); } catch (Exception error) { LOG.warn("Could not construct redirection URI from remote provided information"); } @@ -785,17 +788,6 @@ public class FailoverProvider extends DefaultProviderListener implements Provide } } - protected URI buildRedirectURI(URI sourceURI, ProviderRedirectedException redirect) throws Exception { - String scheme = sourceURI.getScheme(); - String host = redirect.getNetworkHost(); - String path = redirect.getPath(); - int port = redirect.getPort(); - - URI result = new URI(scheme, null, host, port, path, null, null); - - return result; - } - //--------------- DefaultProviderListener overrides ----------------------// @Override @@ -857,6 +849,57 @@ public class FailoverProvider extends DefaultProviderListener implements Provide }); } + @Override + public void onRemoteDiscovery(final List<URI> discovered) { + if (closingConnection.get() || closed.get() || failed.get()) { + return; + } + + if (discovered == null || discovered.isEmpty()) { + return; + } + + serializer.execute(new Runnable() { + @Override + public void run() { + if (!closingConnection.get() && !closed.get() && !failed.get()) { + + List<URI> newRemotes = new ArrayList<URI>(discovered); + switch (amqpOpenServerListBehaviour) { + case ADD: + try { + uris.addAll(discovered); + } catch (Throwable err) { + LOG.warn("Error while attempting to add discovered URIs: {}", discovered); + } + break; + case REPLACE: + // The current server is assumed not to be in the list of updated remote + // as it is meant for the failover nodes. The pool will de-dup if it is. + newRemotes.add(0, connectedURI); + try { + uris.replaceAll(newRemotes); + } catch (Throwable err) { + LOG.warn("Error while attempting to add discovered URIs: {}", discovered); + } + break; + case IGNORE: + // Do Nothing + break; + default: + // Shouldnt get here, but do nothing if we do. + break; + } + + // Inform any listener that we've made a new discovery. + if (listener != null) { + listener.onRemoteDiscovery(discovered); + } + } + } + }); + } + //--------------- URI update and rebalance methods -----------------------// public void add(final URI uri) { @@ -999,6 +1042,14 @@ public class FailoverProvider extends DefaultProviderListener implements Provide return this.requestTimeout; } + public String getAmqpOpenServerListBehaviour() { + return amqpOpenServerListBehaviour.toString(); + } + + public void setAmqpOpenServerListBehaviour(String amqpOpenServerListBehaviour) { + this.amqpOpenServerListBehaviour = FailoverServerListBehaviour.valueOf(amqpOpenServerListBehaviour.toUpperCase(Locale.ENGLISH)); + } + public Map<String, String> getNestedOptions() { return uris.getNestedOptions(); } @@ -1216,4 +1267,8 @@ public class FailoverProvider extends DefaultProviderListener implements Provide super.onSuccess(); } } + + private static enum FailoverServerListBehaviour { + ADD, REPLACE, IGNORE + }; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java index c01bf06..3740da6 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -167,6 +168,25 @@ public class FailoverUriPool { /** * Adds a new URI to the pool if not already contained within. The URI will have + * any nest options that have been configured added to its existing set of options. + * + * @param uris + * The new list of URIs to add to the pool. + */ + public void addAll(List<URI> uris) { + if (uris == null || uris.isEmpty()) { + return; + } + + synchronized (uris) { + for (URI uri : uris) { + add(uri); + } + } + } + + /** + * Adds a new URI to the pool if not already contained within. The URI will have * any nested options that have been configured added to its existing set of options. * * The URI is added to the head of the pooled URIs and will be the next value that @@ -220,6 +240,41 @@ public class FailoverUriPool { } /** + * Removes all currently configured URIs from the pool, no new URIs will be + * served from this pool until new ones are added. + */ + public void removeAll() { + synchronized (uris) { + uris.clear(); + } + } + + /** + * Removes all currently configured URIs from the pool and replaces them with + * the new set given. + * + * @param replacements + * The new set of failover URIs to serve from this pool. + */ + public void replaceAll(List<URI> replacements) { + synchronized (uris) { + uris.clear(); + addAll(replacements); + } + } + + /** + * Gets the current list of URIs. The returned list is a copy. + * + * @return a copy of the current list of URIs in the pool. + */ + public List<URI> getList() { + synchronized (uris) { + return new ArrayList<>(uris); + } + } + + /** * Returns the currently set value for nested options which will be added to each * URI that is returned from the pool. * @@ -259,7 +314,7 @@ public class FailoverUriPool { if (firstAddr.equals(secondAddr)) { result = true; } - } catch(IOException e) { + } catch (IOException e) { if (firstAddr == null) { LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e); } else { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java index fcb05a5..d3202e7 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java @@ -100,6 +100,13 @@ public abstract class TransportFactory { public abstract String getName(); /** + * @return true if the Transport that this factory provides uses a secure channel. + */ + public boolean isSecure() { + return false; + } + + /** * Static create method that performs the TransportFactory search and handles the * configuration and setup. * http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactory.java index 59d7061..628983a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactory.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactory.java @@ -33,4 +33,9 @@ public class NettySslTransportFactory extends NettyTcpTransportFactory { public String getName() { return "SSL"; } + + @Override + public boolean isSecure() { + return true; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWssTransportFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWssTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWssTransportFactory.java index 71e396c..ef594c6 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWssTransportFactory.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWssTransportFactory.java @@ -33,4 +33,9 @@ public class NettyWssTransportFactory extends NettyWsTransportFactory { public String getName() { return "WSS"; } + + @Override + public boolean isSecure() { + return true; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java index 83d370a..900259b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java @@ -27,7 +27,6 @@ import java.net.URL; import java.net.URLDecoder; import java.net.URLEncoder; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; @@ -201,7 +200,7 @@ public class PropertyUtil { */ public static Map<String, String> parseQuery(String queryString) throws Exception { if (queryString != null && !queryString.isEmpty()) { - Map<String, String> rc = new HashMap<String, String>(); + Map<String, String> rc = new LinkedHashMap<String, String>(); String[] parameters = queryString.split("&"); for (int i = 0; i < parameters.length; i++) { int p = parameters[i].indexOf("="); @@ -236,7 +235,7 @@ public class PropertyUtil { throw new IllegalArgumentException("The given properties object was null."); } - HashMap<String, String> rc = new HashMap<String, String>(properties.size()); + Map<String, String> rc = new LinkedHashMap<String, String>(properties.size()); for (Iterator<Entry<String, String>> iter = properties.entrySet().iterator(); iter.hasNext();) { Entry<String, String> entry = iter.next(); @@ -269,7 +268,7 @@ public class PropertyUtil { throw new IllegalArgumentException("Given Properties object cannot be null"); } - Map<String, String> unmatched = new HashMap<String, String>(); + Map<String, String> unmatched = new LinkedHashMap<String, String>(); for (Map.Entry<String, String> entry : properties.entrySet()) { if (!setProperty(target, entry.getKey(), entry.getValue())) { @@ -299,7 +298,7 @@ public class PropertyUtil { throw new IllegalArgumentException("Given Properties object cannot be null"); } - Map<String, Object> unmatched = new HashMap<String, Object>(); + Map<String, Object> unmatched = new LinkedHashMap<String, Object>(); for (Map.Entry<Object, Object> entry : properties.entrySet()) { if (!setProperty(target, (String) entry.getKey(), entry.getValue())) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/URISupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/URISupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/URISupport.java index 2dc54f4..6fb9b97 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/URISupport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/URISupport.java @@ -21,6 +21,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -405,8 +406,11 @@ public class URISupport { /** * Given a Key / Value mapping create and append a URI query value that represents the * mapped entries, return the newly updated URI that contains the value of the given URI and - * the appended query value. Each entry in the query string is prefixed by the supplied - * optionPrefix string. + * the appended query value. Only values in the given options map that start with the provided + * prefix are appended to the provided URI, the prefix is stripped off before the insertion. + * <P> + * This method replaces the value of any matching query string options in the original URI with + * the value given in the provided query parameters map. * * @param uri * The source URI that will have the Map entries appended as a URI query value. @@ -422,18 +426,20 @@ public class URISupport { */ public static URI applyParameters(URI uri, Map<String, String> queryParameters, String optionPrefix) throws URISyntaxException { if (queryParameters != null && !queryParameters.isEmpty()) { - StringBuffer newQuery = uri.getRawQuery() != null ? new StringBuffer(uri.getRawQuery()) : new StringBuffer(); + + Map<String, String> currentParameters = new LinkedHashMap<String, String>(parseParameters(uri)); + + // Replace any old values with the new value from the provided map. for (Map.Entry<String, String> param : queryParameters.entrySet()) { if (param.getKey().startsWith(optionPrefix)) { - if (newQuery.length() != 0) { - newQuery.append('&'); - } final String key = param.getKey().substring(optionPrefix.length()); - newQuery.append(key).append('=').append(param.getValue()); + currentParameters.put(key, param.getValue()); } } - uri = PropertyUtil.replaceQuery(uri, newQuery.toString()); + + uri = PropertyUtil.replaceQuery(uri, currentParameters); } + return uri; } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqp ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqp b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqp index 56741dc..e6558d0 100644 --- a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqp +++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqp @@ -15,4 +15,5 @@ ## limitations under the License. ## --------------------------------------------------------------------------- class=org.apache.qpid.jms.provider.amqp.AmqpProviderFactory -transportType=tcp \ No newline at end of file +transportScheme=tcp +providerScheme=amqp \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqps ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqps b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqps index 414957b..1a5b61f 100644 --- a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqps +++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqps @@ -15,4 +15,5 @@ ## limitations under the License. ## --------------------------------------------------------------------------- class=org.apache.qpid.jms.provider.amqp.AmqpProviderFactory -transportType=ssl \ No newline at end of file +transportScheme=ssl +providerScheme=amqps \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpws ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpws b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpws index 9cd4d1c..87fa753 100644 --- a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpws +++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpws @@ -15,4 +15,5 @@ ## limitations under the License. ## --------------------------------------------------------------------------- class=org.apache.qpid.jms.provider.amqp.AmqpProviderFactory -transportType=ws \ No newline at end of file +transportScheme=ws +providerScheme=amqpws \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpwss ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpwss b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpwss index 4c2b8c9..705d2e2 100644 --- a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpwss +++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpwss @@ -15,4 +15,5 @@ ## limitations under the License. ## --------------------------------------------------------------------------- class=org.apache.qpid.jms.provider.amqp.AmqpProviderFactory -transportType=wss \ No newline at end of file +transportScheme=wss +providerScheme=amqpwss \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/ws ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/ws b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/ws new file mode 100644 index 0000000..87fa753 --- /dev/null +++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/ws @@ -0,0 +1,19 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.qpid.jms.provider.amqp.AmqpProviderFactory +transportScheme=ws +providerScheme=amqpws \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/wss ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/wss b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/wss new file mode 100644 index 0000000..705d2e2 --- /dev/null +++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/wss @@ -0,0 +1,19 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.qpid.jms.provider.amqp.AmqpProviderFactory +transportScheme=wss +providerScheme=amqpwss \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java index c1b8b9e..b152894 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java @@ -19,6 +19,7 @@ package org.apache.qpid.jms; import java.net.URI; +import java.util.List; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -59,4 +60,8 @@ public class JmsDefaultConnectionListener implements JmsConnectionListener { @Override public void onProducerClosed(MessageProducer producer, Throwable cause) { } + + @Override + public void onRemoteDiscovery(List<URI> remotes) { + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java index f66ca6d..e5b752c 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java @@ -457,9 +457,12 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { assertTrue(asyncError.get().getCause() instanceof ProviderRedirectedException); ProviderRedirectedException redirect = (ProviderRedirectedException) asyncError.get().getCause(); - assertEquals(redirectVhost, redirect.getHostname()); - assertEquals(redirectNetworkHost, redirect.getNetworkHost()); - assertEquals(redirectPort, redirect.getPort()); + URI redirectionURI = redirect.getRedirectionURI(); + + assertNotNull(redirectionURI); + assertTrue(redirectVhost, redirectionURI.getQuery().contains("amqp.vhost=" + redirectVhost)); + assertEquals(redirectNetworkHost, redirectionURI.getHost()); + assertEquals(redirectPort, redirectionURI.getPort()); testPeer.waitForAllHandlersToComplete(1000); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java index 1292352..c501cfa 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java @@ -20,10 +20,12 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.NETWORK_HOST; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.OPEN_HOSTNAME; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PORT; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.net.URI; import java.util.HashMap; import java.util.Map; @@ -171,9 +173,13 @@ public class FailedConnectionsIntegrationTest extends QpidJmsTestCase { } catch (JMSException jmsex) { assertTrue(jmsex.getCause() instanceof ProviderRedirectedException); ProviderRedirectedException redirectEx = (ProviderRedirectedException) jmsex.getCause(); - assertEquals("vhost", redirectEx.getHostname()); - assertEquals("127.0.0.1", redirectEx.getNetworkHost()); - assertEquals(5672, redirectEx.getPort()); + + URI redirectionURI = redirectEx.getRedirectionURI(); + + assertNotNull(redirectionURI); + assertTrue("vhost", redirectionURI.getQuery().contains("amqp.vhost=vhost")); + assertEquals("127.0.0.1", redirectionURI.getHost()); + assertEquals(5672, redirectionURI.getPort()); } catch (Exception ex) { fail("Should have thrown JMSException: " + ex); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
