Author: rgodfrey
Date: Fri Dec 18 16:23:19 2009
New Revision: 892301
URL: http://svn.apache.org/viewvc?rev=892301&view=rev
Log:
QPID-2273 : Fix Protocol Negotiation
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
qpid/trunk/qpid/java/common/templates/model/ProtocolVersionListClass.vm
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=892301&r1=892300&r2=892301&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
Fri Dec 18 16:23:19 2009
@@ -140,6 +140,12 @@
.withDescription("when listening on the specified port
do not accept AMQP0-10 connections. The specified port must be one specified on
the command line")
.withLongOpt("exclude-0-10").create();
+ Option exclude0_9_1 =
+ OptionBuilder.withArgName("exclude-0-9-1").hasArg()
+ .withDescription("when listening on the specified port do not accept AMQP0-9-1 connections. The specified port must be one specified on the command line")
+ .withLongOpt("exclude-0-9-1").create();
+
+
Option exclude0_9 =
OptionBuilder.withArgName("exclude-0-9").hasArg()
.withDescription("when listening on the specified port
do not accept AMQP0-9 connections. The specified port must be one specified on
the command line")
@@ -179,6 +185,7 @@
options.addOption(logwatchconfig);
options.addOption(port);
options.addOption(exclude0_10);
+ options.addOption(exclude0_9_1);
options.addOption(exclude0_9);
options.addOption(exclude0_8);
options.addOption(mport);
@@ -335,6 +342,7 @@
Set<Integer> ports = new HashSet<Integer>();
Set<Integer> exclude_0_10 = new HashSet<Integer>();
+ Set<Integer> exclude_0_9_1 = new HashSet<Integer>();
Set<Integer> exclude_0_9 = new HashSet<Integer>();
Set<Integer> exclude_0_8 = new HashSet<Integer>();
@@ -343,6 +351,7 @@
parsePortList(ports, serverConfig.getPorts());
parsePortList(exclude_0_10, serverConfig.getPortExclude010());
+ parsePortList(exclude_0_9_1, serverConfig.getPortExclude091());
parsePortList(exclude_0_9, serverConfig.getPortExclude09());
parsePortList(exclude_0_8, serverConfig.getPortExclude08());
@@ -351,6 +360,7 @@
{
parsePortArray(ports, portStr);
parsePortArray(exclude_0_10,
commandLine.getOptionValues("exclude-0-10"));
+ parsePortArray(exclude_0_9_1,
commandLine.getOptionValues("exclude-0-9-1"));
parsePortArray(exclude_0_9,
commandLine.getOptionValues("exclude-0-9"));
parsePortArray(exclude_0_8,
commandLine.getOptionValues("exclude-0-8"));
@@ -399,6 +409,11 @@
{
supported.remove(VERSION.v0_10);
}
+
+ if(exclude_0_9_1.contains(port))
+ {
+ supported.remove(VERSION.v0_9_1);
+ }
if(exclude_0_9.contains(port))
{
supported.remove(VERSION.v0_9);
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=892301&r1=892300&r2=892301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
Fri Dec 18 16:23:19 2009
@@ -538,6 +538,11 @@
return getConfig().getList("connector.non010port",
Collections.EMPTY_LIST);
}
+ public List getPortExclude091()
+ {
+ return getConfig().getList("connector.non091port",
Collections.EMPTY_LIST);
+ }
+
public List getPortExclude09()
{
return getConfig().getList("connector.non09port",
Collections.EMPTY_LIST);
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java?rev=892301&r1=892300&r2=892301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
Fri Dec 18 16:23:19 2009
@@ -45,7 +45,6 @@
register(ProtocolVersion.v8_0,
org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory());
register(ProtocolVersion.v0_9,
org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory());
register(ProtocolVersion.v0_91,
org.apache.qpid.server.output.amqp0_9_1.ProtocolOutputConverterImpl.getInstanceFactory());
-
}
private static void register(ProtocolVersion version, Factory converter)
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=892301&r1=892300&r2=892301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
Fri Dec 18 16:23:19 2009
@@ -135,7 +135,7 @@
(byte) 'M',
(byte) 'Q',
(byte) 'P',
- (byte) 1,
+ (byte) 0,
(byte) 0,
(byte) 9,
(byte) 1
@@ -250,6 +250,59 @@
new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1,
creator_0_10 };
+ private class ClosedDelegateProtocolEngine implements ProtocolEngine
+ {
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0;
+ }
+
+ public long getReadBytes()
+ {
+ return 0;
+ }
+
+ public void received(ByteBuffer msg)
+ {
+ _logger.error("Error processing incoming data, could not negotiate a common protocol");
+ }
+
+ public void exception(Throwable t)
+ {
+ _logger.error("Error establishing session", t);
+ }
+
+ public void closed()
+ {
+
+ }
+
+ public void writerIdle()
+ {
+
+ }
+
+ public void readerIdle()
+ {
+
+ }
+ }
+
private class SelfDelegateProtocolEngine implements ProtocolEngine
{
@@ -303,12 +356,14 @@
ProtocolEngine newDelegate = null;
+ byte[] newestSupported = null;
for(int i = 0; newDelegate == null && i < _creators.length;
i++)
{
if(_supported.contains(_creators[i].getVersion()))
{
+ newestSupported = _creators[i].getHeaderIdentifier();
byte[] compareBytes =
_creators[i].getHeaderIdentifier();
boolean equal = true;
for(int j = 0; equal && j<compareBytes.length; j++)
@@ -319,24 +374,28 @@
{
newDelegate = _creators[i].getProtocolEngine();
}
-
-
}
}
- // let the first delegate handle completely unknown versions
+
+ // If no delegate is found then send back the most recent supported protocol version id
if(newDelegate == null)
{
- newDelegate = _creators[0].getProtocolEngine();
+ _networkDriver.send(ByteBuffer.wrap(newestSupported));
+
+ newDelegate = new ClosedDelegateProtocolEngine();
}
- newDelegate.setNetworkDriver(_networkDriver);
+ else
+ {
+ newDelegate.setNetworkDriver(_networkDriver);
- _delegate = newDelegate;
+ _delegate = newDelegate;
- _header.flip();
- _delegate.received(_header);
- if(msg.hasRemaining())
- {
- _delegate.received(msg);
+ _header.flip();
+ _delegate.received(_header);
+ if(msg.hasRemaining())
+ {
+ _delegate.received(msg);
+ }
}
}
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=892301&r1=892300&r2=892301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Fri Dec 18 16:23:19 2009
@@ -308,7 +308,6 @@
/** Thread Pool for executing connection level processes. Such as
returning bounced messages. */
private final ExecutorService _taskPool = Executors.newCachedThreadPool();
private static final long DEFAULT_TIMEOUT = 1000 * 30;
- private ProtocolVersion _protocolVersion = ProtocolVersion.v0_91; // FIXME TGM, shouldn't need this
protected AMQConnectionDelegate _delegate;
@@ -458,9 +457,17 @@
_failoverPolicy = new FailoverPolicy(connectionURL, this);
BrokerDetails brokerDetails =
_failoverPolicy.getCurrentBrokerDetails();
- if (brokerDetails.getTransport().equals(BrokerDetails.VM) ||
"0-8".equals(amqpVersion) || "0-9".equals(amqpVersion))
+ if (brokerDetails.getTransport().equals(BrokerDetails.VM) ||
"0-8".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_8_0(this);
+ }
+ else if ("0-9".equals(amqpVersion))
+ {
+ _delegate = new AMQConnectionDelegate_0_9(this);
+ }
+ else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion))
+ {
+ _delegate = new AMQConnectionDelegate_9_1(this);
}
else
{
@@ -1541,13 +1548,7 @@
public ProtocolVersion getProtocolVersion()
{
- return _protocolVersion;
- }
-
- public void setProtocolVersion(ProtocolVersion protocolVersion)
- {
- _protocolVersion = protocolVersion;
- _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion);
+ return _delegate.getProtocolVersion();
}
public boolean isFailingOver()
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=892301&r1=892300&r2=892301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
Fri Dec 18 16:23:19 2009
@@ -61,4 +61,6 @@
void setIdleTimeout(long l);
int getMaxChannelID();
+
+ ProtocolVersion getProtocolVersion();
}
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=892301&r1=892300&r2=892301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
Fri Dec 18 16:23:19 2009
@@ -301,4 +301,9 @@
{
return Integer.MAX_VALUE;
}
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return ProtocolVersion.v0_10;
+ }
}
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java?rev=892301&r1=892300&r2=892301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java
Fri Dec 18 16:23:19 2009
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.ProtocolVersion;
+
public class AMQConnectionDelegate_0_9 extends AMQConnectionDelegate_8_0
{
@@ -28,5 +30,11 @@
{
super(conn);
}
+
+ @Override
+ public ProtocolVersion getProtocolVersion()
+ {
+ return ProtocolVersion.v0_9;
+ }
}
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=892301&r1=892300&r2=892301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
Fri Dec 18 16:23:19 2009
@@ -107,9 +107,13 @@
{
_conn._failoverPolicy.attainedConnection();
_conn._connected = true;
+ return null;
+ }
+ else
+ {
+ return _conn._protocolHandler.getSuggestedProtocolVersion();
}
- return null;
}
public org.apache.qpid.jms.Session createSession(final boolean transacted,
final int acknowledgeMode, final int prefetch)
@@ -306,4 +310,9 @@
{
return (int) (Math.pow(2, 16)-1);
}
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return ProtocolVersion.v8_0;
+ }
}
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java?rev=892301&r1=892300&r2=892301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java
Fri Dec 18 16:23:19 2009
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.framing.ProtocolVersion;
+
public class AMQConnectionDelegate_9_1 extends AMQConnectionDelegate_8_0
{
@@ -29,4 +31,9 @@
super(conn);
}
+ @Override
+ public ProtocolVersion getProtocolVersion()
+ {
+ return ProtocolVersion.v0_91;
+ }
}
\ No newline at end of file
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=892301&r1=892300&r2=892301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
Fri Dec 18 16:23:19 2009
@@ -171,6 +171,7 @@
private Job _writeJob;
private ReferenceCountingExecutorService _poolReference =
ReferenceCountingExecutorService.getInstance();
private NetworkDriver _networkDriver;
+ private ProtocolVersion _suggestedProtocolVersion;
private long _writtenBytes;
private long _readBytes;
@@ -427,6 +428,7 @@
Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new
Runnable()
{
+
public void run()
{
// Decode buffer
@@ -467,9 +469,8 @@
// suggesting an alternate ProtocolVersion;
the server will then close the
// connection.
ProtocolInitiation protocolInit =
(ProtocolInitiation) message;
- ProtocolVersion pv =
protocolInit.checkVersion();
- getConnection().setProtocolVersion(pv);
-
+ _suggestedProtocolVersion =
protocolInit.checkVersion();
+
// get round a bug in old versions of qpid
whereby the connection is not closed
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
@@ -845,4 +846,10 @@
{
return _networkDriver;
}
+
+ public ProtocolVersion getSuggestedProtocolVersion()
+ {
+ return _suggestedProtocolVersion;
+ }
+
}
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java?rev=892301&r1=892300&r2=892301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
(original)
+++
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
Fri Dec 18 16:23:19 2009
@@ -39,13 +39,15 @@
private static final byte[] AMQP = {'A', 'M', 'Q', 'P' };
private static final byte CLASS = 1;
+ final private byte protoClass;
final private byte instance;
final private byte major;
final private byte minor;
private int channel;
- public ProtocolHeader(byte instance, byte major, byte minor)
+ public ProtocolHeader(byte protoClass, byte instance, byte major, byte minor)
{
+ this.protoClass = protoClass;
this.instance = instance;
this.major = major;
this.minor = minor;
@@ -53,7 +55,7 @@
public ProtocolHeader(int instance, int major, int minor)
{
- this((byte) instance, (byte) major, (byte) minor);
+ this(CLASS, (byte) instance, (byte) major, (byte) minor);
}
public byte getInstance()
@@ -90,7 +92,7 @@
{
ByteBuffer buf = ByteBuffer.allocate(8);
buf.put(AMQP);
- buf.put(CLASS);
+ buf.put(protoClass);
buf.put(instance);
buf.put(major);
buf.put(minor);
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java?rev=892301&r1=892300&r2=892301&view=diff
==============================================================================
---
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
(original)
+++
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
Fri Dec 18 16:23:19 2009
@@ -144,10 +144,11 @@
return ERROR;
}
+ byte protoClass = input.get(pos + 4);
byte instance = input.get(pos + 5);
byte major = input.get(pos + 6);
byte minor = input.get(pos + 7);
- receiver.received(new ProtocolHeader(instance, major, minor));
+ receiver.received(new ProtocolHeader(protoClass, instance, major, minor));
needed = Frame.HEADER_SIZE;
return FRAME_HDR;
case FRAME_HDR:
Modified:
qpid/trunk/qpid/java/common/templates/model/ProtocolVersionListClass.vm
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/templates/model/ProtocolVersionListClass.vm?rev=892301&r1=892300&r2=892301&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/templates/model/ProtocolVersionListClass.vm
(original)
+++ qpid/trunk/qpid/java/common/templates/model/ProtocolVersionListClass.vm Fri Dec 18 16:23:19 2009
@@ -149,15 +149,20 @@
private static final ProtocolVersion _defaultVersion;
+ public static final ProtocolVersion v0_10 = new ProtocolVersion((byte)0,(byte)10);
+
#foreach( $version in $model.getVersionSet() )
#set( $versionId = "v$version.getMajor()_$version.getMinor()" )
- public static final ProtocolVersion $versionId = new
ProtocolVersion((byte)$version.getMajor(),(byte)$version.getMinor());
+ public static final ProtocolVersion $versionId = new
ProtocolVersion((byte)$version.getMajor(),(byte)$version.getMinor());
#end
+
static
{
SortedSet<ProtocolVersion> versions = new TreeSet<ProtocolVersion>();
+ versions.add(v0_10);
+ _nameToVersionMap.put("0-10", v0_10);
#foreach( $version in $model.getVersionSet() )
#set( $versionId = "v$version.getMajor()_$version.getMinor()" )
versions.add($versionId);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]