Author: orudyy
Date: Thu Apr 7 14:38:41 2016
New Revision: 1738135
URL: http://svn.apache.org/viewvc?rev=1738135&view=rev
Log:
QPID-7189: Fix creation of delegate for AMQP 0-9-1 in response to broker
supported protocol on protocol negotiation
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1738135&r1=1738134&r2=1738135&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Thu Apr 7 14:38:41 2016
@@ -57,6 +57,7 @@ import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
+import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.jndi.ObjectFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -626,8 +627,15 @@ public class AMQConnection extends Close
Class partypes[] = new Class[1];
partypes[0] = AMQConnection.class;
_delegate = (AMQConnectionDelegate)
c.getConstructor(partypes).newInstance(this);
- //Update our session to use this new protocol version
-
_protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
+
+ if (!ProtocolVersion.v0_10.equals(_delegate.getProtocolVersion()))
+ {
+
_protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
+ }
+
+ // reset state waiter state
+ _protocolHandler.getStateManager().clearLastException();
+
_protocolHandler.getStateManager().changeState(AMQState.CONNECTION_NOT_STARTED);
}
catch (ClassNotFoundException e)
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java?rev=1738135&r1=1738134&r2=1738135&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
Thu Apr 7 14:38:41 2016
@@ -479,8 +479,13 @@ public class AMQProtocolHandler implemen
// suggesting an alternate ProtocolVersion; the server
will then close the
// connection.
ProtocolInitiation protocolInit = (ProtocolInitiation)
message;
- _suggestedProtocolVersion = protocolInit.checkVersion();
- _logger.info("Broker suggested using protocol version: {}
", _suggestedProtocolVersion);
+ ProtocolVersion checkedVersion =
protocolInit.checkVersion();
+ _logger.info("Broker suggested using protocol version: {}
", checkedVersion);
+
+ // Create protocol version from reported major and minor
versions
+ // in order to use them on delegate instantiation.
+ // Currently delegate classes are named based on reported
major and minor versions.
+ _suggestedProtocolVersion =
ProtocolVersion.get(protocolInit.getProtocolMajor(),
protocolInit.getProtocolMinor());
// get round a bug in old versions of qpid whereby the
connection is not closed
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java?rev=1738135&r1=1738134&r2=1738135&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
Thu Apr 7 14:38:41 2016
@@ -25,25 +25,53 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.protocol.v0_10.ServerDisassembler;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
import org.apache.qpid.transport.network.Frame;
public class ProtocolNegotiationTest extends QpidBrokerTestCase
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ProtocolNegotiationTest.class);
private static final int SO_TIMEOUT = 5000;
public static final int AMQP_HEADER_LEN = 8;
private ProtocolVersion _expectedProtocolInit;
public void setUp() throws Exception
{
+ // restrict broker to support only single protocol
+ TestBrokerConfiguration config = getDefaultBrokerConfiguration();
+ config.setObjectAttribute(Port.class,
+ TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT,
+ Port.PROTOCOLS,
+ Arrays.asList(getBrokerProtocol()));
+ config.setObjectAttribute(Port.class,
+ TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT,
+ Port.CONTEXT,
+
Collections.singletonMap(BrokerProperties.PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY,
null));
+ config.setBrokerAttribute(Broker.CONTEXT,
+
Collections.singletonMap(BrokerProperties.PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY,
null));
+
super.setUp();
_expectedProtocolInit =
convertProtocolToProtocolVersion(getBrokerProtocol());
}
@@ -188,6 +216,33 @@ public class ProtocolNegotiationTest ext
}
}
+ public void testProtocolNegotiationFromUnsupportedVersion() throws
Exception
+ {
+ Protocol testProtocol = getBrokerProtocol();
+ String testSupportedProtocols =
System.getProperty("test.amqp_port_protocols");
+ if (testSupportedProtocols!= null)
+ {
+ Set<Protocol> availableProtocols = new HashSet<>();
+ List<Object> protocols = new
ObjectMapper().readValue(testSupportedProtocols, List.class);
+ for (Object protocol : protocols)
+ {
+
availableProtocols.add(Protocol.valueOf(String.valueOf(protocol)));
+ }
+ availableProtocols.remove(testProtocol);
+
+ for (Protocol protocol: availableProtocols)
+ {
+ String version = protocol.name().substring(5).replace('_',
'-');
+ LOGGER.debug("Negotiation version {} represented as {}",
protocol.name(), version);
+ setTestSystemProperty(ClientProperties.AMQP_VERSION, version);
+ AMQConnection connection = (AMQConnection)getConnection();
+ LOGGER.debug("Negotiated version {}",
connection.getProtocolVersion());
+ assertEquals("Unexpected version negotiated: " +
connection.getProtocolVersion(), _expectedProtocolInit,
connection.getProtocolVersion());
+ connection.close();
+ }
+ }
+ }
+
private boolean writeHeartbeat(final TestSender sender)
throws IOException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]