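Completed version 2 of deploying processes in the cluster: typed the process-store cluster event API to ProcessStoreClusterEvent and changed the member UUID comparison from != to equals()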
Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/4137714c Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/4137714c Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/4137714c Branch: refs/heads/master Commit: 4137714cf38dffd9b7d391bf5e2944e84ccb14d4 Parents: 0afb7c4 Author: suba <[email protected]> Authored: Tue Jun 23 09:56:39 2015 +0530 Committer: suba <[email protected]> Committed: Tue Jun 23 09:56:39 2015 +0530 ---------------------------------------------------------------------- .../apache/ode/bpel/clapi/ClusterManager.java | 4 +-- .../ode/store/ClusterProcessStoreImpl.java | 4 +-- .../hazelcast/HazelcastClusterImpl.java | 27 ++++++++------------ 3 files changed, 15 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/4137714c/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java index fab2bac..da2d668 100644 --- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java +++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java @@ -64,9 +64,9 @@ public interface ClusterManager { /** * Publish Deploy event to the cluster by deploy initiator - * @param event + * @param clusterEvent */ - void publishProcessStoreEvent(Object event); + void publishProcessStoreClusterEvent(ProcessStoreClusterEvent clusterEvent); /** * Check whether the map has a value for given key, if absent put the value to map http://git-wip-us.apache.org/repos/asf/ode/blob/4137714c/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java ---------------------------------------------------------------------- diff --git 
a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java index 117eab0..f0ae9d1 100644 --- a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java +++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java @@ -55,7 +55,7 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{ private void publishProcessStoreDeployedEvent(String duName){ deployedEvent = new ProcessStoreDeployedEvent(duName); - _clusterManager.publishProcessStoreEvent(deployedEvent); + _clusterManager.publishProcessStoreClusterEvent(deployedEvent); } public void publishService(final String duName) { @@ -111,7 +111,7 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{ private void publishProcessStoreUndeployedEvent(String duName){ undeployedEvent = new ProcessStoreUndeployedEvent(duName); - _clusterManager.publishProcessStoreEvent(undeployedEvent); + _clusterManager.publishProcessStoreClusterEvent(undeployedEvent); } /** http://git-wip-us.apache.org/repos/asf/ode/blob/4137714c/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java index 42a3169..36859eb 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java @@ -29,10 +29,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.ode.bpel.clapi.ClusterManager; -import org.apache.ode.bpel.clapi.ProcessStoreClusterEvent; -import org.apache.ode.bpel.clapi.ProcessStoreDeployedEvent; -import 
org.apache.ode.bpel.clapi.ProcessStoreUndeployedEvent; +import org.apache.ode.bpel.clapi.*; import org.apache.ode.store.ClusterProcessStoreImpl; /** @@ -45,7 +42,7 @@ public class HazelcastClusterImpl implements ClusterManager { private boolean isMaster = false; private Member leader; private IMap<String, String> lock_map; - private ITopic<Object> clusterMessageTopic; + private ITopic<ProcessStoreClusterEvent> clusterMessageTopic; private ClusterProcessStoreImpl _clusterProcessStore; public void init(File configRoot) { @@ -120,27 +117,25 @@ public class HazelcastClusterImpl implements ClusterManager { } } - public void publishProcessStoreEvent(Object event) { - if (event instanceof ProcessStoreClusterEvent) { - ProcessStoreClusterEvent e = (ProcessStoreClusterEvent) event; - e.setUuid(_hazelcastInstance.getCluster().getLocalMember().getUuid()); - clusterMessageTopic.publish(e); - } + public void publishProcessStoreClusterEvent(ProcessStoreClusterEvent clusterEvent) { + clusterEvent.setUuid(_hazelcastInstance.getCluster().getLocalMember().getUuid()); + __log.info("UUID " +clusterEvent.getUuid()); + clusterMessageTopic.publish(clusterEvent); } - class ClusterMessageListener implements MessageListener<Object> { + class ClusterMessageListener implements MessageListener<ProcessStoreClusterEvent> { @Override - public void onMessage(Message<Object> msg) { + public void onMessage(Message<ProcessStoreClusterEvent> msg) { handleEvent(msg.getMessageObject()); } } - private void handleEvent(Object message) { + private void handleEvent(ProcessStoreClusterEvent message) { if (message instanceof ProcessStoreDeployedEvent) { ProcessStoreDeployedEvent event = (ProcessStoreDeployedEvent) message; - if (_hazelcastInstance.getCluster().getLocalMember().getUuid() != event.getUuid()) { + if (!_hazelcastInstance.getCluster().getLocalMember().getUuid().equals(event.getUuid())) { String duName = event.getDuName(); __log.info("Receive deployment msg to " + 
_hazelcastInstance.getCluster().getLocalMember() + " for " + duName); _clusterProcessStore.publishService(duName); @@ -150,7 +145,7 @@ public class HazelcastClusterImpl implements ClusterManager { else if (message instanceof ProcessStoreUndeployedEvent) { ProcessStoreUndeployedEvent event = (ProcessStoreUndeployedEvent) message; - if (_hazelcastInstance.getCluster().getLocalMember().getUuid() != event.getUuid()) { + if (!_hazelcastInstance.getCluster().getLocalMember().getUuid().equals(event.getUuid())) { String duName = event.getDuName(); __log.info("Receive undeployment msg to " + _hazelcastInstance.getCluster().getLocalMember() + " for " + duName); _clusterProcessStore.undeployProcesses(duName);
