Author: rajdavies
Date: Thu Dec 22 02:28:04 2005
New Revision: 358551
URL: http://svn.apache.org/viewcvs?rev=358551&view=rev
Log:
Reverted to javax.jms.Destination instead of Strings and added support
for a DestinationMarshaller
Modified:
incubator/activemq/trunk/activecluster/src/java/org/activecluster/Cluster.java
incubator/activemq/trunk/activecluster/src/java/org/activecluster/ClusterFactory.java
incubator/activemq/trunk/activecluster/src/java/org/activecluster/Node.java
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultCluster.java
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultClusterFactory.java
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NodeImpl.java
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NonReplicatedLocalNode.java
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/ReplicatedLocalNode.java
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateConsumer.java
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceImpl.java
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceStub.java
incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterFunctionTest.java
incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTest.java
incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTestSupport.java
incubator/activemq/trunk/activecluster/src/test/org/activecluster/TestSupport.java
incubator/activemq/trunk/activecluster/src/test/org/activecluster/group/GroupTestSupport.java
Modified:
incubator/activemq/trunk/activecluster/src/java/org/activecluster/Cluster.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/Cluster.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
---
incubator/activemq/trunk/activecluster/src/java/org/activecluster/Cluster.java
(original)
+++
incubator/activemq/trunk/activecluster/src/java/org/activecluster/Cluster.java
Thu Dec 22 02:28:04 2005
@@ -7,13 +7,15 @@
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+
package org.activecluster;
import java.io.Serializable;
@@ -46,7 +48,7 @@
*
* @return the destination to send messages to all members of the cluster
*/
- public String getDestination();
+ public Destination getDestination();
/**
* A snapshot of the nodes in the cluster indexed by the Destination
@@ -94,18 +96,10 @@
* @param message the message to be sent
* @throws JMSException
*/
- public void send(String destination, Message message) throws JMSException;
+ public void send(Destination destination, Message message) throws
JMSException;
/**
- * Utility method for sending back replies in message exchanges
- *
- * @param replyTo the replyTo JMS Destination on a Message
- * @param message the message to be sent
- * @throws JMSException
- */
- public void send(Destination replyTo, Message message) throws JMSException;
- /**
* Creates a consumer of all the messags sent to the given destination,
* including messages sent via the send() messages
*
@@ -113,7 +107,7 @@
* @return a newly created message consumer
* @throws JMSException
*/
- public MessageConsumer createConsumer(String destination) throws
JMSException;
+ public MessageConsumer createConsumer(Destination destination) throws
JMSException;
/**
* Creates a consumer of all message sent to the given destination,
@@ -125,7 +119,7 @@
* @return a newly created message consumer
* @throws JMSException
*/
- public MessageConsumer createConsumer(String destination, String selector)
throws JMSException;
+ public MessageConsumer createConsumer(Destination destination, String
selector) throws JMSException;
/**
* Creates a consumer of all message sent to the given destination,
@@ -139,7 +133,7 @@
* @return a newly created message consumer
* @throws JMSException
*/
- public MessageConsumer createConsumer(String destination, String selector,
boolean noLocal) throws JMSException;
+ public MessageConsumer createConsumer(Destination destination, String
selector, boolean noLocal) throws JMSException;
// Message factory methods
Modified:
incubator/activemq/trunk/activecluster/src/java/org/activecluster/ClusterFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/ClusterFactory.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
---
incubator/activemq/trunk/activecluster/src/java/org/activecluster/ClusterFactory.java
(original)
+++
incubator/activemq/trunk/activecluster/src/java/org/activecluster/ClusterFactory.java
Thu Dec 22 02:28:04 2005
@@ -7,16 +7,18 @@
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activecluster;
+import javax.jms.Destination;
import javax.jms.JMSException;
@@ -30,19 +32,62 @@
/**
* Creates a new cluster connection using the given local name and
destination name
* @param localName
- * @param destination
+ * @param destinationName
+ *
+ * @return Cluster
+ * @throws JMSException
+ */
+ public Cluster createCluster(String localName,String destinationName)
throws JMSException;
+
+ /**
+ * Creates a new cluster connection using the given local name and
destination name
+ * @param localName
+ * @param destinationName
+ * @param marshaller
+ *
+ * @return Cluster
+ * @throws JMSException
+ */
+ public Cluster createCluster(String localName,String
destinationName,DestinationMarshaller marshaller) throws JMSException;
+
+
+
+ /**
+ * Creates a new cluster connection - generating the localName
automatically
+ * @param destinationName
+ * @return the Cluster
+ * @throws JMSException
+ */
+ public Cluster createCluster(String destinationName) throws JMSException;
+
+ /**
+ * Creates a new cluster connection using the given local name and
destination name
+ * @param localName
+ * @param destination
*
* @return Cluster
* @throws JMSException
*/
- public Cluster createCluster(String localName,String destination) throws
JMSException;
+ public Cluster createCluster(String localName,Destination destination)
throws JMSException;
+
+ /**
+ * Creates a new cluster connection using the given local name and
destination name
+ * @param localName
+ * @param destination
+ * @param marshaller
+ *
+ * @return Cluster
+ * @throws JMSException
+ */
+ public Cluster createCluster(String localName,Destination destination,
DestinationMarshaller marshaller) throws JMSException;
+
/**
* Creates a new cluster connection - generating the localName
automatically
* @param destination
- * @return
+ * @return the Cluster
* @throws JMSException
*/
- public Cluster createCluster(String destination) throws JMSException;
+ public Cluster createCluster(Destination destination) throws JMSException;
}
Modified:
incubator/activemq/trunk/activecluster/src/java/org/activecluster/Node.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/Node.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/java/org/activecluster/Node.java
(original)
+++ incubator/activemq/trunk/activecluster/src/java/org/activecluster/Node.java
Thu Dec 22 02:28:04 2005
@@ -7,17 +7,18 @@
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activecluster;
-import java.io.Serializable;
import java.util.Map;
+import javax.jms.Destination;
/**
@@ -25,14 +26,14 @@
*
* @version $Revision: 1.3 $
*/
-public interface Node extends Serializable {
+public interface Node {
/**
* Access to the queue to send messages direct to this node.
*
* @return the destination to send messages to this node while its
available
*/
- public String getDestination();
+ public Destination getDestination();
/**
* @return an immutable map of the nodes state
Modified:
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultCluster.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultCluster.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
---
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultCluster.java
(original)
+++
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultCluster.java
Thu Dec 22 02:28:04 2005
@@ -7,13 +7,14 @@
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activecluster.impl;
import java.io.Serializable;
@@ -33,6 +34,7 @@
import javax.jms.TextMessage;
import org.activecluster.Cluster;
import org.activecluster.ClusterListener;
+import org.activecluster.DestinationMarshaller;
import org.activecluster.LocalNode;
import org.activecluster.Service;
import org.activecluster.election.ElectionStrategy;
@@ -50,41 +52,53 @@
private StateServiceImpl stateService;
private LocalNode localNode;
- private String destination;
+ private Destination destination;
private Connection connection;
private Session session;
private MessageProducer producer;
private MessageConsumer consumer;
private Timer timer;
+ private DestinationMarshaller marshaller;
private AtomicBoolean started = new AtomicBoolean(false);
private Object clusterLock = new Object();
- public DefaultCluster(final LocalNode localNode,String dataDestination,
String destination, Connection connection, Session session,
- MessageProducer producer, Timer timer, long
inactiveTime) throws JMSException {
- this.localNode = localNode;
- this.destination = destination;
- this.connection = connection;
- this.session = session;
- this.producer = producer;
- this.timer = timer;
-
- if (producer == null) {
+ /**
+ * Construct this beast
+ * @param localNode
+ * @param dataDestination
+ * @param destination
+ * @param marshaller
+ * @param connection
+ * @param session
+ * @param producer
+ * @param timer
+ * @param inactiveTime
+ * @throws JMSException
+ */
+ public DefaultCluster(final LocalNode localNode,Destination
dataDestination,Destination destination,
+ DestinationMarshaller marshaller,Connection
connection,Session session,MessageProducer producer,
+ Timer timer,long inactiveTime) throws JMSException{
+ this.localNode=localNode;
+ this.destination=destination;
+ this.marshaller=marshaller;
+ this.connection=connection;
+ this.session=session;
+ this.producer=producer;
+ this.timer=timer;
+ if(producer==null){
throw new IllegalArgumentException("No producer specified!");
}
-
// now lets subscribe the service to the updates from the data topic
- consumer = session.createConsumer(createDestination(dataDestination),
null, true);
-
- log.info("Creating data consumer on topic: " + dataDestination);
-
- this.stateService = new StateServiceImpl(this, clusterLock, new
Runnable() {
- public void run() {
- if (localNode instanceof ReplicatedLocalNode) {
+ consumer=session.createConsumer(dataDestination,null,true);
+ log.info("Creating data consumer on topic: "+dataDestination);
+ this.stateService=new StateServiceImpl(this,clusterLock,new Runnable(){
+ public void run(){
+ if(localNode instanceof ReplicatedLocalNode){
((ReplicatedLocalNode) localNode).pingRemoteNodes();
}
}
- }, timer, inactiveTime);
- consumer.setMessageListener(new StateConsumer(stateService));
+ },timer,inactiveTime);
+ consumer.setMessageListener(new
StateConsumer(stateService,marshaller));
}
public void addClusterListener(ClusterListener listener) {
@@ -95,7 +109,7 @@
stateService.removeClusterListener(listener);
}
- public String getDestination() {
+ public Destination getDestination() {
return destination;
}
@@ -111,24 +125,21 @@
stateService.setElectionStrategy(strategy);
}
- public void send(String destination,Message message) throws JMSException {
- producer.send(createDestination(destination), message);
- }
-
+
public void send(Destination replyTo, Message message) throws JMSException{
producer.send(replyTo,message);
}
- public MessageConsumer createConsumer(String destination) throws
JMSException {
- return getSession().createConsumer(createDestination(destination));
+ public MessageConsumer createConsumer(Destination destination) throws
JMSException {
+ return getSession().createConsumer(destination);
}
- public MessageConsumer createConsumer(String destination, String selector)
throws JMSException {
- return getSession().createConsumer(createDestination(destination),
selector);
+ public MessageConsumer createConsumer(Destination destination, String
selector) throws JMSException {
+ return getSession().createConsumer(destination, selector);
}
- public MessageConsumer createConsumer(String destination, String selector,
boolean noLocal) throws JMSException {
- return getSession().createConsumer(createDestination(destination),
selector, noLocal);
+ public MessageConsumer createConsumer(Destination destination, String
selector, boolean noLocal) throws JMSException {
+ return getSession().createConsumer(destination, selector, noLocal);
}
public Message createMessage() throws JMSException {
Modified:
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultClusterFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultClusterFactory.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
---
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultClusterFactory.java
(original)
+++
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultClusterFactory.java
Thu Dec 22 02:28:04 2005
@@ -20,6 +20,7 @@
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -27,6 +28,7 @@
import org.activecluster.Cluster;
import org.activecluster.ClusterException;
import org.activecluster.ClusterFactory;
+import org.activecluster.DestinationMarshaller;
import org.activemq.util.IdGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -61,16 +63,42 @@
this(connectionFactory, false, Session.AUTO_ACKNOWLEDGE,
"ACTIVECLUSTER.DATA.", 6000L);
}
- public Cluster createCluster(String groupDestination) throws JMSException {
+ public Cluster createCluster(Destination groupDestination) throws
JMSException {
return createCluster(idGenerator.generateId(), groupDestination);
}
- public Cluster createCluster(String name,String groupDestination) throws
JMSException {
+ public Cluster createCluster(String name,Destination groupDestination)
throws JMSException {
Connection connection = getConnectionFactory().createConnection();
Session session = createSession(connection);
- return createCluster(connection, session, name,groupDestination);
+ return createCluster(connection, session, name,groupDestination,new
DefaultDestinationMarshaller());
+ }
+
+ public Cluster createCluster(String name,Destination
groupDestination,DestinationMarshaller marshaller) throws JMSException {
+ Connection connection = getConnectionFactory().createConnection();
+ Session session = createSession(connection);
+ return createCluster(connection, session,
name,groupDestination,marshaller);
+ }
+
+
+ public Cluster createCluster(String name,String groupDestinationName)
throws JMSException{
+ Connection connection = getConnectionFactory().createConnection();
+ Session session = createSession(connection);
+ return createCluster(connection, session,
name,session.createTopic(groupDestinationName),new
DefaultDestinationMarshaller());
+ }
+
+ public Cluster createCluster(String name,String
groupDestinationName,DestinationMarshaller marshaller) throws JMSException{
+ Connection connection = getConnectionFactory().createConnection();
+ Session session = createSession(connection);
+ return createCluster(connection, session,
name,session.createTopic(groupDestinationName),marshaller);
}
+
+
+ public Cluster createCluster(String groupDestinationName) throws
JMSException{
+ return createCluster(idGenerator.generateId(), groupDestinationName);
+ }
+
+
// Properties
//-------------------------------------------------------------------------
public String getDataTopicPrefix() {
@@ -134,34 +162,29 @@
// Implementation methods
//-------------------------------------------------------------------------
- protected Cluster createCluster(Connection connection, Session session,
String name,String groupDestination) throws JMSException {
- String dataDestination = dataTopicPrefix + groupDestination;
-
- log.info("Creating cluster group producer on topic: " +
groupDestination);
-
- MessageProducer producer = createProducer(session, null);
+ protected Cluster createCluster(Connection connection,Session
session,String name,Destination groupDestination,
+ DestinationMarshaller marshaller) throws JMSException{
+ String dataDestination=dataTopicPrefix+groupDestination;
+ log.info("Creating cluster group producer on topic:
"+groupDestination);
+ MessageProducer producer=createProducer(session,null);
producer.setDeliveryMode(deliveryMode);
-
- log.info("Creating cluster data producer on data destination: " +
dataDestination);
-
- Topic dataTopic = session.createTopic(dataDestination);
- MessageProducer keepAliveProducer = session.createProducer(dataTopic);
+ log.info("Creating cluster data producer on data destination:
"+dataDestination);
+ Topic dataTopic=session.createTopic(dataDestination);
+ MessageProducer keepAliveProducer=session.createProducer(dataTopic);
keepAliveProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- StateService serviceStub = new StateServiceStub(session,
keepAliveProducer);
-
- String localInboxDestination = dataDestination + "." + name;
-
- ReplicatedLocalNode localNode = new
ReplicatedLocalNode(name,localInboxDestination, serviceStub);
- Timer timer = new Timer();
- DefaultCluster answer = new DefaultCluster(localNode, dataDestination,
groupDestination, connection, session, producer, timer, inactiveTime);
+ StateService serviceStub=new
StateServiceStub(session,keepAliveProducer,marshaller);
+ Destination
localInboxDestination=session.createTopic(dataDestination+"."+name);
+ ReplicatedLocalNode localNode=new
ReplicatedLocalNode(name,localInboxDestination,serviceStub);
+ Timer timer=new Timer();
+ DefaultCluster answer=new
DefaultCluster(localNode,dataTopic,groupDestination,marshaller,connection,session,
+ producer,timer,inactiveTime);
return answer;
}
/*
- protected Cluster createInternalCluster(Session session, Topic
dataDestination) {
- MessageProducer producer = createProducer(session);
- return new DefaultCluster(new NonReplicatedLocalNode(),
dataDestination, connection, session, producer);
- }
+ * protected Cluster createInternalCluster(Session session, Topic
dataDestination) { MessageProducer producer =
+ * createProducer(session); return new DefaultCluster(new
NonReplicatedLocalNode(), dataDestination, connection,
+ * session, producer); }
*/
protected MessageProducer createProducer(Session session, Topic
groupDestination) throws JMSException {
Modified:
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NodeImpl.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NodeImpl.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
---
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NodeImpl.java
(original)
+++
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NodeImpl.java
Thu Dec 22 02:28:04 2005
@@ -7,17 +7,23 @@
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activecluster.impl;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.util.HashMap;
import java.util.Map;
+import javax.jms.Destination;
+import org.activecluster.DestinationMarshaller;
import org.activecluster.Node;
@@ -26,13 +32,22 @@
*
* @version $Revision: 1.3 $
*/
-public class NodeImpl implements Node {
+public class NodeImpl implements Node{
private static final long serialVersionUID=-3909792803360045064L;
private String name;
- private String destination;
+ private Destination destination;
protected Map state;
protected boolean coordinator;
-
+
+
+ /**
+ * Construct an Node from a NodeState
+ * @param nodeState
+ * @param marshaller
+ */
+ public NodeImpl(NodeState nodeState,DestinationMarshaller marshaller){
+
this(nodeState.getName(),marshaller.getDestination(nodeState.getDestinationName()),nodeState.getState());
+ }
/**
* Allow a node to be copied for sending it as a message
*
@@ -47,7 +62,7 @@
* @param name
* @param destination
*/
- public NodeImpl(String name,String destination) {
+ public NodeImpl(String name,Destination destination) {
this(name,destination, new HashMap());
}
@@ -57,7 +72,7 @@
* @param destination
* @param state
*/
- public NodeImpl(String name,String destination, Map state) {
+ public NodeImpl(String name,Destination destination, Map state) {
this.name = name;
this.destination = destination;
this.state = state;
@@ -80,7 +95,7 @@
/**
* @return the destination of the node
*/
- public String getDestination() {
+ public Destination getDestination() {
return destination;
}
@@ -117,5 +132,15 @@
protected void setCoordinator(boolean value) {
coordinator = value;
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException{
+ // TODO Auto-generated method stub
+
+ }
+
+ public void readExternal(ObjectInput in) throws
IOException,ClassNotFoundException{
+ // TODO Auto-generated method stub
+
}
}
Modified:
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NonReplicatedLocalNode.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NonReplicatedLocalNode.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
---
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NonReplicatedLocalNode.java
(original)
+++
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NonReplicatedLocalNode.java
Thu Dec 22 02:28:04 2005
@@ -7,16 +7,18 @@
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activecluster.impl;
import java.util.Map;
+import javax.jms.Destination;
import org.activecluster.LocalNode;
/**
@@ -33,7 +35,7 @@
* @param name
* @param destination
*/
- public NonReplicatedLocalNode(String name, String destination) {
+ public NonReplicatedLocalNode(String name, Destination destination) {
super(name,destination);
}
Modified:
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/ReplicatedLocalNode.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/ReplicatedLocalNode.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
---
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/ReplicatedLocalNode.java
(original)
+++
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/ReplicatedLocalNode.java
Thu Dec 22 02:28:04 2005
@@ -7,16 +7,18 @@
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activecluster.impl;
import java.util.Map;
+import javax.jms.Destination;
import javax.jms.JMSException;
import org.activecluster.LocalNode;
import org.activecluster.Service;
@@ -34,7 +36,7 @@
*
*/
private static final long serialVersionUID=4626381612145333540L;
- private StateService serviceStub;
+ private transient StateService serviceStub;
/**
* Create ReplicatedLocalNode
@@ -42,7 +44,7 @@
* @param destination
* @param serviceStub
*/
- public ReplicatedLocalNode(String name,String destination, StateService
serviceStub) {
+ public ReplicatedLocalNode(String name,Destination destination,
StateService serviceStub) {
super(name,destination);
this.serviceStub = serviceStub;
}
Modified:
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateConsumer.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateConsumer.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
---
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateConsumer.java
(original)
+++
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateConsumer.java
Thu Dec 22 02:28:04 2005
@@ -7,17 +7,19 @@
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activecluster.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.activecluster.DestinationMarshaller;
import org.activecluster.Node;
import javax.jms.Message;
@@ -36,12 +38,14 @@
private final static Log log = LogFactory.getLog(StateConsumer.class);
private StateService stateService;
+ private DestinationMarshaller marshaller;
- public StateConsumer(StateService stateService) {
+ public StateConsumer(StateService stateService,DestinationMarshaller
marshaller) {
if (stateService == null) {
throw new IllegalArgumentException("Must specify a valid
StateService implementation");
}
this.stateService = stateService;
+ this.marshaller = marshaller;
}
public void onMessage(Message message) {
@@ -52,7 +56,8 @@
if (message instanceof ObjectMessage) {
ObjectMessage objectMessage = (ObjectMessage) message;
try {
- Node node = (Node) objectMessage.getObject();
+ NodeState nodeState = (NodeState) objectMessage.getObject();
+ Node node = new NodeImpl(nodeState,marshaller);
String type = objectMessage.getJMSType();
if (type != null && type.equals("shutdown")) {
stateService.shutdown(node);
Modified:
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceImpl.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceImpl.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
---
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceImpl.java
(original)
+++
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceImpl.java
Thu Dec 22 02:28:04 2005
@@ -7,13 +7,14 @@
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activecluster.impl;
import java.util.HashMap;
@@ -23,6 +24,7 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.Map.Entry;
+import javax.jms.Destination;
import javax.jms.JMSException;
import org.activecluster.Cluster;
import org.activecluster.ClusterEvent;
@@ -49,7 +51,7 @@
private Map nodes = new ConcurrentHashMap();
private long inactiveTime;
private List listeners = new CopyOnWriteArrayList();
- private String localDestination;
+ private Destination localDestination;
private Runnable localNodePing;
private NodeImpl coordinator;
private ElectionStrategy electionStrategy;
@@ -133,7 +135,7 @@
* @param node
*/
public void keepAlive(Node node) {
- String key = node.getDestination();
+ Object key = node.getDestination();
if (key != null && !localDestination.equals(key)) {
NodeEntry entry = (NodeEntry) nodes.get(key);
if (entry == null) {
@@ -163,7 +165,7 @@
* shutdown the node
*/
public void shutdown(Node node){
- String key=node.getDestination();
+ Object key=node.getDestination();
if(key!=null){
nodes.remove(key);
ClusterEvent event=new ClusterEvent(cluster,node,ClusterEvent.ADD_NODE);
Modified:
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceStub.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceStub.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
---
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceStub.java
(original)
+++
incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceStub.java
Thu Dec 22 02:28:04 2005
@@ -7,17 +7,19 @@
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activecluster.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.activecluster.DestinationMarshaller;
import org.activecluster.Node;
import javax.jms.JMSException;
@@ -38,10 +40,12 @@
private Session session;
private MessageProducer producer;
+ private DestinationMarshaller marshaller;
- public StateServiceStub(Session session, MessageProducer producer) {
+ public StateServiceStub(Session session, MessageProducer producer,DestinationMarshaller marshaller) {
this.session = session;
this.producer = producer;
+ this.marshaller = marshaller;
}
public void keepAlive(Node node) {
@@ -50,7 +54,7 @@
log.debug("Sending cluster data message: " + node);
}
- Message message = session.createObjectMessage(new NodeImpl(node));
+ Message message = session.createObjectMessage(new NodeState(node,marshaller));
producer.send(message);
}
catch (JMSException e) {
@@ -64,7 +68,7 @@
log.debug("Sending shutdown message: " + node);
}
- Message message = session.createObjectMessage(new NodeImpl(node));
+ Message message = session.createObjectMessage(new NodeState(node,marshaller));
message.setJMSType("shutdown");
producer.send(message);
}
Modified:
incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterFunctionTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterFunctionTest.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
---
incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterFunctionTest.java
(original)
+++
incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterFunctionTest.java
Thu Dec 22 02:28:04 2005
@@ -2,17 +2,18 @@
*
* Copyright 2004 The Apache Software Foundation
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.activecluster;
@@ -150,7 +151,7 @@
try {
System.out.println("request received");
ObjectMessage om = cluster.createObjectMessage();
- om.setJMSReplyTo(cluster.createDestination(cluster.getLocalNode().getDestination()));
+ om.setJMSReplyTo(cluster.getLocalNode().getDestination());
om.setObject(new Response());
System.out.println("sending response");
cluster.send(om2.getJMSReplyTo(), om);
@@ -192,7 +193,7 @@
// 1->1 messages
_cluster1.createConsumer(_cluster1.getLocalNode().getDestination()).setMessageListener(listener1);
ObjectMessage om = _cluster0.createObjectMessage();
- om.setJMSReplyTo(_cluster0.createDestination(_cluster0.getLocalNode().getDestination()));
+ om.setJMSReplyTo(_cluster0.getLocalNode().getDestination());
om.setObject(new Request());
testResponsePassed = false;
_cluster0.send(_cluster0.getLocalNode().getDestination(), om);
Modified:
incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTest.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
---
incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTest.java
(original)
+++
incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTest.java
Thu Dec 22 02:28:04 2005
@@ -7,17 +7,19 @@
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activecluster;
import java.util.List;
import java.util.Map;
+import javax.jms.Destination;
import javax.jms.Message;
/**
@@ -27,14 +29,14 @@
protected int count = 2;
- public void xtestCluster() throws Exception {
+ public void testCluster() throws Exception {
cluster = createCluster();
subscribeToCluster();
cluster.start();
- String destination = cluster.getDestination();
+ Destination destination = cluster.getDestination();
Message message = cluster.createTextMessage("abcdef");
cluster.send(destination, message);
Modified:
incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTestSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTestSupport.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
---
incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTestSupport.java
(original)
+++
incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTestSupport.java
Thu Dec 22 02:28:04 2005
@@ -7,13 +7,14 @@
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activecluster;
import org.activecluster.impl.ActiveMQClusterFactory;
@@ -49,13 +50,13 @@
protected void subscribeToCluster() throws Exception {
// listen to cluster messages
- String clusterDestination = cluster.getDestination();
+ Destination clusterDestination = cluster.getDestination();
assertTrue("Local destination must not be null", clusterDestination != null);
clusterConsumer = cluster.createConsumer(clusterDestination);
clusterConsumer.setMessageListener(clusterListener);
// listen to inbox messages (individual messages)
- String localDestination = cluster.getLocalNode().getDestination();
+ Destination localDestination = cluster.getLocalNode().getDestination();
assertTrue("Local destination must not be null", localDestination != null);
System.out.println("Consuming from local destination: " + localDestination);
Modified:
incubator/activemq/trunk/activecluster/src/test/org/activecluster/TestSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/test/org/activecluster/TestSupport.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
---
incubator/activemq/trunk/activecluster/src/test/org/activecluster/TestSupport.java
(original)
+++
incubator/activemq/trunk/activecluster/src/test/org/activecluster/TestSupport.java
Thu Dec 22 02:28:04 2005
@@ -7,13 +7,14 @@
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activecluster;
import junit.framework.TestCase;
@@ -51,7 +52,7 @@
}
protected Cluster createCluster(String name) throws JMSException, ClusterException {
- Cluster cluster = createCluster();
- return cluster;
+ ClusterFactory factory = new ActiveMQClusterFactory();
+ return factory.createCluster(name,"ORG.CODEHAUS.ACTIVEMQ.TEST.CLUSTER");
}
}
Modified:
incubator/activemq/trunk/activecluster/src/test/org/activecluster/group/GroupTestSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/test/org/activecluster/group/GroupTestSupport.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
---
incubator/activemq/trunk/activecluster/src/test/org/activecluster/group/GroupTestSupport.java
(original)
+++
incubator/activemq/trunk/activecluster/src/test/org/activecluster/group/GroupTestSupport.java
Thu Dec 22 02:28:04 2005
@@ -7,13 +7,14 @@
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activecluster.group;
import java.util.HashMap;
@@ -22,7 +23,9 @@
import org.activecluster.Cluster;
import org.activecluster.ClusterEvent;
import org.activecluster.ClusterListener;
+import org.activecluster.DestinationMarshaller;
import org.activecluster.Node;
+import org.activecluster.impl.DefaultDestinationMarshaller;
import org.activecluster.impl.NodeImpl;
/**
@@ -36,6 +39,7 @@
private ClusterListener listener;
private Cluster cluster;
private Map nodes = new HashMap();
+ private DestinationMarshaller marshaller = new DefaultDestinationMarshaller();
protected void addNodes(String[] nodeNames) {
for (int i = 0; i < nodeNames.length; i++) {
@@ -45,7 +49,8 @@
}
protected void addNode(String nodeName) {
- Node node = new NodeImpl(nodeName,nodeName);
+
+ Node node = new NodeImpl(nodeName,marshaller.getDestination(nodeName));
nodes.put(nodeName, node);
listener.onNodeAdd(new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE));
}