Added openwire parameters as bean properties so that they
can be passed via the new protocol manager bean util.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bdb27a9e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bdb27a9e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bdb27a9e

Branch: refs/heads/refactor-openwire
Commit: bdb27a9ef1fa8c7091b46aac5a296d3eaedddabc
Parents: 3ab1359
Author: Howard Gao <howard....@gmail.com>
Authored: Wed Feb 17 20:50:33 2016 +0800
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Thu Feb 25 18:10:23 2016 -0500

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   | 25 ++-----------
 .../openwire/OpenWireProtocolManager.java       | 38 +++++++++++++++++---
 .../artemiswrapper/OpenwireArtemisBaseTest.java | 20 ++++++++---
 .../transport/failover/FailoverClusterTest.java | 11 ++++--
 .../failover/FailoverComplexClusterTest.java    | 16 ++++++---
 .../failover/FailoverPriorityTest.java          | 22 ++++++++----
 .../failover/FailoverUpdateURIsTest.java        |  6 +++-
 7 files changed, 94 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bdb27a9e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 61e93cb..a6f0f34 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -55,7 +55,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
-import org.apache.activemq.artemis.utils.ConfigurationHelper;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
@@ -153,10 +152,6 @@ public class OpenWireConnection implements 
RemotingConnection, CommandVisitor, S
 
    private String defaultSocketURIString;
 
-   private boolean rebalance;
-   private boolean updateClusterClients;
-   private boolean updateClusterClientsOnRemove;
-
    public OpenWireConnection(Acceptor acceptorUsed,
                              Connection connection,
                              OpenWireProtocolManager openWireProtocolManager,
@@ -167,12 +162,6 @@ public class OpenWireConnection implements 
RemotingConnection, CommandVisitor, S
       this.wireFormat = wf;
       this.creationTime = System.currentTimeMillis();
       this.defaultSocketURIString = connection.getLocalAddress();
-
-      //Clebert: These are parameters specific to openwire cluster with 
defaults as specified at
-      //http://activemq.apache.org/failover-transport-reference.html
-      rebalance = 
ConfigurationHelper.getBooleanProperty("rebalance-cluster-clients", true, 
acceptorUsed.getConfiguration());
-      updateClusterClients = 
ConfigurationHelper.getBooleanProperty("update-cluster-clients", true, 
acceptorUsed.getConfiguration());
-      updateClusterClientsOnRemove = 
ConfigurationHelper.getBooleanProperty("update-cluster-clients-on-remove", 
true, acceptorUsed.getConfiguration());
    }
 
    @Override
@@ -200,10 +189,6 @@ public class OpenWireConnection implements 
RemotingConnection, CommandVisitor, S
       return info.getPassword();
    }
 
-   public boolean isRebalance() {
-      return rebalance;
-   }
-
    private ConnectionInfo getConnectionInfo() {
       if (state == null) {
          return null;
@@ -539,9 +524,9 @@ public class OpenWireConnection implements 
RemotingConnection, CommandVisitor, S
          Response resp = new ExceptionResponse(e);
          return resp;
       }
-      if (info.isManageable() && this.isUpdateClusterClients()) {
+      if (info.isManageable() && protocolManager.isUpdateClusterClients()) {
          // send ConnectionCommand
-         ConnectionControl command = 
protocolManager.newConnectionControl(rebalance);
+         ConnectionControl command = protocolManager.newConnectionControl();
          
command.setFaultTolerant(protocolManager.isFaultTolerantConfiguration());
          if (info.isFailoverReconnect()) {
             command.setRebalanceConnection(false);
@@ -1274,16 +1259,12 @@ public class OpenWireConnection implements 
RemotingConnection, CommandVisitor, S
 
    public void updateClient(ConnectionControl control) {
       //      if (!destroyed && context.isFaultTolerant()) {
-      if (updateClusterClients) {
+      if (protocolManager.isUpdateClusterClients()) {
          dispatchAsync(control);
       }
       //      }
    }
 
-   public boolean isUpdateClusterClients() {
-      return updateClusterClients;
-   }
-
    public AMQConnectionContext initContext(ConnectionInfo info) {
       WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo();
       // Older clients should have been defaulting this field to true.. but

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bdb27a9e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 525844c..bd26b07 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -148,6 +148,12 @@ public class OpenWireProtocolManager implements 
ProtocolManager<Interceptor>, No
 
    private final ScheduledExecutorService scheduledPool;
 
+   //bean properties
+   //http://activemq.apache.org/failover-transport-reference.html
+   private boolean rebalanceClusterClients = false;
+   private boolean updateClusterClients = false;
+   private boolean updateClusterClientsOnRemove = false;
+
    public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, 
ActiveMQServer server) {
       this.factory = factory;
       this.server = server;
@@ -189,7 +195,7 @@ public class OpenWireProtocolManager implements 
ProtocolManager<Interceptor>, No
       }
 
       for (OpenWireConnection c : this.connections) {
-         ConnectionControl control = newConnectionControl(c.isRebalance());
+         ConnectionControl control = newConnectionControl();
          c.updateClient(control);
       }
    }
@@ -422,13 +428,13 @@ public class OpenWireProtocolManager implements 
ProtocolManager<Interceptor>, No
       return brokerName;
    }
 
-   protected ConnectionControl newConnectionControl(boolean rebalance) {
+   protected ConnectionControl newConnectionControl() {
       ConnectionControl control = new ConnectionControl();
 
-      String uri = generateMembersURI(rebalance);
+      String uri = generateMembersURI(rebalanceClusterClients);
       control.setConnectedBrokers(uri);
 
-      control.setRebalanceConnection(rebalance);
+      control.setRebalanceConnection(rebalanceClusterClients);
       return control;
    }
 
@@ -814,4 +820,28 @@ public class OpenWireProtocolManager implements 
ProtocolManager<Interceptor>, No
       brokerInfo.setPeerBrokerInfos(null);
       connection.dispatchAsync(brokerInfo);
    }
+
+   public void setRebalanceClusterClients(boolean rebalance) {
+      this.rebalanceClusterClients = rebalance;
+   }
+
+   public boolean isRebalanceClusterClients() {
+      return this.rebalanceClusterClients;
+   }
+
+   public void setUpdateClusterClients(boolean updateClusterClients) {
+      this.updateClusterClients = updateClusterClients;
+   }
+
+   public boolean isUpdateClusterClients() {
+      return this.updateClusterClients;
+   }
+
+   public  void setUpdateClusterClientsOnRemove(boolean 
updateClusterClientsOnRemove) {
+      this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
+   }
+
+   public boolean isUpdateClusterClientsOnRemove() {
+      return this.updateClusterClientsOnRemove;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bdb27a9e/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
 
b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
index be9cf06..5c8d3b6 100644
--- 
a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
+++ 
b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
@@ -19,7 +19,9 @@ package org.apache.activemq.broker.artemiswrapper;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
@@ -31,6 +33,8 @@ import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
 import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.artemis.utils.uri.URISchema;
+import org.apache.activemq.artemis.utils.uri.URISupport;
 import org.apache.activemq.broker.BrokerService;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -88,7 +92,7 @@ public class OpenwireArtemisBaseTest {
    public String CLUSTER_PASSWORD = "OPENWIRECLUSTER";
 
    protected Configuration createConfig(final int serverID) throws Exception {
-      return createConfig("localhost", serverID);
+      return createConfig("localhost", serverID, Collections.EMPTY_MAP);
    }
 
    protected Configuration createConfig(final String hostAddress, final int 
serverID, final int port) throws Exception {
@@ -111,6 +115,10 @@ public class OpenwireArtemisBaseTest {
    }
 
    protected Configuration createConfig(final String hostAddress, final int 
serverID) throws Exception {
+      return createConfig(hostAddress, serverID, Collections.EMPTY_MAP);
+   }
+
+   protected Configuration createConfig(final String hostAddress, final int 
serverID, Map<String, String> params) throws Exception {
       ConfigurationImpl configuration = new 
ConfigurationImpl().setJMXManagementEnabled(false).
               
setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 
1024).setJournalType(JournalType.NIO).
               setJournalDirectory(getJournalDir(serverID, false)).
@@ -123,7 +131,7 @@ public class OpenwireArtemisBaseTest {
 
       configuration.addAddressesSetting("#", new 
AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true));
 
-      configuration.addAcceptorConfiguration("netty", newURI(hostAddress, 
serverID));
+      configuration.addAcceptorConfiguration("netty", newURI(hostAddress, 
serverID) + "?" + URISupport.createQueryString(params));
       configuration.addConnectorConfiguration("netty-connector", 
newURI(hostAddress, serverID));
 
       return configuration;
@@ -171,8 +179,12 @@ public class OpenwireArtemisBaseTest {
       return "tcp://" + localhostAddress + ":" + (61616 + serverID);
    }
 
-   protected static String newURIwithPort(String localhostAddress, int port) {
-      return "tcp://" + localhostAddress + ":" + port;
+   protected static String newURIwithPort(String localhostAddress, int port) 
throws Exception {
+      return newURIwithPort(localhostAddress, port, Collections.EMPTY_MAP);
+   }
+
+   protected static String newURIwithPort(String localhostAddress, int port, 
Map<String, String> params) throws Exception {
+      return "tcp://" + localhostAddress + ":" + port + "?" + 
URISupport.createQueryString(params);
    }
 
    public static JMSServerControl createJMSServerControl(final MBeanServer 
mbeanServer) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bdb27a9e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
index bf43caa..74fa6aa 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
@@ -22,8 +22,10 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -50,8 +52,13 @@ public class FailoverClusterTest extends 
OpenwireArtemisBaseTest {
 
    @Before
    public void setUp() throws Exception {
-      Configuration config1 = createConfig(1);
-      Configuration config2 = createConfig(2);
+      Map<String, String> params = new HashMap<String, String>();
+
+      params.put("rebalanceClusterClients", "true");
+      params.put("updateClusterClients", "true");
+
+      Configuration config1 = createConfig("localhost", 1, params);
+      Configuration config2 = createConfig("localhost", 2, params);
 
       deployClusterConfiguration(config1, 2);
       deployClusterConfiguration(config2, 1);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bdb27a9e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
index 1d902e3..fd9ce1f 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
@@ -65,9 +65,15 @@ public class FailoverComplexClusterTest extends 
OpenwireArtemisBaseTest {
 
    //default setup for most tests
    private void commonSetup() throws Exception {
-      Configuration config0 = createConfig(0);
-      Configuration config1 = createConfig(1);
-      Configuration config2 = createConfig(2);
+      Map<String, String> params = new HashMap<String, String>();
+
+      params.put("rebalanceClusterClients", "true");
+      params.put("updateClusterClients", "true");
+      params.put("updateClusterClientsOnRemove", "true");
+
+      Configuration config0 = createConfig("localhost", 0, params);
+      Configuration config1 = createConfig("localhost", 1, params);
+      Configuration config2 = createConfig("localhost", 2, params);
 
       deployClusterConfiguration(config0, 1, 2);
       deployClusterConfiguration(config1, 0, 2);
@@ -248,9 +254,9 @@ public class FailoverComplexClusterTest extends 
OpenwireArtemisBaseTest {
    @Test
    public void testFailOverWithUpdateClientsOnRemove() throws Exception {
       // Broker A
-      Configuration config0 = createConfig(0, 
"?rebalance-cluster-client=true&update-cluster-clients=true&update-cluster-clients-on-remove=true");
+      Configuration config0 = createConfig(0, 
"?rebalanceClusterClients=true&updateClusterClients=true&updateClusterClientsOnRemove=true");
       // Broker B
-      Configuration config1 = createConfig(1, 
"?rebalance-cluster-client=true&update-cluster-clients=true&update-cluster-clients-on-remove=true");
+      Configuration config1 = createConfig(1, 
"?rebalanceClusterClients=true&updateClusterClients=true&updateClusterClientsOnRemove=true");
 
       deployClusterConfiguration(config0, 1);
       deployClusterConfiguration(config1, 0);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bdb27a9e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
index 6e559e7..6f4b27e 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.failover;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.ActiveMQConnection;
@@ -49,11 +50,16 @@ public class FailoverPriorityTest extends 
OpenwireArtemisBaseTest {
    private final List<ActiveMQConnection> connections = new 
ArrayList<ActiveMQConnection>();
    private EmbeddedJMS[] servers = new EmbeddedJMS[3];
    private String clientUrl;
+   private Map<String, String> params = new HashMap<String, String>();
 
    @Before
    public void setUp() throws Exception {
       urls.put(0, BROKER_A_CLIENT_TC_ADDRESS);
       urls.put(1, BROKER_B_CLIENT_TC_ADDRESS);
+      params.clear();
+      params.put("rebalanceClusterClients", "true");
+      params.put("updateClusterClients", "true");
+      params.put("updateClusterClientsOnRemove", "true");
    }
 
    @After
@@ -136,7 +142,7 @@ public class FailoverPriorityTest extends 
OpenwireArtemisBaseTest {
 
    @Test
    public void testThreeBrokers() throws Exception {
-      commonSetup();
+      setupThreeBrokers();
       Thread.sleep(1000);
 
       setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + 
BROKER_B_CLIENT_TC_ADDRESS + "," + BROKER_C_CLIENT_TC_ADDRESS + 
")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false");
@@ -262,11 +268,15 @@ public class FailoverPriorityTest extends 
OpenwireArtemisBaseTest {
       }
    }
 
-   //default setup for most tests
-   private void commonSetup() throws Exception {
-      Configuration config0 = createConfig("127.0.0.1", 0);
-      Configuration config1 = createConfig("127.0.0.1", 1);
-      Configuration config2 = createConfig("127.0.0.1", 2);
+   private void setupThreeBrokers() throws Exception {
+
+      params.put("rebalanceClusterClients", "false");
+      params.put("updateClusterClients", "false");
+      params.put("updateClusterClientsOnRemove", "false");
+
+      Configuration config0 = createConfig("127.0.0.1", 0, params);
+      Configuration config1 = createConfig("127.0.0.1", 1, params);
+      Configuration config2 = createConfig("127.0.0.1", 2, params);
 
       deployClusterConfiguration(config0, 1, 2);
       deployClusterConfiguration(config1, 0, 2);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bdb27a9e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
index 002a788..0a127dd 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
@@ -18,6 +18,8 @@ package org.apache.activemq.transport.failover;
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
@@ -117,7 +119,9 @@ public class FailoverUpdateURIsTest extends 
OpenwireArtemisBaseTest {
 
    @Test
    public void testAutoUpdateURIs() throws Exception {
-      Configuration config0 = createConfig(0);
+      Map<String, String> params = new HashMap<String, String>();
+      params.put("updateClusterClients", "true");
+      Configuration config0 = createConfig("localhost", 0, params);
       deployClusterConfiguration(config0, 10);
       server0 = new 
EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new 
JMSConfigurationImpl());
       server0.start();

Reply via email to