Repository: stratos Updated Branches: refs/heads/master dee7291cf -> dce11f62f
Moving distributed object provider class to stratos common module Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/dce11f62 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/dce11f62 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/dce11f62 Branch: refs/heads/master Commit: dce11f62f2d1c12b7887447513053e1ec58131d9 Parents: dee7291 Author: Imesh Gunaratne <[email protected]> Authored: Mon Dec 1 15:13:05 2014 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Mon Dec 1 15:13:20 2014 +0530 ---------------------------------------------------------------------- .../clustering/DistributedObjectHandler.java | 140 --------------- .../context/CloudControllerContext.java | 62 +++---- components/org.apache.stratos.common/pom.xml | 1 + .../clustering/DistributedObjectProvider.java | 173 +++++++++++++++++++ 4 files changed, 205 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/dce11f62/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java deleted file mode 100644 index 2b4437e..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.cloud.controller.clustering; - -import com.hazelcast.core.HazelcastInstance; -import com.hazelcast.core.IList; -import com.hazelcast.core.ILock; -import com.hazelcast.core.IMap; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * An object handler for managing objects in distributed and non-distributed environments. 
- */ -public class DistributedObjectHandler { - private static final Log log = LogFactory.getLog(DistributedObjectHandler.class); - - private final boolean clustered; - private final HazelcastInstance hazelcastInstance; - - public DistributedObjectHandler(boolean clustered, HazelcastInstance hazelcastInstance) { - this.clustered = clustered; - this.hazelcastInstance = hazelcastInstance; - } - - private com.hazelcast.core.ILock acquireDistributedLock(Object object) { - if (log.isDebugEnabled()) { - log.debug(String.format("Acquiring distributed lock for %s...", object.getClass().getSimpleName())); - } - ILock lock = hazelcastInstance.getLock(object); - if (log.isDebugEnabled()) { - log.debug(String.format("Distributed lock acquired for %s", object.getClass().getSimpleName())); - } - return lock; - } - - private void releaseDistributedLock(ILock lock) { - if (log.isDebugEnabled()) { - log.debug(String.format("Releasing distributed lock for %s...", lock.getKey())); - } - lock.forceUnlock(); - if (log.isDebugEnabled()) { - log.debug(String.format("Distributed lock released for %s", lock.getKey())); - } - } - - public Map getMap(String key) { - if(clustered) { - return hazelcastInstance.getMap(key); - } else { - return new ConcurrentHashMap<Object, Object>(); - } - } - - public List getList(String name) { - if(clustered) { - return hazelcastInstance.getList(name); - } else { - return new ArrayList(); - } - } - - public void putToMap(Map map, Object key, Object value) { - if(clustered) { - ILock lock = null; - try { - lock = acquireDistributedLock(map); - ((IMap)map).set(key, value); - } finally { - releaseDistributedLock(lock); - } - } else { - map.put(key, value); - } - } - - public void removeFromMap(Map map, Object key) { - if(clustered) { - ILock lock = null; - try { - lock = acquireDistributedLock(map); - ((IMap)map).delete(key); - } finally { - releaseDistributedLock(lock); - } - } else { - map.remove(key); - } - } - - public void addToList(List list, Object 
value) { - if(clustered) { - ILock lock = null; - try { - lock = acquireDistributedLock(list); - ((IList)list).add(value); - } finally { - releaseDistributedLock(lock); - } - } else { - list.add(value); - } - } - - public void removeFromList(List list, Object value) { - if(clustered) { - ILock lock = null; - try { - lock = acquireDistributedLock(list); - ((IList)list).remove(value); - } finally { - releaseDistributedLock(lock); - } - } else { - list.remove(value); - } - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/dce11f62/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java index b589ecc..969c3c0 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java @@ -22,7 +22,7 @@ import org.apache.axis2.clustering.ClusteringAgent; import org.apache.axis2.engine.AxisConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.clustering.DistributedObjectHandler; +import org.apache.stratos.common.clustering.DistributedObjectProvider; import org.apache.stratos.cloud.controller.domain.*; import org.apache.stratos.cloud.controller.internal.ServiceReferenceHolder; import org.apache.stratos.cloud.controller.registry.Deserializer; @@ -60,7 +60,7 @@ public class CloudControllerContext implements Serializable { private static volatile 
CloudControllerContext instance; - private final DistributedObjectHandler distributedObjectHandler; + private final DistributedObjectProvider distributedObjectProvider; /* We keep following maps in order to make the look up time, small. */ @@ -131,18 +131,18 @@ public class CloudControllerContext implements Serializable { } // Initialize distributed object handler - distributedObjectHandler = new DistributedObjectHandler(isClustered(), + distributedObjectProvider = new DistributedObjectProvider(isClustered(), ServiceReferenceHolder.getInstance().getHazelcastInstance()); // Initialize objects - clusterIdToMemberContextListMap = distributedObjectHandler.getMap(CC_CLUSTER_ID_TO_MEMBER_CTX); - memberIdToMemberContextMap = distributedObjectHandler.getMap(CC_MEMBER_ID_TO_MEMBER_CTX); - memberIdToScheduledTaskMap = distributedObjectHandler.getMap(CC_MEMBER_ID_TO_SCH_TASK); - kubClusterIdToKubClusterContextMap = distributedObjectHandler.getMap(CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX); - clusterIdToContextMap = distributedObjectHandler.getMap(CC_CLUSTER_ID_TO_CLUSTER_CTX); - cartridgeTypeToPartitionIdsMap = distributedObjectHandler.getMap(CC_CARTRIDGE_TYPE_TO_PARTITION_IDS); - cartridges = distributedObjectHandler.getList(CC_CARTRIDGES); - serviceGroups = distributedObjectHandler.getList(CC_SERVICE_GROUPS); + clusterIdToMemberContextListMap = distributedObjectProvider.getMap(CC_CLUSTER_ID_TO_MEMBER_CTX); + memberIdToMemberContextMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_MEMBER_CTX); + memberIdToScheduledTaskMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_SCH_TASK); + kubClusterIdToKubClusterContextMap = distributedObjectProvider.getMap(CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX); + clusterIdToContextMap = distributedObjectProvider.getMap(CC_CLUSTER_ID_TO_CLUSTER_CTX); + cartridgeTypeToPartitionIdsMap = distributedObjectProvider.getMap(CC_CARTRIDGE_TYPE_TO_PARTITION_IDS); + cartridges = distributedObjectProvider.getList(CC_CARTRIDGES); + serviceGroups = 
distributedObjectProvider.getList(CC_SERVICE_GROUPS); // Update context from the registry updateContextFromRegistry(); @@ -185,7 +185,7 @@ public class CloudControllerContext implements Serializable { } public void addCartridge(Cartridge newCartridges) { - distributedObjectHandler.addToList(cartridges, newCartridges); + distributedObjectProvider.addToList(cartridges, newCartridges); } public ServiceGroup getServiceGroup(String name) { @@ -198,7 +198,7 @@ public class CloudControllerContext implements Serializable { } public void addServiceGroup(ServiceGroup newServiceGroup) { - distributedObjectHandler.addToList(serviceGroups, newServiceGroup); + distributedObjectProvider.addToList(serviceGroups, newServiceGroup); } public void removeServiceGroup(List<ServiceGroup> serviceGroup) { @@ -240,17 +240,17 @@ public class CloudControllerContext implements Serializable { } public void addMemberContext(MemberContext memberContext) { - distributedObjectHandler.putToMap(memberIdToMemberContextMap, memberContext.getMemberId(), memberContext); + distributedObjectProvider.putToMap(memberIdToMemberContextMap, memberContext.getMemberId(), memberContext); List<MemberContext> memberContextList; if ((memberContextList = clusterIdToMemberContextListMap.get(memberContext.getClusterId())) == null) { memberContextList = new ArrayList<MemberContext>(); } if (memberContextList.contains(memberContext)) { - distributedObjectHandler.removeFromList(memberContextList,memberContext); + distributedObjectProvider.removeFromList(memberContextList,memberContext); } - distributedObjectHandler.addToList(memberContextList, memberContext); - distributedObjectHandler.putToMap(clusterIdToMemberContextListMap, memberContext.getClusterId(), + distributedObjectProvider.addToList(memberContextList, memberContext); + distributedObjectProvider.putToMap(clusterIdToMemberContextListMap, memberContext.getClusterId(), memberContextList); if (log.isDebugEnabled()) { log.debug("Added member context to the cloud 
controller context: " + memberContext); @@ -258,20 +258,20 @@ public class CloudControllerContext implements Serializable { } public void addScheduledFutureJob(String memberId, ScheduledFuture<?> job) { - distributedObjectHandler.putToMap(memberIdToScheduledTaskMap, memberId, job); + distributedObjectProvider.putToMap(memberIdToScheduledTaskMap, memberId, job); } public List<MemberContext> removeMemberContextsOfCluster(String clusterId) { List<MemberContext> memberContextList = clusterIdToMemberContextListMap.get(clusterId); - distributedObjectHandler.removeFromMap(clusterIdToMemberContextListMap, clusterId); + distributedObjectProvider.removeFromMap(clusterIdToMemberContextListMap, clusterId); if (memberContextList == null) { return new ArrayList<MemberContext>(); } for (MemberContext memberContext : memberContextList) { String memberId = memberContext.getMemberId(); - distributedObjectHandler.removeFromMap(memberIdToMemberContextMap, memberId); + distributedObjectProvider.removeFromMap(memberIdToMemberContextMap, memberId); ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId); - distributedObjectHandler.removeFromMap(memberIdToScheduledTaskMap, memberId); + distributedObjectProvider.removeFromMap(memberIdToScheduledTaskMap, memberId); stopTask(task); if (log.isDebugEnabled()) { @@ -284,7 +284,7 @@ public class CloudControllerContext implements Serializable { public MemberContext removeMemberContext(String memberId, String clusterId) { MemberContext removedMemberContext = memberIdToMemberContextMap.get(memberId); - distributedObjectHandler.removeFromMap(memberIdToMemberContextMap, memberId); + distributedObjectProvider.removeFromMap(memberIdToMemberContextMap, memberId); List<MemberContext> memberContextList = clusterIdToMemberContextListMap.get(clusterId); if (memberContextList != null) { @@ -298,10 +298,10 @@ public class CloudControllerContext implements Serializable { iterator.remove(); } } - 
distributedObjectHandler.putToMap(clusterIdToMemberContextListMap, clusterId, newCtxts); + distributedObjectProvider.putToMap(clusterIdToMemberContextListMap, clusterId, newCtxts); } ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId); - distributedObjectHandler.removeFromMap(memberIdToScheduledTaskMap, memberId); + distributedObjectProvider.removeFromMap(memberIdToScheduledTaskMap, memberId); stopTask(task); return removedMemberContext; } @@ -322,7 +322,7 @@ public class CloudControllerContext implements Serializable { } public void addClusterContext(ClusterContext ctxt) { - distributedObjectHandler.putToMap(clusterIdToContextMap, ctxt.getClusterId(), ctxt); + distributedObjectProvider.putToMap(clusterIdToContextMap, ctxt.getClusterId(), ctxt); } public ClusterContext getClusterContext(String clusterId) { @@ -331,7 +331,7 @@ public class CloudControllerContext implements Serializable { public ClusterContext removeClusterContext(String clusterId) { ClusterContext removed = clusterIdToContextMap.get(clusterId); - distributedObjectHandler.removeFromMap(clusterIdToContextMap, clusterId); + distributedObjectProvider.removeFromMap(clusterIdToContextMap, clusterId); return removed; } @@ -349,11 +349,11 @@ public class CloudControllerContext implements Serializable { list = new ArrayList<String>(); } list.add(partitionId); - distributedObjectHandler.putToMap(cartridgeTypeToPartitionIdsMap, cartridgeType, list); + distributedObjectProvider.putToMap(cartridgeTypeToPartitionIdsMap, cartridgeType, list); } public void removeFromCartridgeTypeToPartitionIds(String cartridgeType) { - distributedObjectHandler.removeFromMap(cartridgeTypeToPartitionIdsMap, cartridgeType); + distributedObjectProvider.removeFromMap(cartridgeTypeToPartitionIdsMap, cartridgeType); } public KubernetesClusterContext getKubernetesClusterContext(String kubClusterId) { @@ -361,7 +361,7 @@ public class CloudControllerContext implements Serializable { } public void 
addKubernetesClusterContext(KubernetesClusterContext kubernetesClusterContext) { - distributedObjectHandler.putToMap(kubClusterIdToKubClusterContextMap, + distributedObjectProvider.putToMap(kubClusterIdToKubClusterContextMap, kubernetesClusterContext.getKubernetesClusterId(), kubernetesClusterContext); } @@ -420,13 +420,13 @@ public class CloudControllerContext implements Serializable { private void copyMap(Map sourceMap, Map destinationMap) { for(Object key : sourceMap.keySet()) { - distributedObjectHandler.putToMap(destinationMap, key, sourceMap.get(key)); + distributedObjectProvider.putToMap(destinationMap, key, sourceMap.get(key)); } } private void copyList(List sourceList, List destinationList) { for(Object item : sourceList) { - distributedObjectHandler.addToList(destinationList, item); + distributedObjectProvider.addToList(destinationList, item); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/dce11f62/components/org.apache.stratos.common/pom.xml ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/pom.xml b/components/org.apache.stratos.common/pom.xml index 618eb5f..6c33f0d 100644 --- a/components/org.apache.stratos.common/pom.xml +++ b/components/org.apache.stratos.common/pom.xml @@ -46,6 +46,7 @@ <Bundle-Name>${project.artifactId}</Bundle-Name> <Export-Package> org.apache.stratos.common.*, + org.apache.stratos.common.clustering.*, org.apache.stratos.common.statistics.publisher.*, </Export-Package> <Import-Package> http://git-wip-us.apache.org/repos/asf/stratos/blob/dce11f62/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java new file mode 100644 index 0000000..fe47ca4 --- /dev/null +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.common.clustering; + +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.IList; +import com.hazelcast.core.ILock; +import com.hazelcast.core.IMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Provides objects to be managed in distributed and non-distributed environments. 
+ */
+public class DistributedObjectProvider {
+    private static final Log log = LogFactory.getLog(DistributedObjectProvider.class);
+
+    private final boolean clustered;
+    private final HazelcastInstance hazelcastInstance;
+
+    /**
+     * Creates a provider.
+     *
+     * @param clustered         true to hand out Hazelcast backed objects, false to hand out
+     *                          local in-memory objects
+     * @param hazelcastInstance Hazelcast instance used when clustered is true; it is not
+     *                          dereferenced when clustered is false
+     */
+    public DistributedObjectProvider(boolean clustered, HazelcastInstance hazelcastInstance) {
+        this.clustered = clustered;
+        this.hazelcastInstance = hazelcastInstance;
+    }
+
+    /**
+     * Looks up the Hazelcast lock keyed by the given object and blocks until the calling
+     * thread holds it.
+     *
+     * @param object object used as the lock key
+     * @return the lock, held by the calling thread
+     */
+    private ILock acquireDistributedLock(Object object) {
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Acquiring distributed lock for %s...", object.getClass().getSimpleName()));
+        }
+        ILock lock = hazelcastInstance.getLock(object);
+        // getLock() only returns a handle; the lock is not held until lock() returns.
+        lock.lock();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Distributed lock acquired for %s", object.getClass().getSimpleName()));
+        }
+        return lock;
+    }
+
+    /**
+     * Releases a lock previously returned by {@link #acquireDistributedLock(Object)}.
+     *
+     * @param lock the lock to release; null is ignored so that a failed acquisition does
+     *             not raise a second exception from the caller's finally block
+     */
+    private void releaseDistributedLock(ILock lock) {
+        if (lock == null) {
+            return;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Releasing distributed lock for %s...", lock.getKey()));
+        }
+        // unlock() releases only the hold taken by this thread; forceUnlock() would also
+        // release a lock that is currently held by another thread or member.
+        lock.unlock();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Distributed lock released for %s", lock.getKey()));
+        }
+    }
+
+    /**
+     * If clustering is enabled returns a distributed map object, otherwise returns a
+     * concurrent local map object.
+     *
+     * @param key name of the distributed map; not used when clustering is disabled
+     * @return the Hazelcast map registered under the given name, or a new empty
+     *         {@link ConcurrentHashMap}
+     */
+    public Map getMap(String key) {
+        if (clustered) {
+            return hazelcastInstance.getMap(key);
+        }
+        return new ConcurrentHashMap<Object, Object>();
+    }
+
+    /**
+     * If clustering is enabled returns a distributed list, otherwise returns
+     * a local array list.
+     *
+     * @param name name of the distributed list; not used when clustering is disabled
+     * @return the Hazelcast list registered under the given name, or a new empty
+     *         {@link ArrayList}
+     */
+    public List getList(String name) {
+        if (clustered) {
+            return hazelcastInstance.getList(name);
+        }
+        return new ArrayList();
+    }
+
+    /**
+     * Put a key value pair to a map. A distributed lock is used when clustering is enabled
+     * and the map is a Hazelcast map; any other map is updated directly.
+     *
+     * @param map   map to update
+     * @param key   entry key
+     * @param value entry value
+     */
+    public void putToMap(Map map, Object key, Object value) {
+        // Callers may pass plain local maps even in clustered mode, so check before casting.
+        if (clustered && map instanceof IMap) {
+            ILock lock = null;
+            try {
+                lock = acquireDistributedLock(map);
+                ((IMap) map).set(key, value);
+            } finally {
+                releaseDistributedLock(lock);
+            }
+        } else {
+            map.put(key, value);
+        }
+    }
+
+    /**
+     * Remove an object from a map. A distributed lock is used when clustering is enabled
+     * and the map is a Hazelcast map; any other map is updated directly.
+     *
+     * @param map map to update
+     * @param key key of the entry to remove
+     */
+    public void removeFromMap(Map map, Object key) {
+        if (clustered && map instanceof IMap) {
+            ILock lock = null;
+            try {
+                lock = acquireDistributedLock(map);
+                ((IMap) map).delete(key);
+            } finally {
+                releaseDistributedLock(lock);
+            }
+        } else {
+            map.remove(key);
+        }
+    }
+
+    /**
+     * Add an object to a list. A distributed lock is used when clustering is enabled and
+     * the list is a Hazelcast list; any other list is updated directly.
+     *
+     * @param list  list to update
+     * @param value element to append
+     */
+    public void addToList(List list, Object value) {
+        // Callers may pass plain local lists (for example a new ArrayList) even in
+        // clustered mode, so check before casting.
+        if (clustered && list instanceof IList) {
+            ILock lock = null;
+            try {
+                lock = acquireDistributedLock(list);
+                ((IList) list).add(value);
+            } finally {
+                releaseDistributedLock(lock);
+            }
+        } else {
+            list.add(value);
+        }
+    }
+
+    /**
+     * Remove an object from a list. A distributed lock is used when clustering is enabled
+     * and the list is a Hazelcast list; any other list is updated directly.
+     *
+     * @param list  list to update
+     * @param value element to remove
+     */
+    public void removeFromList(List list, Object value) {
+        if (clustered && list instanceof IList) {
+            ILock lock = null;
+            try {
+                lock = acquireDistributedLock(list);
+                ((IList) list).remove(value);
+            } finally {
+                releaseDistributedLock(lock);
+            }
+        } else {
+            list.remove(value);
+        }
+    }
+}
