Repository: activemq Updated Branches: refs/heads/master 8bb58036a -> b9ed01fa5
https://issues.apache.org/jira/browse/AMQ-5698 Ensure that wireFormat transport options get applied Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b9ed01fa Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b9ed01fa Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b9ed01fa Branch: refs/heads/master Commit: b9ed01fa56d7bb5be80bdc2a1c48d37ebdae46e1 Parents: 8bb5803 Author: Timothy Bish <[email protected]> Authored: Tue Mar 31 12:35:50 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue Mar 31 12:36:00 2015 -0400 ---------------------------------------------------------------------- .../transport/amqp/AmqpNioTransportFactory.java | 11 ++++++++--- .../transport/amqp/AmqpSslTransportFactory.java | 11 ++++++++--- .../activemq/transport/amqp/AmqpTransportFactory.java | 11 ++++++++--- .../org/apache/activemq/transport/amqp/IDERunner.java | 13 +++++++++++-- 4 files changed, 35 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/b9ed01fa/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java index bbcac47..d54d58f 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java @@ -80,9 +80,14 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok @Override @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - transport = new AmqpTransportFilter(transport, format, brokerService); - IntrospectionSupport.setProperties(transport, options); - return super.compositeConfigure(transport, format, options); + AmqpTransportFilter amqpTransport = new AmqpTransportFilter(transport, format, brokerService); + + Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat."); + + IntrospectionSupport.setProperties(amqpTransport, options); + IntrospectionSupport.setProperties(amqpTransport.getWireFormat(), wireFormatOptions); + + return super.compositeConfigure(amqpTransport, format, options); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/b9ed01fa/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSslTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSslTransportFactory.java index 5d04e8c..00c72ca 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSslTransportFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpSslTransportFactory.java @@ -42,9 +42,14 @@ public class AmqpSslTransportFactory extends SslTransportFactory implements Brok @Override @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - transport = new AmqpTransportFilter(transport, format, brokerService); - IntrospectionSupport.setProperties(transport, options); - return super.compositeConfigure(transport, format, options); + AmqpTransportFilter amqpTransport = new AmqpTransportFilter(transport, format, brokerService); + + Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat."); + + IntrospectionSupport.setProperties(amqpTransport, options); + IntrospectionSupport.setProperties(amqpTransport.getWireFormat(), wireFormatOptions); + + return super.compositeConfigure(amqpTransport, format, options); } @SuppressWarnings("rawtypes") http://git-wip-us.apache.org/repos/asf/activemq/blob/b9ed01fa/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java index 6dce2c0..7f93b21 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java @@ -42,9 +42,14 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS @Override @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - transport = new AmqpTransportFilter(transport, format, brokerService); - IntrospectionSupport.setProperties(transport, options); - return super.compositeConfigure(transport, format, options); + AmqpTransportFilter amqpTransport = new AmqpTransportFilter(transport, format, brokerService); + + Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat."); + + IntrospectionSupport.setProperties(amqpTransport, options); + IntrospectionSupport.setProperties(amqpTransport.getWireFormat(), wireFormatOptions); + + return super.compositeConfigure(amqpTransport, format, options); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/b9ed01fa/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/IDERunner.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/IDERunner.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/IDERunner.java index 11b5c71..ab64f21 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/IDERunner.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/IDERunner.java @@ -18,7 +18,10 @@ package org.apache.activemq.transport.amqp; import java.io.File; +import javax.jms.Connection; + import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.store.kahadb.KahaDBStore; public class IDERunner { @@ -29,8 +32,10 @@ public class IDERunner { public static void main(String[]args) throws Exception { BrokerService brokerService = new BrokerService(); - brokerService.addConnector( - "amqp://0.0.0.0:5672?trace=" + TRANSPORT_TRACE + "&transport.transformer=" + AMQP_TRANSFORMER); + TransportConnector connector = brokerService.addConnector( + "amqp://0.0.0.0:5672?trace=" + TRANSPORT_TRACE + + "&transport.transformer=" + AMQP_TRANSFORMER + + "&transport.wireFormat.maxAmqpFrameSize=104857600"); KahaDBStore store = new KahaDBStore(); store.setDirectory(new File("target/activemq-data/kahadb")); @@ -41,6 +46,10 @@ public class IDERunner { brokerService.deleteAllMessages(); brokerService.start(); + + Connection connection = JMSClientContext.INSTANCE.createConnection(connector.getPublishableConnectURI()); + connection.start(); + brokerService.waitUntilStopped(); } }
