Updated Branches: refs/heads/trunk a116960f3 -> 190a44bf2
Implements AMQ-5050: Populate a 'Host' header in the WireFormatInfo of the Openwire protocol to let multi-tenant proxies route connections. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/190a44bf Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/190a44bf Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/190a44bf Branch: refs/heads/trunk Commit: 190a44bf25a7484e3a0c6178890c953824fbdf3c Parents: a116960 Author: Hiram Chirino <[email protected]> Authored: Thu Feb 13 14:13:29 2014 -0500 Committer: Hiram Chirino <[email protected]> Committed: Thu Feb 13 14:15:20 2014 -0500 ---------------------------------------------------------------------- .../activemq/broker/TransportConnection.java | 4 + .../activemq/broker/OpenwireConnectionTest.java | 82 ++++++++++++++++++++ .../apache/activemq/command/WireFormatInfo.java | 13 ++++ .../openwire/OpenWireFormatFactory.java | 12 +++ .../activemq/transport/TransportFactory.java | 3 + 5 files changed, 114 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/190a44bf/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index f67d7f1..74b8dc4 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -1578,4 +1578,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } return result; } + + public WireFormatInfo getRemoteWireFormatInfo() { + return wireFormatInfo; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/190a44bf/activemq-broker/src/test/java/org/apache/activemq/broker/OpenwireConnectionTest.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/test/java/org/apache/activemq/broker/OpenwireConnectionTest.java b/activemq-broker/src/test/java/org/apache/activemq/broker/OpenwireConnectionTest.java new file mode 100644 index 0000000..130c80a --- /dev/null +++ b/activemq-broker/src/test/java/org/apache/activemq/broker/OpenwireConnectionTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.Connection; +import java.net.URI; +import java.util.concurrent.CopyOnWriteArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + */ +public class OpenwireConnectionTest { + BrokerService broker; + URI brokerConnectURI; + + @Before + public void startBroker() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + + TransportConnector connector = broker.addConnector(new TransportConnector()); + connector.setUri(new URI("tcp://0.0.0.0:0")); + connector.setName("tcp"); + + broker.start(); + broker.waitUntilStarted(); + + brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri(); + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + @Test + public void testAMQ5050DefaultHost() throws Exception { + // Let verify a host header is added to the connection. + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.start(); + + CopyOnWriteArrayList<TransportConnection> connections = broker.getConnectorByName("tcp").getConnections(); + assertEquals(1, connections.size()); + assertNotNull(connections.get(0).getRemoteWireFormatInfo().getHost()); + connection.stop(); + } + + @Test + public void testAMQ5050WithManualSpecifiedHost() throws Exception { + // Let verify a host header is added to the connection. + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI+"?wireFormat.host=foo").createConnection(); + connection.start(); + + CopyOnWriteArrayList<TransportConnection> connections = broker.getConnectorByName("tcp").getConnections(); + assertEquals(1, connections.size()); + assertEquals("foo", connections.get(0).getRemoteWireFormatInfo().getHost()); + connection.stop(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/190a44bf/activemq-client/src/main/java/org/apache/activemq/command/WireFormatInfo.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/WireFormatInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/WireFormatInfo.java index 8084660..8deaa76 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/WireFormatInfo.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/WireFormatInfo.java @@ -29,6 +29,7 @@ import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.wireformat.WireFormat; +import org.fusesource.hawtbuf.UTF8Buffer; /** * @openwire:marshaller code="1" @@ -250,6 +251,18 @@ public class WireFormatInfo implements Command, MarshallAware { setProperty("TightEncodingEnabled", tightEncodingEnabled ? Boolean.TRUE : Boolean.FALSE); } + public String getHost() throws IOException { + UTF8Buffer buff = (UTF8Buffer) getProperty("Host"); + if( buff == null ) { + return null; + } + return (String) buff.toString(); + } + + public void setHost(String hostname) throws IOException { + setProperty("Host", hostname); + } + /** * @throws IOException */ http://git-wip-us.apache.org/repos/asf/activemq/blob/190a44bf/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java index 7c9c8fe..b7f0e4f 100755 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java @@ -40,6 +40,7 @@ public class OpenWireFormatFactory implements WireFormatFactory { private long maxInactivityDurationInitalDelay = 10*1000; private int cacheSize = 1024; private long maxFrameSize = OpenWireFormat.DEFAULT_MAX_FRAME_SIZE; + private String host=null; public WireFormat createWireFormat() { WireFormatInfo info = new WireFormatInfo(); @@ -55,6 +56,9 @@ public class OpenWireFormatFactory implements WireFormatFactory { info.setMaxInactivityDurationInitalDelay(maxInactivityDurationInitalDelay); info.setCacheSize(cacheSize); info.setMaxFrameSize(maxFrameSize); + if( host!=null ) { + info.setHost(host); + } } catch (Exception e) { IllegalStateException ise = new IllegalStateException("Could not configure WireFormatInfo"); ise.initCause(e); @@ -147,4 +151,12 @@ public class OpenWireFormatFactory implements WireFormatFactory { public void setMaxFrameSize(long maxFrameSize) { this.maxFrameSize = maxFrameSize; } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/190a44bf/activemq-client/src/main/java/org/apache/activemq/transport/TransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportFactory.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportFactory.java index d68ffc4..321ee4f 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportFactory.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportFactory.java @@ -112,6 +112,9 @@ public abstract class TransportFactory { public Transport doConnect(URI location) throws Exception { try { Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); + if( !options.containsKey("wireFormat.host") ) { + options.put("wireFormat.host", location.getHost()); + } WireFormat wf = createWireFormat(options); Transport transport = createTransport(location, wf); Transport rc = configure(transport, wf, options);
