Added openwire parameters as bean properties so that they can be passed via the new protocol manager bean util.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bdb27a9e Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bdb27a9e Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bdb27a9e Branch: refs/heads/refactor-openwire Commit: bdb27a9ef1fa8c7091b46aac5a296d3eaedddabc Parents: 3ab1359 Author: Howard Gao <howard....@gmail.com> Authored: Wed Feb 17 20:50:33 2016 +0800 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Feb 25 18:10:23 2016 -0500 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 25 ++----------- .../openwire/OpenWireProtocolManager.java | 38 +++++++++++++++++--- .../artemiswrapper/OpenwireArtemisBaseTest.java | 20 ++++++++--- .../transport/failover/FailoverClusterTest.java | 11 ++++-- .../failover/FailoverComplexClusterTest.java | 16 ++++++--- .../failover/FailoverPriorityTest.java | 22 ++++++++---- .../failover/FailoverUpdateURIsTest.java | 6 +++- 7 files changed, 94 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bdb27a9e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 61e93cb..a6f0f34 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -55,7 +55,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ConcurrentHashSet; -import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; @@ -153,10 +152,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S private String defaultSocketURIString; - private boolean rebalance; - private boolean updateClusterClients; - private boolean updateClusterClientsOnRemove; - public OpenWireConnection(Acceptor acceptorUsed, Connection connection, OpenWireProtocolManager openWireProtocolManager, @@ -167,12 +162,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S this.wireFormat = wf; this.creationTime = System.currentTimeMillis(); this.defaultSocketURIString = connection.getLocalAddress(); - - //Clebert: These are parameters specific to openwire cluster with defaults as specified at - //http://activemq.apache.org/failover-transport-reference.html - rebalance = ConfigurationHelper.getBooleanProperty("rebalance-cluster-clients", true, acceptorUsed.getConfiguration()); - updateClusterClients = ConfigurationHelper.getBooleanProperty("update-cluster-clients", true, acceptorUsed.getConfiguration()); - updateClusterClientsOnRemove = ConfigurationHelper.getBooleanProperty("update-cluster-clients-on-remove", true, acceptorUsed.getConfiguration()); } @Override @@ -200,10 +189,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S return info.getPassword(); } - public boolean isRebalance() { - return rebalance; - } - 
private ConnectionInfo getConnectionInfo() { if (state == null) { return null; @@ -539,9 +524,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S Response resp = new ExceptionResponse(e); return resp; } - if (info.isManageable() && this.isUpdateClusterClients()) { + if (info.isManageable() && protocolManager.isUpdateClusterClients()) { // send ConnectionCommand - ConnectionControl command = protocolManager.newConnectionControl(rebalance); + ConnectionControl command = protocolManager.newConnectionControl(); command.setFaultTolerant(protocolManager.isFaultTolerantConfiguration()); if (info.isFailoverReconnect()) { command.setRebalanceConnection(false); @@ -1274,16 +1259,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S public void updateClient(ConnectionControl control) { // if (!destroyed && context.isFaultTolerant()) { - if (updateClusterClients) { + if (protocolManager.isUpdateClusterClients()) { dispatchAsync(control); } // } } - public boolean isUpdateClusterClients() { - return updateClusterClients; - } - public AMQConnectionContext initContext(ConnectionInfo info) { WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo(); // Older clients should have been defaulting this field to true.. 
but http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bdb27a9e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 525844c..bd26b07 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -148,6 +148,12 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No private final ScheduledExecutorService scheduledPool; + //bean properties + //http://activemq.apache.org/failover-transport-reference.html + private boolean rebalanceClusterClients = false; + private boolean updateClusterClients = false; + private boolean updateClusterClientsOnRemove = false; + public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { this.factory = factory; this.server = server; @@ -189,7 +195,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No } for (OpenWireConnection c : this.connections) { - ConnectionControl control = newConnectionControl(c.isRebalance()); + ConnectionControl control = newConnectionControl(); c.updateClient(control); } } @@ -422,13 +428,13 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No return brokerName; } - protected ConnectionControl newConnectionControl(boolean rebalance) { + protected ConnectionControl newConnectionControl() { 
ConnectionControl control = new ConnectionControl(); - String uri = generateMembersURI(rebalance); + String uri = generateMembersURI(rebalanceClusterClients); control.setConnectedBrokers(uri); - control.setRebalanceConnection(rebalance); + control.setRebalanceConnection(rebalanceClusterClients); return control; } @@ -814,4 +820,28 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No brokerInfo.setPeerBrokerInfos(null); connection.dispatchAsync(brokerInfo); } + + public void setRebalanceClusterClients(boolean rebalance) { + this.rebalanceClusterClients = rebalance; + } + + public boolean isRebalanceClusterClients() { + return this.rebalanceClusterClients; + } + + public void setUpdateClusterClients(boolean updateClusterClients) { + this.updateClusterClients = updateClusterClients; + } + + public boolean isUpdateClusterClients() { + return this.updateClusterClients; + } + + public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) { + this.updateClusterClientsOnRemove = updateClusterClientsOnRemove; + } + + public boolean isUpdateClusterClientsOnRemove() { + return this.updateClusterClientsOnRemove; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bdb27a9e/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java index be9cf06..5c8d3b6 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java @@ -19,7 +19,9 @@ package 
org.apache.activemq.broker.artemiswrapper; import java.io.File; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; @@ -31,6 +33,8 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.artemis.utils.uri.URISchema; +import org.apache.activemq.artemis.utils.uri.URISupport; import org.apache.activemq.broker.BrokerService; import org.junit.Assert; import org.junit.Rule; @@ -88,7 +92,7 @@ public class OpenwireArtemisBaseTest { public String CLUSTER_PASSWORD = "OPENWIRECLUSTER"; protected Configuration createConfig(final int serverID) throws Exception { - return createConfig("localhost", serverID); + return createConfig("localhost", serverID, Collections.EMPTY_MAP); } protected Configuration createConfig(final String hostAddress, final int serverID, final int port) throws Exception { @@ -111,6 +115,10 @@ public class OpenwireArtemisBaseTest { } protected Configuration createConfig(final String hostAddress, final int serverID) throws Exception { + return createConfig(hostAddress, serverID, Collections.EMPTY_MAP); + } + + protected Configuration createConfig(final String hostAddress, final int serverID, Map<String, String> params) throws Exception { ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false). setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO). setJournalDirectory(getJournalDir(serverID, false)). 
@@ -123,7 +131,7 @@ public class OpenwireArtemisBaseTest { configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true)); - configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID)); + configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID) + "?" + URISupport.createQueryString(params)); configuration.addConnectorConfiguration("netty-connector", newURI(hostAddress, serverID)); return configuration; @@ -171,8 +179,12 @@ public class OpenwireArtemisBaseTest { return "tcp://" + localhostAddress + ":" + (61616 + serverID); } - protected static String newURIwithPort(String localhostAddress, int port) { - return "tcp://" + localhostAddress + ":" + port; + protected static String newURIwithPort(String localhostAddress, int port) throws Exception { + return newURIwithPort(localhostAddress, port, Collections.EMPTY_MAP); + } + + protected static String newURIwithPort(String localhostAddress, int port, Map<String, String> params) throws Exception { + return "tcp://" + localhostAddress + ":" + port + "?" 
+ URISupport.createQueryString(params); } public static JMSServerControl createJMSServerControl(final MBeanServer mbeanServer) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bdb27a9e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java index bf43caa..74fa6aa 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java @@ -22,8 +22,10 @@ import javax.jms.Queue; import javax.jms.Session; import java.net.URI; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -50,8 +52,13 @@ public class FailoverClusterTest extends OpenwireArtemisBaseTest { @Before public void setUp() throws Exception { - Configuration config1 = createConfig(1); - Configuration config2 = createConfig(2); + Map<String, String> params = new HashMap<String, String>(); + + params.put("rebalanceClusterClients", "true"); + params.put("updateClusterClients", "true"); + + Configuration config1 = createConfig("localhost", 1, params); + Configuration config2 = createConfig("localhost", 2, params); deployClusterConfiguration(config1, 2); deployClusterConfiguration(config2, 1); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bdb27a9e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java ---------------------------------------------------------------------- diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java index 1d902e3..fd9ce1f 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java @@ -65,9 +65,15 @@ public class FailoverComplexClusterTest extends OpenwireArtemisBaseTest { //default setup for most tests private void commonSetup() throws Exception { - Configuration config0 = createConfig(0); - Configuration config1 = createConfig(1); - Configuration config2 = createConfig(2); + Map<String, String> params = new HashMap<String, String>(); + + params.put("rebalanceClusterClients", "true"); + params.put("updateClusterClients", "true"); + params.put("updateClusterClientsOnRemove", "true"); + + Configuration config0 = createConfig("localhost", 0, params); + Configuration config1 = createConfig("localhost", 1, params); + Configuration config2 = createConfig("localhost", 2, params); deployClusterConfiguration(config0, 1, 2); deployClusterConfiguration(config1, 0, 2); @@ -248,9 +254,9 @@ public class FailoverComplexClusterTest extends OpenwireArtemisBaseTest { @Test public void testFailOverWithUpdateClientsOnRemove() throws Exception { // Broker A - Configuration config0 = createConfig(0, "?rebalance-cluster-client=true&update-cluster-clients=true&update-cluster-clients-on-remove=true"); + Configuration config0 = createConfig(0, "?rebalanceClusterClients=true&updateClusterClients=true&updateClusterClientsOnRemove=true"); // Broker B - Configuration config1 = createConfig(1, "?rebalance-cluster-client=true&update-cluster-clients=true&update-cluster-clients-on-remove=true"); + Configuration config1 = createConfig(1, 
"?rebalanceClusterClients=true&updateClusterClients=true&updateClusterClientsOnRemove=true"); deployClusterConfiguration(config0, 1); deployClusterConfiguration(config1, 0); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bdb27a9e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java index 6e559e7..6f4b27e 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.failover; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.activemq.ActiveMQConnection; @@ -49,11 +50,16 @@ public class FailoverPriorityTest extends OpenwireArtemisBaseTest { private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>(); private EmbeddedJMS[] servers = new EmbeddedJMS[3]; private String clientUrl; + private Map<String, String> params = new HashMap<String, String>(); @Before public void setUp() throws Exception { urls.put(0, BROKER_A_CLIENT_TC_ADDRESS); urls.put(1, BROKER_B_CLIENT_TC_ADDRESS); + params.clear(); + params.put("rebalanceClusterClients", "true"); + params.put("updateClusterClients", "true"); + params.put("updateClusterClientsOnRemove", "true"); } @After @@ -136,7 +142,7 @@ public class FailoverPriorityTest extends OpenwireArtemisBaseTest { @Test public void testThreeBrokers() throws Exception { - commonSetup(); + setupThreeBrokers(); Thread.sleep(1000); 
setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + "," + BROKER_C_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false"); @@ -262,11 +268,15 @@ public class FailoverPriorityTest extends OpenwireArtemisBaseTest { } } - //default setup for most tests - private void commonSetup() throws Exception { - Configuration config0 = createConfig("127.0.0.1", 0); - Configuration config1 = createConfig("127.0.0.1", 1); - Configuration config2 = createConfig("127.0.0.1", 2); + private void setupThreeBrokers() throws Exception { + + params.put("rebalanceClusterClients", "false"); + params.put("updateClusterClients", "false"); + params.put("updateClusterClientsOnRemove", "false"); + + Configuration config0 = createConfig("127.0.0.1", 0, params); + Configuration config1 = createConfig("127.0.0.1", 1, params); + Configuration config2 = createConfig("127.0.0.1", 2, params); deployClusterConfiguration(config0, 1, 2); deployClusterConfiguration(config1, 0, 2); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bdb27a9e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java index 002a788..0a127dd 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java @@ -18,6 +18,8 @@ package org.apache.activemq.transport.failover; import java.io.File; import java.io.FileOutputStream; +import java.util.HashMap; +import java.util.Map; import 
java.util.concurrent.TimeUnit; import javax.jms.Connection; @@ -117,7 +119,9 @@ public class FailoverUpdateURIsTest extends OpenwireArtemisBaseTest { @Test public void testAutoUpdateURIs() throws Exception { - Configuration config0 = createConfig(0); + Map<String, String> params = new HashMap<String, String>(); + params.put("updateClusterClients", "true"); + Configuration config0 = createConfig("localhost", 0, params); deployClusterConfiguration(config0, 10); server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); server0.start();