Author: orudyy
Date: Thu Apr  7 14:38:41 2016
New Revision: 1738135

URL: http://svn.apache.org/viewvc?rev=1738135&view=rev
Log:
QPID-7189: Fix creation of delegate for AMQP 0-9-1 in response to broker 
supported protocol on protocol negotiation

Modified:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1738135&r1=1738134&r2=1738135&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Apr  7 14:38:41 2016
@@ -57,6 +57,7 @@ import javax.naming.Reference;
 import javax.naming.Referenceable;
 import javax.naming.StringRefAddr;
 
+import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.jndi.ObjectFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -626,8 +627,15 @@ public class AMQConnection extends Close
             Class partypes[] = new Class[1];
             partypes[0] = AMQConnection.class;
             _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
-            //Update our session to use this new protocol version
-            _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
+
+            if (!ProtocolVersion.v0_10.equals(_delegate.getProtocolVersion()))
+            {
+                _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
+            }
+
+            // reset state waiter state
+           _protocolHandler.getStateManager().clearLastException();
+            _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_NOT_STARTED);
 
         }
         catch (ClassNotFoundException e)

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java?rev=1738135&r1=1738134&r2=1738135&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java Thu Apr  7 14:38:41 2016
@@ -479,8 +479,13 @@ public class AMQProtocolHandler implemen
                     // suggesting an alternate ProtocolVersion; the server will then close the
                     // connection.
                     ProtocolInitiation protocolInit = (ProtocolInitiation) message;
-                    _suggestedProtocolVersion = protocolInit.checkVersion();
-                    _logger.info("Broker suggested using protocol version: {} ", _suggestedProtocolVersion);
+                    ProtocolVersion checkedVersion = protocolInit.checkVersion();
+                    _logger.info("Broker suggested using protocol version: {} ", checkedVersion);
+
+                    // Create protocol version from reported major and minor versions
+                    // in order to use them on delegate instantiation.
+                    // Currently delegate classes are named based on reported major and minor versions.
+                    _suggestedProtocolVersion = ProtocolVersion.get(protocolInit.getProtocolMajor(), protocolInit.getProtocolMinor());
 
                     // get round a bug in old versions of qpid whereby the connection is not closed
                     _stateManager.changeState(AMQState.CONNECTION_CLOSED);

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java?rev=1738135&r1=1738134&r2=1738135&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java Thu Apr  7 14:38:41 2016
@@ -25,25 +25,53 @@ import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.HeartbeatBody;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.protocol.v0_10.ServerDisassembler;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
 import org.apache.qpid.transport.network.Frame;
 
 public class ProtocolNegotiationTest extends QpidBrokerTestCase
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolNegotiationTest.class);
     private static final int SO_TIMEOUT = 5000;
     public static final int AMQP_HEADER_LEN = 8;
     private ProtocolVersion _expectedProtocolInit;
 
     public void setUp() throws Exception
     {
+        // restrict broker to support only single protocol
+        TestBrokerConfiguration config = getDefaultBrokerConfiguration();
+        config.setObjectAttribute(Port.class,
+                                  TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT,
+                                  Port.PROTOCOLS,
+                                  Arrays.asList(getBrokerProtocol()));
+        config.setObjectAttribute(Port.class,
+                                  TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT,
+                                  Port.CONTEXT,
+                                  Collections.singletonMap(BrokerProperties.PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY, null));
+        config.setBrokerAttribute(Broker.CONTEXT,
+                                  Collections.singletonMap(BrokerProperties.PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY, null));
+
         super.setUp();
         _expectedProtocolInit = convertProtocolToProtocolVersion(getBrokerProtocol());
     }
@@ -188,6 +216,33 @@ public class ProtocolNegotiationTest ext
         }
     }
 
+    public void testProtocolNegotiationFromUnsupportedVersion() throws Exception
+    {
+        Protocol testProtocol = getBrokerProtocol();
+        String testSupportedProtocols = System.getProperty("test.amqp_port_protocols");
+        if (testSupportedProtocols!= null)
+        {
+            Set<Protocol> availableProtocols = new HashSet<>();
+            List<Object> protocols = new ObjectMapper().readValue(testSupportedProtocols, List.class);
+            for (Object protocol : protocols)
+            {
+                availableProtocols.add(Protocol.valueOf(String.valueOf(protocol)));
+            }
+            availableProtocols.remove(testProtocol);
+
+            for (Protocol protocol: availableProtocols)
+            {
+                String version = protocol.name().substring(5).replace('_', '-');
+                LOGGER.debug("Negotiation version {} represented as {}", protocol.name(), version);
+                setTestSystemProperty(ClientProperties.AMQP_VERSION, version);
+                AMQConnection connection = (AMQConnection)getConnection();
+                LOGGER.debug("Negotiated version {}", connection.getProtocolVersion());
+                assertEquals("Unexpected version negotiated: " + connection.getProtocolVersion(), _expectedProtocolInit, connection.getProtocolVersion());
+                connection.close();
+            }
+        }
+    }
+
     private boolean writeHeartbeat(final TestSender sender)
             throws IOException
     {



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to