cluster enabled instance lock manager implementation-1
Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/cfa4a97b Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/cfa4a97b Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/cfa4a97b Branch: refs/heads/master Commit: cfa4a97b0a0ec854790a16bcedcc5279afa6656a Parents: 09a4486 Author: suba <[email protected]> Authored: Thu Jul 9 23:43:55 2015 +0530 Committer: suba <[email protected]> Committed: Thu Jul 9 23:43:55 2015 +0530 ---------------------------------------------------------------------- .../ode/bpel/AbstractInstanceLockManager.java | 38 ------------------- .../org/apache/ode/bpel/clapi/ClusterLock.java | 4 +- .../apache/ode/bpel/clapi/ClusterManager.java | 8 +++- .../bpel/iapi/AbstractInstanceLockManager.java | 37 +++++++++++++++++++ .../apache/ode/bpel/engine/BpelEngineImpl.java | 24 ++++++++++-- .../ode/bpel/engine/InstanceLockManager.java | 2 +- .../hazelcast/HazelcastClusterImpl.java | 3 +- .../hazelcast/HazelcastDeploymentLock.java | 1 + .../hazelcast/HazelcastInstanceLock.java | 39 ++++++++++---------- 9 files changed, 90 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/cfa4a97b/bpel-api/src/main/java/org/apache/ode/bpel/AbstractInstanceLockManager.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/AbstractInstanceLockManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/AbstractInstanceLockManager.java deleted file mode 100644 index 7a34b2c..0000000 --- a/bpel-api/src/main/java/org/apache/ode/bpel/AbstractInstanceLockManager.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ode.bpel; - -import java.util.concurrent.TimeUnit; - -/** - * Abstract class to implement an instance lock manager. Instance lock provide process instance isolation from - * concurrent access when entering jacob - */ -public abstract class AbstractInstanceLockManager { - abstract public void unlock(Long iid); - - abstract public void lock(Long iid, int i, TimeUnit microseconds) throws InterruptedException, - TimeoutException; - - /** Exception class indicating a time-out occured while obtaining a lock. */ - public static final class TimeoutException extends Exception { - private static final long serialVersionUID = 7247629086692580285L; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ode/blob/cfa4a97b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterLock.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterLock.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterLock.java index 9eaf705..118b275 100644 --- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterLock.java +++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterLock.java @@ -38,14 +38,14 @@ public interface ClusterLock { boolean unlockMap(String key); /** - * Tries to acquire the lock for the specified key ant time period. + * Tries to acquire the lock for the specified key * @param key * @return */ boolean tryLockMap(String key); /** - * + * Tries to acquire the lock for the specified key and time period. * @param key * @param time * @param tu http://git-wip-us.apache.org/repos/asf/ode/blob/cfa4a97b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java index cbfb12f..d73810d 100644 --- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java +++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java @@ -18,7 +18,7 @@ */ package org.apache.ode.bpel.clapi; -import org.apache.ode.bpel.AbstractInstanceLockManager; +import org.apache.ode.bpel.iapi.AbstractInstanceLockManager; import java.io.File; @@ -58,7 +58,13 @@ public interface ClusterManager { */ void registerClusterProcessStoreMessageListener(); + /** + * Return deployment lock for cluster + */ ClusterLock getDeploymentLock(); + /** + * Return instance lock for cluster + */ AbstractInstanceLockManager getInstanceLock(); } http://git-wip-us.apache.org/repos/asf/ode/blob/cfa4a97b/bpel-api/src/main/java/org/apache/ode/bpel/iapi/AbstractInstanceLockManager.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/iapi/AbstractInstanceLockManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/iapi/AbstractInstanceLockManager.java new file mode 100644 index 0000000..b53ac65 --- /dev/null +++ b/bpel-api/src/main/java/org/apache/ode/bpel/iapi/AbstractInstanceLockManager.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ode.bpel.iapi; + +import java.util.concurrent.TimeUnit; + +/** + * Abstract class to implement an instance lock manager + */ +public abstract class AbstractInstanceLockManager { + public abstract void unlock(Long iid); + + public abstract void lock(Long iid, int i, TimeUnit microseconds) throws InterruptedException, + TimeoutException; + + /** Exception class indicating a time-out occured while obtaining a lock. */ + public static final class TimeoutException extends Exception { + private static final long serialVersionUID = 7247629086692580285L; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ode/blob/cfa4a97b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java index e278c7d..fe38cf0 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java @@ -21,12 +21,22 @@ package org.apache.ode.bpel.engine; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.ode.bpel.AbstractInstanceLockManager; import org.apache.ode.bpel.dao.MessageExchangeDAO; import org.apache.ode.bpel.dao.ProcessDAO; import org.apache.ode.bpel.dao.ProcessInstanceDAO; import org.apache.ode.bpel.evt.BpelEvent; -import org.apache.ode.bpel.iapi.*; +import org.apache.ode.bpel.iapi.AbstractInstanceLockManager; +import org.apache.ode.bpel.iapi.BpelEngine; +import org.apache.ode.bpel.iapi.BpelEngineException; +import org.apache.ode.bpel.iapi.ContextException; +import org.apache.ode.bpel.iapi.Endpoint; +import org.apache.ode.bpel.iapi.Message; +import org.apache.ode.bpel.iapi.MessageExchange; +import org.apache.ode.bpel.iapi.OdeGlobalConfig; +import org.apache.ode.bpel.iapi.MyRoleMessageExchange; +import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange; +import org.apache.ode.bpel.iapi.ProcessState; +import org.apache.ode.bpel.iapi.Scheduler; import org.apache.ode.bpel.iapi.MessageExchange.FailureType; import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern; import org.apache.ode.bpel.iapi.MessageExchange.Status; @@ -50,7 +60,13 @@ import org.w3c.dom.Element; import javax.wsdl.Operation; import javax.wsdl.PortType; import javax.xml.namespace.QName; -import java.util.*; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; import java.util.concurrent.TimeUnit; /** @@ -98,7 +114,7 @@ public class BpelEngineImpl implements BpelEngine { private SharedEndpoints _sharedEps; /** Manage instance-level locks. */ - private AbstractInstanceLockManager _instanceLockManager; + private final AbstractInstanceLockManager _instanceLockManager; final Contexts _contexts; http://git-wip-us.apache.org/repos/asf/ode/blob/cfa4a97b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java index dba127b..f712552 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java @@ -20,7 +20,7 @@ package org.apache.ode.bpel.engine; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.ode.bpel.AbstractInstanceLockManager; +import org.apache.ode.bpel.iapi.AbstractInstanceLockManager; import java.util.HashMap; import java.util.Map; http://git-wip-us.apache.org/repos/asf/ode/blob/cfa4a97b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java index 57984c0..8eac0b6 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java @@ -30,7 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ode.bpel.clapi.*; -import org.apache.ode.bpel.AbstractInstanceLockManager; +import org.apache.ode.bpel.iapi.AbstractInstanceLockManager; /** @@ -78,6 +78,7 @@ public class HazelcastClusterImpl implements ClusterManager { uuid = localMember.getUuid(); __log.info("Registering HZ localMember ID " + nodeID); markAsMaster(); + deployment_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_LOCK); instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK); clusterMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_MSG); http://git-wip-us.apache.org/repos/asf/ode/blob/cfa4a97b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java index 44bbea8..2f5aa4d 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java @@ -64,6 +64,7 @@ public class HazelcastDeploymentLock implements ClusterLock{ } public boolean tryLockMap(String key,int time, TimeUnit tu) { + // Noting to do here. return true; } } http://git-wip-us.apache.org/repos/asf/ode/blob/cfa4a97b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java ---------------------------------------------------------------------- diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java index 40f1d66..6988746 100644 --- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java +++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java @@ -20,9 +20,10 @@ package org.apache.ode.clustering.hazelcast; import com.hazelcast.core.IMap; import org.apache.ode.bpel.clapi.ClusterLock; -import org.apache.ode.bpel.AbstractInstanceLockManager; +import org.apache.ode.bpel.iapi.AbstractInstanceLockManager; import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,7 +44,7 @@ public class HazelcastInstanceLock extends AbstractInstanceLockManager implement public void lock(Long iid, int time, TimeUnit tu) throws InterruptedException, AbstractInstanceLockManager.TimeoutException { if (iid == null) { - if(__log.isDebugEnabled()) { + if (__log.isDebugEnabled()) { __log.debug(" Instance Id null at lock[]"); } return; @@ -51,15 +52,15 @@ public class HazelcastInstanceLock extends AbstractInstanceLockManager implement String thrd = Thread.currentThread().toString(); - if(__log.isDebugEnabled()) { + if (__log.isDebugEnabled()) { __log.debug(thrd + ": lock(iid=" + iid + ", time=" + time + tu + ")"); } - putIfAbsent(iid.toString(),iid.toString()); + putIfAbsent(iid.toString(), iid.toString()); - if (!tryLockMap(iid.toString(),time, tu)) { + if (!tryLockMap(iid.toString(), time, tu)) { - if(__log.isDebugEnabled()) { + if (__log.isDebugEnabled()) { __log.debug(thrd + ": lock(iid=" + iid + ", " + "time=" + time + tu + ")-->TIMEOUT"); } @@ -70,7 +71,7 @@ public class HazelcastInstanceLock extends AbstractInstanceLockManager implement public void unlock(Long iid) { if (iid == null) { - if(__log.isDebugEnabled()) { + if (__log.isDebugEnabled()) { __log.debug(" unlock, instance id is null"); } return; @@ -80,38 +81,38 @@ public class HazelcastInstanceLock extends AbstractInstanceLockManager implement unlockMap(iid.toString()); - if(__log.isDebugEnabled()) { + if (__log.isDebugEnabled()) { __log.debug(thrd + " unlock(iid=" + iid + ")"); } } public boolean lockMap(String key) { - _lock_map.lock(key); + // Noting to do here. return true; } public boolean unlockMap(String key) { if (_lock_map.get(key) == "true") { _lock_map.unlock(key); - _lock_map.replace(key,"false"); + _lock_map.replace(key, "false"); + return true; } - return true; + return false; } public boolean tryLockMap(String key) { - boolean state = _lock_map.tryLock(key); - return state; + // Noting to do here. + return true; } - public boolean tryLockMap(String key,int time, TimeUnit tu) { - boolean state = true; + public boolean tryLockMap(String key, int time, TimeUnit tu) { + boolean state = false; try { - state = _lock_map.tryLock(key,time,tu); + state = _lock_map.tryLock(key, time, tu); } catch (InterruptedException ex) { - __log.error(ex); + __log.error("Interruption occured" +ex); } - - _lock_map.replace(key,"" +state); + _lock_map.replace(key, "" + state); return state; } }
