completed version-1 of deploying processes in the cluster
Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/9cb75820 Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/9cb75820 Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/9cb75820 Branch: refs/heads/master Commit: 9cb75820df9f8a87ebb163e4c1c45d7affd6291f Parents: 521d640 Author: suba <[email protected]> Authored: Wed Jun 17 12:32:40 2015 +0530 Committer: suba <[email protected]> Committed: Wed Jun 17 12:32:40 2015 +0530 ---------------------------------------------------------------------- .../webapp/WEB-INF/conf/ode-axis2.properties | 8 +++- .../java/org/apache/ode/axis2/Messages.java | 4 ++ .../java/org/apache/ode/axis2/ODEServer.java | 27 ++++++------ .../ode/axis2/deploy/DeploymentPoller.java | 14 +++---- .../ode/axis2/service/DeploymentWebService.java | 41 +++++++++++++------ .../apache/ode/bpel/clapi/ClusterManager.java | 13 +----- .../bpel/clapi/ProcessStoreUndeployedEvent.java | 40 ++++++++++++++++++ .../ode/store/ClusterProcessStoreImpl.java | 43 ++++++++++++++++---- .../org/apache/ode/store/ProcessStoreImpl.java | 22 ++++++++++ .../hazelcast/HazelcastClusterImpl.java | 37 +++++++++++------ 10 files changed, 183 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties ---------------------------------------------------------------------- diff --git a/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties b/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties index 253037c..03ac79c 100644 --- a/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties +++ b/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties @@ -94,4 +94,10 @@ ode-axis2.db.emb.name=derby-jpadb ## Event listeners #ode-axis2.event.listeners= -#ode-axis2.event.listeners=org.apache.ode.bpel.common.evt.DebugBpelEventListener \ No newline at end of file +#ode-axis2.event.listeners=org.apache.ode.bpel.common.evt.DebugBpelEventListener + +## Enable clustering +#ode-axis2.clustering.enabled=true + +## Clustering Implementation class. +#ode-axis2.clustering.impl.class = org.apache.ode.clustering.hazelcast.HazelcastClusterImpl http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/axis2/src/main/java/org/apache/ode/axis2/Messages.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/Messages.java b/axis2/src/main/java/org/apache/ode/axis2/Messages.java index a95c30d..0581c72 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/Messages.java +++ b/axis2/src/main/java/org/apache/ode/axis2/Messages.java @@ -58,6 +58,10 @@ public class Messages extends MessageBundle { return format("Starting ODE ServiceEngine."); } + public String msgOdeClusteringNotInitialized() { + return format("Clustering has not been initialized."); + } + public String msgOdeStarted() { return format("ODE Service Engine has been started."); } http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java index f0ad470..51f05dd 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java +++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java @@ -57,6 +57,7 @@ import org.apache.ode.axis2.deploy.DeploymentPoller; import org.apache.ode.axis2.service.DeploymentWebService; import org.apache.ode.axis2.service.ManagementService; import org.apache.ode.axis2.util.ClusterUrlTransformer; +import org.apache.ode.bpel.clapi.ClusterManager; import org.apache.ode.bpel.connector.BpelServerConnector; import org.apache.ode.bpel.dao.BpelDAOConnectionFactory; import org.apache.ode.bpel.engine.BpelServerImpl; @@ -82,8 +83,6 @@ import org.apache.ode.store.ClusterProcessStoreImpl; import org.apache.ode.utils.GUID; import org.apache.ode.utils.fs.TempFileManager; -import org.apache.ode.bpel.clapi.ClusterManager; - /** * Server class called by our Axis hooks to handle all ODE lifecycle management. * @@ -122,6 +121,8 @@ public class ODEServer { protected Database _db; + protected ClusterManager _clusterManager; + private DeploymentPoller _poller; private BpelServerConnector _connector; @@ -135,10 +136,6 @@ public class ODEServer { public Runnable txMgrCreatedCallback; - private ClusterManager _clusterManager; - - private String clusteringState = ""; - private boolean isClusteringEnabled; public void init(ServletConfig config, ConfigurationContext configContext) throws ServletException { @@ -193,9 +190,10 @@ public class ODEServer { txMgrCreatedCallback.run(); } - clusteringState = _odeConfig.getClusteringState(); - if (isClusteringEnabled()) initClustering(); - else __log.info("Clustering has not been initialized"); + String clusteringState = _odeConfig.getClusteringState(); + if (clusteringState != null && isClusteringEnabled(clusteringState)) { + initClustering(); + } else __log.info(__msgs.msgOdeClusteringNotInitialized()); __log.debug("Creating data source."); initDataSource(); @@ -384,6 +382,11 @@ public class ODEServer { _txMgr = null; } + if (_clusterManager != null) { + __log.debug("shutting down cluster manager."); + _clusterManager = null; + } + if (_connector != null) { try { __log.debug("shutdown BpelConnector"); @@ -468,7 +471,7 @@ public class ODEServer { } } - public boolean isClusteringEnabled() { + private boolean isClusteringEnabled(String clusteringState) { boolean state; if (clusteringState.equals("true")) state = true; else state = false; @@ -476,7 +479,7 @@ public class ODEServer { return state; } - public void setClustering (boolean state) { + private void setClustering (boolean state) { isClusteringEnabled = state; } @@ -493,7 +496,7 @@ public class ODEServer { Class<?> clusterImplClass = this.getClass().getClassLoader().loadClass(clusterImplName); _clusterManager = (ClusterManager) clusterImplClass.newInstance(); } catch (Exception ex) { - __log.error(ex); + __log.error("Error while loading class : " +clusterImplName ,ex); } _clusterManager.init(_configRoot); } http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java index ccb029b..9964af0 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java +++ b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java @@ -41,6 +41,7 @@ package org.apache.ode.axis2.deploy; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ode.axis2.ODEServer; +import org.apache.ode.bpel.clapi.ClusterManager; import org.apache.ode.bpel.engine.cron.CronScheduler; import org.apache.ode.bpel.engine.cron.SystemSchedulesConfig; import org.apache.ode.utils.WatchDog; @@ -54,8 +55,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; -import org.apache.ode.bpel.clapi.ClusterManager; - /** * Polls a directory for the deployment of a new deployment unit. */ @@ -140,10 +139,9 @@ public class DeploymentPoller { // Checking for new deployment directories if (isDeploymentFromODEFileSystemAllowed() && files != null) { for (File file : files) { - String test = file.getName(); - __log.info("Trying to access the lock for " + test); - __log.info("Test null key value " +test); - duLocked = pollerTryLock(test); + String duName = file.getName(); + __log.info("Trying to acquire the lock for " + duName); + duLocked = pollerTryLock(duName); if (duLocked) { try { @@ -343,7 +341,9 @@ public class DeploymentPoller { } } - //Implementation of IMap key Lock + /** + * Use to acquire the lock by poller + */ private boolean pollerTryLock(String key) { if(clusterEnabled) { return _odeServer.getBpelServer().getContexts().clusterManager.tryLock(key); http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java index 1951cf5..89c5a63 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java +++ b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java @@ -172,7 +172,7 @@ public class DeploymentWebService { _poller.hold(); File dest = new File(_deployPath, bundleName + "-" + _store.getCurrentVersion()); - __log.info("Trying to access the lock for " + dest.getName()); + __log.info("Trying to acquire the lock for deploying: " + dest.getName()); //lock on deployment unit directory name duLocked = lock(dest.getName()); @@ -218,7 +218,7 @@ public class DeploymentWebService { } sendResponse(factory, messageContext, "deployResponse", response); } finally { - __log.info("Trying to release the lock for " + dest.getName()); + __log.info("Trying to release the lock for deploying: " + dest.getName()); unlock(dest.getName()); } } @@ -243,20 +243,30 @@ public class DeploymentWebService { // Put the poller on hold to avoid undesired side effects _poller.hold(); - Collection<QName> undeployed = _store.undeploy(deploymentDir); + __log.info("Trying to acquire the lock for undeploying: " + deploymentDir.getName()); + duLocked = lock(deploymentDir.getName()); - File deployedMarker = new File(deploymentDir + ".deployed"); - boolean isDeleted = deployedMarker.delete(); + if (duLocked) { + try { + Collection<QName> undeployed = _store.undeploy(deploymentDir); - if (!isDeleted) - __log.error("Error while deleting file " + deployedMarker.getName()); + File deployedMarker = new File(deploymentDir + ".deployed"); + boolean isDeleted = deployedMarker.delete(); - FileUtils.deepDelete(deploymentDir); + if (!isDeleted) + __log.error("Error while deleting file " + deployedMarker.getName()); - OMElement response = factory.createOMElement("response", null); - response.setText("" + (undeployed.size() > 0)); - sendResponse(factory, messageContext, "undeployResponse", response); - _poller.markAsUndeployed(deploymentDir); + FileUtils.deepDelete(deploymentDir); + + OMElement response = factory.createOMElement("response", null); + response.setText("" + (undeployed.size() > 0)); + sendResponse(factory, messageContext, "undeployResponse", response); + _poller.markAsUndeployed(deploymentDir); + } finally { + __log.info("Trying to release the lock for undeploying: " + deploymentDir.getName()); + unlock(deploymentDir.getName()); + } + } } finally { _poller.release(); } @@ -371,7 +381,9 @@ public class DeploymentWebService { out.close(); } - //Implementation of IMap key Lock + /** + * Acquire the lock when deploying using web service + */ private boolean lock(String key) { if(clusterEnabled) { return _odeServer.getBpelServer().getContexts().clusterManager.lock(key); @@ -379,6 +391,9 @@ public class DeploymentWebService { else return true; } + /** + * Release the lock after completing deploy process + */ private boolean unlock(String key) { if(clusterEnabled) { return _odeServer.getBpelServer().getContexts().clusterManager.unlock(key); http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java index a1fe194..df4342e 100644 --- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java +++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java @@ -30,12 +30,7 @@ public interface ClusterManager { void init(File file); /** - * Check whether current node is the leader or not. - */ - void markAsMaster(); - - /** - * Return isMaster + * Return whether the local member is Master or not * @return */ boolean getIsMaster(); @@ -72,10 +67,4 @@ public interface ClusterManager { * @param event */ void publishProcessStoreEvent(Object event); - - /** - * Handle event according to received event - * @param message - */ - void handleEvent(Object message); } http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java new file mode 100644 index 0000000..347312f --- /dev/null +++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.bpel.clapi; + +import java.io.Serializable; + +public class ProcessStoreUndeployedEvent implements Serializable { + private static final long serialVersionUID = 1L; + + public final String deploymentUnit; + + public final String info; + + public ProcessStoreUndeployedEvent(String deploymentUnit) { + this.info = "Undeployment Event"; + this.deploymentUnit = deploymentUnit; + } + + @Override + public String toString() { + return "{ProcessStoreUndeployedEvent#" + deploymentUnit +"}"; + } + +} http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java ---------------------------------------------------------------------- diff --git a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java index 6f35110..551fd72 100644 --- a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java +++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ode.bpel.clapi.ClusterManager; import org.apache.ode.bpel.clapi.ProcessStoreDeployedEvent; +import org.apache.ode.bpel.clapi.ProcessStoreUndeployedEvent; import org.apache.ode.bpel.iapi.ProcessState; import org.apache.ode.bpel.iapi.EndpointReferenceContext; import org.apache.ode.il.config.OdeConfigProperties; @@ -36,9 +37,10 @@ import java.util.regex.Pattern; public class ClusterProcessStoreImpl extends ProcessStoreImpl{ private static final Log __log = LogFactory.getLog(ClusterProcessStoreImpl.class); - private final ArrayList<ProcessConfImpl> loaded = new ArrayList<ProcessConfImpl>(); + private final Map<QName, ProcessConfImpl> loaded = new HashMap<QName, ProcessConfImpl>(); private ClusterManager _clusterManager; private ProcessStoreDeployedEvent deployedEvent; + private ProcessStoreUndeployedEvent undeployedEvent; public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel, ClusterManager clusterManager) { super(eprContext,ds,persistenceType,props,createDatamodel); @@ -49,8 +51,8 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{ public Collection<QName> deploy(final File deploymentUnitDirectory) { Collection<QName> deployed = super.deploy(deploymentUnitDirectory); Map<QName, ProcessConfImpl> _processes = getProcessesMap(); - for (QName key :_processes.keySet()) { - if(!loaded.contains(_processes.get(key))) loaded.add(_processes.get(key)); + for (QName key : deployed) { + loaded.put(key,_processes.get(key)); } publishProcessStoreDeployedEvent(deploymentUnitDirectory.getName()); return deployed; @@ -67,8 +69,8 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{ Pattern duNamePattern = getPreviousPackageVersionPattern(duName); - for (Iterator<ProcessConfImpl> iterator = loaded.iterator(); iterator.hasNext();) { - ProcessConfImpl pconf = iterator.next(); + for (QName key : loaded.keySet()) { + ProcessConfImpl pconf = loaded.get(key); Matcher matcher = duNamePattern.matcher(pconf.getPackage()); if (matcher.matches() && pconf.getState().equals(state)) { pconf.setState(ProcessState.RETIRED); @@ -82,9 +84,10 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{ DeploymentUnitDAO dudao = conn.getDeploymentUnit(duName); if (dudao != null) { List<ProcessConfImpl> load = load(dudao); - loaded.addAll(load); + for(ProcessConfImpl p : load) { + loaded.put(p.getProcessId(),p); + } confs.addAll(load); - } return null; } @@ -97,10 +100,9 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{ try { fireStateChange(p.getProcessId(), p.getState(), p.getDeploymentUnit().getName()); } catch (Exception except) { - __log.error("Error while activating process: pid=" + p.getProcessId() + " package="+p.getDeploymentUnit().getName(), except); + __log.error("Error with process retiring or activating : pid=" + p.getProcessId() + " package="+p.getDeploymentUnit().getName(), except); } } - //loadAll(); } private Pattern getPreviousPackageVersionPattern(String duName) { @@ -116,4 +118,27 @@ public class ClusterProcessStoreImpl extends ProcessStoreImpl{ Pattern duNamePattern = Pattern.compile(duNameRegExp.toString()); return duNamePattern; } + + public Collection<QName> undeploy(final File dir) { + Collection<QName> undeployed = super.undeploy(dir); + loaded.keySet().removeAll(undeployed); + publishProcessStoreUndeployedEvent(dir.getName()); + return undeployed; + } + + private void publishProcessStoreUndeployedEvent(String duName){ + undeployedEvent = new ProcessStoreUndeployedEvent(duName); + _clusterManager.publishProcessStoreEvent(undeployedEvent); + } + + /** + * Use to unregister processes when deployment unit is undeployed + * @param duName + * @return + */ + public Collection<QName> undeployProcesses(final String duName) { + Collection<QName> undeployed = super.undeployProcesses(duName); + loaded.keySet().removeAll(undeployed); + return undeployed; + } } http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java ---------------------------------------------------------------------- diff --git a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java index d6f76f3..77afe5a 100644 --- a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java +++ b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java @@ -903,4 +903,26 @@ public class ProcessStoreImpl implements ProcessStore { protected Map<QName, ProcessConfImpl> getProcessesMap() { return _processes; } + + protected Collection<QName> undeployProcesses(final String duName) { + Collection<QName> undeployed = Collections.emptyList(); + DeploymentUnitDir du; + _rw.writeLock().lock(); + try { + du = _deploymentUnits.remove(duName); + if (du != null) { + undeployed = toPids(du.getProcessNames(), du.getVersion()); + } + + for (QName pn : undeployed) { + fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.UNDEPLOYED, pn, du.getName())); + __log.info(__msgs.msgProcessUndeployed(pn)); + } + + _processes.keySet().removeAll(undeployed); + } finally { + _rw.writeLock().unlock(); + } + return undeployed; + } } http://git-wip-us.apache.org/repos/asf/ode/blob/9cb75820/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java index 2e6868f..beba779 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java @@ -29,9 +29,10 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.ode.store.ClusterProcessStoreImpl; import org.apache.ode.bpel.clapi.ClusterManager; import org.apache.ode.bpel.clapi.ProcessStoreDeployedEvent; +import org.apache.ode.bpel.clapi.ProcessStoreUndeployedEvent; +import org.apache.ode.store.ClusterProcessStoreImpl; /** * This class implements necessary methods to build the cluster using hazelcast @@ -42,14 +43,16 @@ public class HazelcastClusterImpl implements ClusterManager { private HazelcastInstance _hazelcastInstance; private boolean isMaster = false; private Member leader; - private Member deployInitiator; + private Member eventInitiator; private IMap<String, String> lock_map; private ITopic<Object> clusterMessageTopic; private ClusterProcessStoreImpl _clusterProcessStore; public void init(File configRoot) { + /*First,looks for the hazelcast.config system property. If it is set, its value is used as the path. Else it will load the hazelcast.xml file using FileSystemXmlConfig()*/ + String hzConfig = System.getProperty("hazelcast.config"); if (hzConfig != null) _hazelcastInstance = Hazelcast.newHazelcastInstance(); else { @@ -79,7 +82,7 @@ public class HazelcastClusterImpl implements ClusterManager { } public boolean lock(String key) { - lock_map.putIfAbsent(key,key); + lock_map.putIfAbsent(key, key); lock_map.lock(key); boolean state = lock_map.isLocked(key); __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state); @@ -98,9 +101,9 @@ public class HazelcastClusterImpl implements ClusterManager { } public boolean tryLock(String key) { - lock_map.putIfAbsent(key,key); + lock_map.putIfAbsent(key, key); boolean state = lock_map.tryLock(key); - __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state ); + __log.info("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state); return state; } @@ -121,9 +124,9 @@ public class HazelcastClusterImpl implements ClusterManager { } } - public void publishProcessStoreEvent(Object deployedEvent) { - deployInitiator = _hazelcastInstance.getCluster().getLocalMember(); - clusterMessageTopic.publish(deployedEvent); + public void publishProcessStoreEvent(Object event) { + eventInitiator = _hazelcastInstance.getCluster().getLocalMember(); + clusterMessageTopic.publish(event); } @@ -134,20 +137,30 @@ public class HazelcastClusterImpl implements ClusterManager { } } - public void handleEvent(Object message) { + private void handleEvent(Object message) { if (message instanceof ProcessStoreDeployedEvent) { ProcessStoreDeployedEvent event = (ProcessStoreDeployedEvent) message; - if (_hazelcastInstance.getCluster().getLocalMember() != deployInitiator) { + if (_hazelcastInstance.getCluster().getLocalMember() != eventInitiator) { String duName = event.deploymentUnit; __log.info("Receive deployment msg to " + _hazelcastInstance.getCluster().getLocalMember() + " for " + duName); _clusterProcessStore.publishService(duName); - } else deployInitiator = null; + } else eventInitiator = null; + } + + else if (message instanceof ProcessStoreUndeployedEvent) { + ProcessStoreUndeployedEvent event = (ProcessStoreUndeployedEvent) message; + + if (_hazelcastInstance.getCluster().getLocalMember() != eventInitiator) { + String duName = event.deploymentUnit; + __log.info("Receive undeployment msg to " + _hazelcastInstance.getCluster().getLocalMember() + " for " + duName); + _clusterProcessStore.undeployProcesses(duName); + } else eventInitiator = null; } } - public void markAsMaster() { + private void markAsMaster() { leader = _hazelcastInstance.getCluster().getMembers().iterator().next(); if (leader.localMember()) { isMaster = true;
