Publish the Axis2 service on all servers in the cluster
Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/0b19e7e4 Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/0b19e7e4 Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/0b19e7e4 Branch: refs/heads/ODE-563 Commit: 0b19e7e457a582fa1ab862e620143fb3b7cfbf0d Parents: 71f3d35 Author: suba <[email protected]> Authored: Fri Jun 12 12:37:18 2015 +0530 Committer: suba <[email protected]> Committed: Fri Jun 12 12:37:18 2015 +0530 ---------------------------------------------------------------------- Rakefile | 2 +- .../ode/store/ClusterProcessStoreImpl.java | 34 +++++++++++++++++--- .../hazelcast/HazelcastClusterImpl.java | 34 ++------------------ 3 files changed, 32 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/0b19e7e4/Rakefile ---------------------------------------------------------------------- diff --git a/Rakefile b/Rakefile index 5fa4a07..06dcbd5 100644 --- a/Rakefile +++ b/Rakefile @@ -271,7 +271,7 @@ define "ode" do define "bpel-store" do compile.with projects("bpel-api", "bpel-compiler", "bpel-dao", "bpel-obj", "bpel-schemas", "bpel-epr", "dao-hibernate", "dao-jpa", "clustering", "utils"), - JAVAX.persistence, JAVAX.stream, JAVAX.transaction, HIBERNATE, HSQLDB, XMLBEANS, XERCES, WSDL4J, OPENJPA, SPRING, SLF4J, LOG4J + JAVAX.persistence, JAVAX.stream, JAVAX.transaction, HIBERNATE, HSQLDB, XMLBEANS, XERCES, WSDL4J, OPENJPA, SPRING, SLF4J, LOG4J,HAZELCAST compile { open_jpa_enhance } resources hibernate_doclet(:package=>"org.apache.ode.store.hib", :excludedtags=>"@version,@author,@todo") http://git-wip-us.apache.org/repos/asf/ode/blob/0b19e7e4/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java ---------------------------------------------------------------------- diff --git a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java 
b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java index c6f81ba..f364fb7 100644 --- a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java +++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java @@ -31,14 +31,22 @@ import java.util.ArrayList; import java.util.List; import java.util.Collection; +import com.hazelcast.core.*; + public class ClusterProcessStoreImpl extends ProcessStoreImpl{ private static final Log __log = LogFactory.getLog(ClusterProcessStoreImpl.class); - private HazelcastClusterImpl _hazelcastClusterImpl; + private HazelcastInstance _hazelcastInstance; + private Member deployInitiator; + private ITopic<String> clusterMessageTopic; public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel, HazelcastClusterImpl hazelcastClusterImpl) { - super(); - _hazelcastClusterImpl = hazelcastClusterImpl; + super(eprContext,ds,persistenceType,props,createDatamodel); + _hazelcastInstance = hazelcastClusterImpl.getHazelcastInstance(); + + // Register for listening to message listener + clusterMessageTopic = _hazelcastInstance.getTopic("deployedMsg"); + clusterMessageTopic.addMessageListener(new ClusterMessageListener()); } public Collection<QName> deploy(final File deploymentUnitDirectory) { @@ -48,8 +56,8 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{ } public void publishProcessStoreDeployedEvent(String duName){ - String returnedDuName = _hazelcastClusterImpl.publishProcessStoreEvent("Deployed " +duName); - publishService(returnedDuName); + deployInitiator = _hazelcastInstance.getCluster().getLocalMember(); + clusterMessageTopic.publish("Deployed " +duName); } public void publishService(final String duName) { @@ -77,4 +85,20 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{ } } + class ClusterMessageListener implements MessageListener<String> { + @Override + 
public void onMessage(Message<String> msg) { + String message = msg.getMessageObject(); + String arr[] = message.split(" ", 2); + String duName = arr[1]; + if(message.contains("Deployed ")) { + if(_hazelcastInstance.getCluster().getLocalMember() != deployInitiator) { + __log.info("Receive deployment msg to " +_hazelcastInstance.getCluster().getLocalMember() +" for " +duName); + publishService(duName); + } + } + } + } + + } http://git-wip-us.apache.org/repos/asf/ode/blob/0b19e7e4/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java index 0e53de4..4e6878e 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java @@ -36,12 +36,9 @@ public class HazelcastClusterImpl implements HazelcastCluster{ private HazelcastInstance _hazelcastInstance; private boolean isMaster = false; - private String _duName = ""; private Member leader; - private Member deployInitiator; private IMap<String, String> lock_map; - private ITopic<String> clusterMessageTopic; public HazelcastClusterImpl(HazelcastInstance hazelcastInstance) { _hazelcastInstance = hazelcastInstance; @@ -52,10 +49,6 @@ public class HazelcastClusterImpl implements HazelcastCluster{ // Registering this node in the cluster. 
_hazelcastInstance.getCluster().addMembershipListener(new ClusterMemberShipListener()); - // Register for listening to message listener - clusterMessageTopic = _hazelcastInstance.getTopic("deployedMsg"); - clusterMessageTopic.addMessageListener(new ClusterMessageListener()); - Member localMember = _hazelcastInstance.getCluster().getLocalMember(); String localMemberID = getHazelCastNodeID(localMember); __log.info("Registering HZ localMember ID " + localMemberID); @@ -113,22 +106,6 @@ public class HazelcastClusterImpl implements HazelcastCluster{ } } - class ClusterMessageListener implements MessageListener<String> { - @Override - public void onMessage(Message<String> msg) { - String message = msg.getMessageObject(); - String arr[] = message.split(" ", 2); - String duName = arr[1]; - if(message.contains("Deployed ")) { - if(_hazelcastInstance.getCluster().getLocalMember() != deployInitiator) { - setDUName(duName); - __log.info("Recerive deployment msg to " +_hazelcastInstance.getCluster().getLocalMember() +"for" +duName); - } - } - } - } - - public void markAsMaster() { leader = _hazelcastInstance.getCluster().getMembers().iterator().next(); if (leader.localMember()) { @@ -149,14 +126,7 @@ public class HazelcastClusterImpl implements HazelcastCluster{ return isMaster; } - public void setDUName(String duName) { - _duName = duName; + public HazelcastInstance getHazelcastInstance() { + return _hazelcastInstance; } - - public String publishProcessStoreEvent(String msg) { - deployInitiator = _hazelcastInstance.getCluster().getLocalMember(); - clusterMessageTopic.publish(msg); - return _duName; - } - }
