Updated Branches: refs/heads/trunk dc0291b29 -> 0f9a34799
https://issues.apache.org/jira/browse/AMQ-4753 Quick fix for getting past protocol discrimination and passing on proper Buffers of data to the protocol converter. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0f9a3479 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0f9a3479 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0f9a3479 Branch: refs/heads/trunk Commit: 0f9a34799630911b0d7906e166291ff5af0d852c Parents: dc0291b Author: Timothy Bish <[email protected]> Authored: Tue Oct 8 17:53:51 2013 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue Oct 8 17:53:51 2013 -0400 ---------------------------------------------------------------------- .../transport/amqp/AMQPSslTransportFactory.java | 41 +++++----- .../transport/amqp/AmqpNioSslTransport.java | 32 ++++++-- .../transport/amqp/AmqpTestSupport.java | 21 +++++- .../transport/amqp/bugs/AMQ4753Test.java | 78 ++++++++++++++++++++ 4 files changed, 145 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/0f9a3479/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java index bd341a5..0612fd9 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.transport.amqp; +import java.util.HashMap; +import java.util.Map; + import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; @@ -25,9 +28,6 @@ import org.apache.activemq.transport.tcp.SslTransportFactory; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.wireformat.WireFormat; -import java.util.HashMap; -import java.util.Map; - /** * A <a href="http://amqp.org/">AMQP</a> over SSL transport factory */ @@ -35,12 +35,13 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok private BrokerContext brokerContext = null; + @Override protected String getDefaultWireFormatType() { return "amqp"; } + @Override @SuppressWarnings("rawtypes") - public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { transport = new AmqpTransportFilter(transport, format, brokerContext); IntrospectionSupport.setProperties(transport, options); @@ -53,31 +54,33 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok transport = super.serverConfigure(transport, format, options); // strip off the mutex transport. - if( transport instanceof MutexTransport ) { - transport = ((MutexTransport)transport).getNext(); + if (transport instanceof MutexTransport) { + transport = ((MutexTransport) transport).getNext(); } -// MutexTransport mutex = transport.narrow(MutexTransport.class); -// if (mutex != null) { -// mutex.setSyncOnCommand(true); -// } + // MutexTransport mutex = transport.narrow(MutexTransport.class); + // if (mutex != null) { + // mutex.setSyncOnCommand(true); + // } return transport; } + @Override public void setBrokerService(BrokerService brokerService) { this.brokerContext = brokerService.getBrokerContext(); } -// protected Transport createInactivityMonitor(Transport transport, WireFormat format) { -// AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format); -// -// AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class); -// filter.setInactivityMonitor(monitor); -// -// return monitor; -// } - + // protected Transport createInactivityMonitor(Transport transport, + // WireFormat format) { + // AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, + // format); + // + // AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class); + // filter.setInactivityMonitor(monitor); + // + // return monitor; + // } @Override protected boolean isUseInactivityMonitor(Transport transport) { http://git-wip-us.apache.org/repos/asf/activemq/blob/0f9a3479/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java index 9109244..d393aa7 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java @@ -16,18 +16,22 @@ */ package org.apache.activemq.transport.amqp; -import org.apache.activemq.transport.nio.NIOSSLTransport; -import org.apache.activemq.wireformat.WireFormat; - -import javax.net.SocketFactory; import java.io.IOException; import java.net.Socket; import java.net.URI; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import javax.net.SocketFactory; + +import org.apache.activemq.transport.nio.NIOSSLTransport; +import org.apache.activemq.wireformat.WireFormat; +import org.fusesource.hawtbuf.Buffer; + public class AmqpNioSslTransport extends NIOSSLTransport { + private boolean magicRead; + public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); } @@ -46,7 +50,23 @@ public class AmqpNioSslTransport extends NIOSSLTransport { @Override protected void processCommand(ByteBuffer plain) throws Exception { - doConsume(AmqpSupport.toBuffer(plain)); - } + byte[] fill = new byte[plain.remaining()]; + plain.get(fill); + + ByteBuffer payload = ByteBuffer.wrap(fill); + + if (!magicRead) { + if (payload.remaining() >= 8) { + magicRead = true; + Buffer magic = new Buffer(8); + for (int i = 0; i < 8; i++) { + magic.data[i] = payload.get(); + } + doConsume(new AmqpHeader(magic)); + } + } + + doConsume(AmqpSupport.toBuffer(payload)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/0f9a3479/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index 2284a8c..5c5bcff 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp; import java.io.File; +import java.security.SecureRandom; import java.util.Vector; import javax.jms.Connection; @@ -27,6 +28,9 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; import org.apache.activemq.AutoFailTestSupport; import org.apache.activemq.broker.BrokerService; @@ -45,10 +49,11 @@ public class AmqpTestSupport { protected BrokerService brokerService; protected Vector<Throwable> exceptions = new Vector<Throwable>(); protected int numberOfMessages; - AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() { - }; + AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {}; protected int port; protected int sslPort; + protected int nioPort; + protected int nioPlusSslPort; public static void main(String[] args) throws Exception { final AmqpTestSupport s = new AmqpTestSupport(); @@ -72,6 +77,10 @@ public class AmqpTestSupport { brokerService.setPersistent(false); brokerService.setAdvisorySupport(false); + SSLContext ctx = SSLContext.getInstance("TLS"); + ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom()); + SSLContext.setDefault(ctx); + // Setup SSL context... final File classesDir = new File(AmqpProtocolConverter.class.getProtectionDomain().getCodeSource().getLocation().getFile()); File keystore = new File(classesDir, "../../src/test/resources/keystore"); @@ -91,8 +100,16 @@ public class AmqpTestSupport { protected void addAMQPConnector() throws Exception { TransportConnector connector = brokerService.addConnector("amqp+ssl://0.0.0.0:" + sslPort); sslPort = connector.getConnectUri().getPort(); + LOG.debug("Using amqp+ssl port " + sslPort); connector = brokerService.addConnector("amqp://0.0.0.0:" + port); port = connector.getConnectUri().getPort(); + LOG.debug("Using amqp port " + port); + connector = brokerService.addConnector("amqp+nio://0.0.0.0:" + nioPort); + nioPort = connector.getConnectUri().getPort(); + LOG.debug("Using amqp+nio port " + nioPort); + connector = brokerService.addConnector("amqp+nio+ssl://0.0.0.0:" + nioPlusSslPort); + nioPlusSslPort = connector.getConnectUri().getPort(); + LOG.debug("Using amqp+nio+ssl port " + nioPlusSslPort); } @After http://git-wip-us.apache.org/repos/asf/activemq/blob/0f9a3479/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java new file mode 100644 index 0000000..3be3482 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.transport.amqp.AmqpTestSupport; +import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; +import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; +import org.junit.Test; + +public class AMQ4753Test extends AmqpTestSupport { + + @Test(timeout = 120 * 1000) + public void testAmqpNioPlusSslSendReceive() throws JMSException{ + Connection connection = createAMQPConnection(nioPlusSslPort, true); + runSimpleSendReceiveTest(connection); + } + + public void runSimpleSendReceiveTest(Connection connection) throws JMSException{ + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + QueueImpl queue = new QueueImpl("queue://txqueue"); + MessageProducer producer = session.createProducer(queue); + TextMessage message = session.createTextMessage(); + String messageText = "hello sent at " + new java.util.Date().toString(); + message.setText(messageText); + producer.send(message); + + // Get the message we just sent + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + Message receivedMessage = consumer.receive(5000); + assertNotNull(receivedMessage); + assertTrue(receivedMessage instanceof TextMessage); + TextMessage textMessage = (TextMessage) receivedMessage; + assertEquals(messageText, textMessage.getText()); + connection.close(); + } + + private Connection createAMQPConnection(int testPort, boolean useSSL) throws JMSException { + LOG.debug("In createConnection using port {} ssl? {}", testPort, useSSL); + final ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl("localhost", testPort, "admin", "password", null, useSSL); + final Connection connection = connectionFactory.createConnection(); + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + connection.start(); + return connection; + } +}
