Author: rgodfrey
Date: Mon Jan 30 16:44:48 2012
New Revision: 1237772
URL: http://svn.apache.org/viewvc?rev=1237772&view=rev
Log:
QPID-3789 : [Java] Remove duplication of output converters and optimise startup time
Added:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java
- copied, changed from r1237572,
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
Removed:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java?rev=1237772&r1=1237771&r2=1237772&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
Mon Jan 30 16:44:48 2012
@@ -186,7 +186,6 @@ public class Broker
{
bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
}
- String hostName = bindAddress.getCanonicalHostName();
if (!serverConfig.getSSLOnly())
{
@@ -199,7 +198,7 @@ public class Broker
final IncomingNetworkTransport transport =
Transport.getIncomingTransportInstance();
final MultiVersionProtocolEngineFactory
protocolEngineFactory =
- new
MultiVersionProtocolEngineFactory(hostName, supported);
+ new
MultiVersionProtocolEngineFactory(supported);
transport.accept(settings, protocolEngineFactory, null);
ApplicationRegistry.getInstance().addAcceptor(new
InetSocketAddress(bindAddress, port),
@@ -224,7 +223,7 @@ public class Broker
final IncomingNetworkTransport transport =
Transport.getIncomingTransportInstance();
final MultiVersionProtocolEngineFactory
protocolEngineFactory =
- new
MultiVersionProtocolEngineFactory(hostName, supported);
+ new
MultiVersionProtocolEngineFactory(supported);
transport.accept(settings, protocolEngineFactory,
sslContext);
ApplicationRegistry.getInstance().addAcceptor(new
InetSocketAddress(bindAddress, sslPort),
Copied:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java
(from r1237572,
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java)
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java&r1=1237572&r2=1237772&rev=1237772&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java
Mon Jan 30 16:44:48 2012
@@ -1,4 +1,4 @@
-package org.apache.qpid.server.output.amqp0_9_1;
+package org.apache.qpid.server.output;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -32,15 +32,11 @@ import org.apache.qpid.framing.BasicGetO
import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.server.output.HeaderPropertiesConverter;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.transport.DeliveryProperties;
@@ -49,27 +45,17 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
- private static final MethodRegistry METHOD_REGISTRY =
MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
-
- public static Factory getInstanceFactory()
- {
- return new Factory()
- {
-
- public ProtocolOutputConverter newInstance(AMQProtocolSession
session)
- {
- return new ProtocolOutputConverterImpl(session);
- }
- };
- }
+ private static final int BASIC_CLASS_ID = 60;
+ private final MethodRegistry _methodRegistry;
private final AMQProtocolSession _protocolSession;
- private ProtocolOutputConverterImpl(AMQProtocolSession session)
+ ProtocolOutputConverterImpl(AMQProtocolSession session, MethodRegistry
methodRegistry)
{
_protocolSession = session;
+ _methodRegistry = methodRegistry;
}
@@ -97,7 +83,7 @@ public class ProtocolOutputConverterImpl
{
final MessageTransferMessage message = (MessageTransferMessage)
entry.getMessage();
BasicContentHeaderProperties props =
HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost());
- ContentHeaderBody chb = new ContentHeaderBody(props,
BasicGetBodyImpl.CLASS_ID);
+ ContentHeaderBody chb = new ContentHeaderBody(props,
BASIC_CLASS_ID);
chb.setBodySize(message.getSize());
return chb;
}
@@ -199,15 +185,6 @@ public class ProtocolOutputConverterImpl
}
}
- private AMQDataBlock createContentHeaderBlock(final int channelId, final
ContentHeaderBody contentHeaderBody)
- {
-
- AMQDataBlock contentHeader =
ContentHeaderBody.createAMQFrame(channelId,
-
contentHeaderBody);
- return contentHeader;
- }
-
-
public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag,
int queueSize) throws AMQException
{
AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag,
queueSize);
@@ -248,7 +225,7 @@ public class ProtocolOutputConverterImpl
public AMQBody createAMQBody()
{
- return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ return _methodRegistry.createBasicDeliverBody(consumerTag,
deliveryTag,
isRedelivered,
exchangeName,
@@ -316,7 +293,7 @@ public class ProtocolOutputConverterImpl
final boolean isRedelivered = entry.isRedelivered();
BasicGetOkBody getOkBody =
- METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+ _methodRegistry.createBasicGetOkBody(deliveryTag,
isRedelivered,
exchangeName,
routingKey,
@@ -327,7 +304,7 @@ public class ProtocolOutputConverterImpl
public byte getProtocolMinorVersion()
{
- return getProtocolSession().getProtocolMinorVersion();
+ return _protocolSession.getProtocolMinorVersion();
}
public byte getProtocolMajorVersion()
@@ -341,7 +318,7 @@ public class ProtocolOutputConverterImpl
{
BasicReturnBody basicReturnBody =
- METHOD_REGISTRY.createBasicReturnBody(replyCode,
+ _methodRegistry.createBasicReturnBody(replyCode,
replyText,
messagePublishInfo.getExchange(),
messagePublishInfo.getRoutingKey());
@@ -369,7 +346,7 @@ public class ProtocolOutputConverterImpl
public void confirmConsumerAutoClose(int channelId, AMQShortString
consumerTag)
{
- BasicCancelOkBody basicCancelOkBody =
METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
+ BasicCancelOkBody basicCancelOkBody =
_methodRegistry.createBasicCancelOkBody(consumerTag);
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java?rev=1237772&r1=1237771&r2=1237772&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
Mon Jan 30 16:44:48 2012
@@ -26,6 +26,7 @@
*/
package org.apache.qpid.server.output;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.server.output.ProtocolOutputConverter.Factory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -42,19 +43,19 @@ public class ProtocolOutputConverterRegi
static
{
- register(ProtocolVersion.v8_0,
org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory());
- register(ProtocolVersion.v0_9,
org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory());
- register(ProtocolVersion.v0_91,
org.apache.qpid.server.output.amqp0_9_1.ProtocolOutputConverterImpl.getInstanceFactory());
+ register(ProtocolVersion.v8_0);
+ register(ProtocolVersion.v0_9);
+ register(ProtocolVersion.v0_91);
}
private ProtocolOutputConverterRegistry()
{
}
- private static void register(ProtocolVersion version, Factory converter)
+ private static void register(ProtocolVersion version)
{
- _registry.put(version,converter);
+ _registry.put(version,new ConverterFactory(version));
}
@@ -62,4 +63,28 @@ public class ProtocolOutputConverterRegi
{
return
_registry.get(session.getProtocolVersion()).newInstance(session);
}
+
+ private static class ConverterFactory implements Factory
+ {
+ private ProtocolVersion _protocolVersion;
+ private MethodRegistry _methodRegistry;
+ private int _classId;
+
+ public ConverterFactory(ProtocolVersion pv)
+ {
+ _protocolVersion = pv;
+
+ }
+
+ public synchronized ProtocolOutputConverter
newInstance(AMQProtocolSession session)
+ {
+ if(_methodRegistry == null)
+ {
+
+ _methodRegistry =
MethodRegistry.getMethodRegistry(_protocolVersion);
+
+ }
+ return new ProtocolOutputConverterImpl(session, _methodRegistry);
+ }
+ }
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1237772&r1=1237771&r2=1237772&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
Mon Jan 30 16:44:48 2012
@@ -30,6 +30,7 @@ import org.apache.qpid.transport.Connect
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Set;
@@ -49,23 +50,20 @@ public class MultiVersionProtocolEngine
private volatile ServerProtocolEngine _delegate = new
SelfDelegateProtocolEngine();
public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
- String fqdn,
Set<AmqpProtocolVersion> supported,
NetworkConnection network,
long id)
{
- this(appRegistry,fqdn,supported,id);
+ this(appRegistry, supported,id);
setNetworkConnection(network);
}
public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
- String fqdn,
Set<AmqpProtocolVersion> supported,
long id)
{
_id = id;
_appRegistry = appRegistry;
- _fqdn = fqdn;
_supported = supported;
}
@@ -177,6 +175,15 @@ public class MultiVersionProtocolEngine
public void setNetworkConnection(NetworkConnection network,
Sender<ByteBuffer> sender)
{
_network = network;
+ SocketAddress address = _network.getLocalAddress();
+ if (address instanceof InetSocketAddress)
+ {
+ _fqdn = ((InetSocketAddress) address).getHostName();
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unsupported socket address
class: " + address);
+ }
_sender = sender;
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1237772&r1=1237771&r2=1237772&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
Mon Jan 30 16:44:48 2012
@@ -34,24 +34,22 @@ public class MultiVersionProtocolEngineF
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
private final IApplicationRegistry _appRegistry;
- private final String _fqdn;
private final Set<AmqpProtocolVersion> _supported;
- public MultiVersionProtocolEngineFactory(String fqdn,
Set<AmqpProtocolVersion> supportedVersions)
+ public MultiVersionProtocolEngineFactory(Set<AmqpProtocolVersion>
supportedVersions)
{
_appRegistry = ApplicationRegistry.getInstance();
- _fqdn = fqdn;
_supported = supportedVersions;
}
public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
{
- return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported,
network, ID_GENERATOR.getAndIncrement());
+ return new MultiVersionProtocolEngine(_appRegistry, _supported,
network, ID_GENERATOR.getAndIncrement());
}
public ServerProtocolEngine newProtocolEngine()
{
- return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported,
ID_GENERATOR.getAndIncrement());
+ return new MultiVersionProtocolEngine(_appRegistry, _supported,
ID_GENERATOR.getAndIncrement());
}
}
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java?rev=1237772&r1=1237771&r2=1237772&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
Mon Jan 30 16:44:48 2012
@@ -121,7 +121,7 @@ public class MultiVersionProtocolEngineF
Set<AmqpProtocolVersion> versions =
EnumSet.allOf(AmqpProtocolVersion.class);
MultiVersionProtocolEngineFactory factory =
- new MultiVersionProtocolEngineFactory("localhost", versions);
+ new MultiVersionProtocolEngineFactory(versions);
//create a dummy to retrieve the 'current' ID number
long previousId = factory.newProtocolEngine(new
TestNetworkConnection()).getConnectionId();
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org