AMBARI-18475 - Remove Global Cluster Lock Shared Between Business Objects (jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/561c6f2f Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/561c6f2f Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/561c6f2f Branch: refs/heads/branch-feature-AMBARI-18456 Commit: 561c6f2f38f9b262dda4acd7ff0526b7caf55bce Parents: 8192601 Author: Jonathan Hurley <[email protected]> Authored: Tue Sep 27 11:44:12 2016 -0400 Committer: Jonathan Hurley <[email protected]> Committed: Tue Sep 27 15:34:40 2016 -0400 ---------------------------------------------------------------------- .../ambari/annotations/ExperimentalFeature.java | 8 +- .../AmbariManagementControllerImpl.java | 30 +- .../alerts/AlertServiceStateListener.java | 122 ++++--- .../org/apache/ambari/server/state/Cluster.java | 7 - .../apache/ambari/server/state/ConfigImpl.java | 98 +++-- .../org/apache/ambari/server/state/Service.java | 7 - .../ambari/server/state/ServiceComponent.java | 7 - .../server/state/ServiceComponentImpl.java | 364 +++++++------------ .../apache/ambari/server/state/ServiceImpl.java | 306 ++++++---------- .../server/state/cluster/ClusterImpl.java | 6 - .../state/configgroup/ConfigGroupImpl.java | 92 ++--- .../svccomphost/ServiceComponentHostImpl.java | 227 +++++------- .../server/update/HostUpdateHelperTest.java | 40 +- 13 files changed, 522 insertions(+), 792 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/561c6f2f/ambari-server/src/main/java/org/apache/ambari/annotations/ExperimentalFeature.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/annotations/ExperimentalFeature.java b/ambari-server/src/main/java/org/apache/ambari/annotations/ExperimentalFeature.java index 1d5ba0e..7798f26 100644 --- a/ambari-server/src/main/java/org/apache/ambari/annotations/ExperimentalFeature.java +++ b/ambari-server/src/main/java/org/apache/ambari/annotations/ExperimentalFeature.java @@ -18,6 +18,7 @@ package org.apache.ambari.annotations; import java.util.concurrent.Executor; +import java.util.concurrent.locks.Lock; /** * The {@link ExperimentalFeature} enumeration is meant to be used with the @@ -40,5 +41,10 @@ public enum ExperimentalFeature { /** * Used for code that is targeted for patch upgrades */ - PATCH_UPGRADES + PATCH_UPGRADES, + + /** + * The removal of the cluster global {@link Lock} + */ + CLUSTER_GLOBAL_LOCK_REMOVAL } http://git-wip-us.apache.org/repos/asf/ambari/blob/561c6f2f/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java index 1fc9dbf..ac680a5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java @@ -63,7 +63,6 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; import javax.persistence.RollbackException; @@ -202,6 +201,7 @@ import org.slf4j.LoggerFactory; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; import com.google.common.collect.Multimap; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; @@ -209,7 +209,6 @@ import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Singleton; import com.google.inject.persist.Transactional; -import com.google.common.collect.ListMultimap; @Singleton public class AmbariManagementControllerImpl implements AmbariManagementController { @@ -3111,13 +3110,8 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle changedHosts, requestParameters, requestProperties, runSmokeTest, reconfigureClients); - Lock clusterWriteLock = cluster.getClusterGlobalLock().writeLock(); - clusterWriteLock.lock(); - try { - updateServiceStates(cluster, changedServices, changedComponents, changedHosts, ignoredHosts); - } finally { - clusterWriteLock.unlock(); - } + updateServiceStates(cluster, changedServices, changedComponents, changedHosts, ignoredHosts); + return requestStages; } @@ -5166,13 +5160,15 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle StackInfo stackInfo = ambariMetaInfo.getStack(linkEntity.getStack().getStackName(), linkEntity.getStack().getStackVersion()); - if (stackInfo == null) + if (stackInfo == null) { throw new StackAccessException("stackName=" + linkEntity.getStack().getStackName() + ", stackVersion=" + linkEntity.getStack().getStackVersion()); + } ExtensionInfo extensionInfo = ambariMetaInfo.getExtension(linkEntity.getExtension().getExtensionName(), linkEntity.getExtension().getExtensionVersion()); - if (extensionInfo == null) + if (extensionInfo == null) { throw new StackAccessException("extensionName=" + linkEntity.getExtension().getExtensionName() + ", extensionVersion=" + linkEntity.getExtension().getExtensionVersion()); + } ExtensionHelper.validateDeleteLink(getClusters(), stackInfo, extensionInfo); ambariMetaInfo.getStackManager().unlinkStackAndExtension(stackInfo, extensionInfo); @@ -5202,13 +5198,15 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle StackInfo stackInfo = ambariMetaInfo.getStack(request.getStackName(), request.getStackVersion()); - if (stackInfo == null) + if (stackInfo == null) { throw new StackAccessException("stackName=" + request.getStackName() + ", stackVersion=" + request.getStackVersion()); + } ExtensionInfo extensionInfo = ambariMetaInfo.getExtension(request.getExtensionName(), request.getExtensionVersion()); - if (extensionInfo == null) + if (extensionInfo == null) { throw new StackAccessException("extensionName=" + request.getExtensionName() + ", extensionVersion=" + request.getExtensionVersion()); + } ExtensionHelper.validateCreateLink(stackInfo, extensionInfo); ExtensionLinkEntity linkEntity = createExtensionLinkEntity(request); @@ -5265,13 +5263,15 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle public void updateExtensionLink(ExtensionLinkEntity linkEntity) throws AmbariException { StackInfo stackInfo = ambariMetaInfo.getStack(linkEntity.getStack().getStackName(), linkEntity.getStack().getStackVersion()); - if (stackInfo == null) + if (stackInfo == null) { throw new StackAccessException("stackName=" + linkEntity.getStack().getStackName() + ", stackVersion=" + linkEntity.getStack().getStackVersion()); + } ExtensionInfo extensionInfo = ambariMetaInfo.getExtension(linkEntity.getExtension().getExtensionName(), linkEntity.getExtension().getExtensionVersion()); - if (extensionInfo == null) + if (extensionInfo == null) { throw new StackAccessException("extensionName=" + linkEntity.getExtension().getExtensionName() + ", extensionVersion=" + linkEntity.getExtension().getExtensionVersion()); + } ambariMetaInfo.getStackManager().linkStackToExtension(stackInfo, extensionInfo); } http://git-wip-us.apache.org/repos/asf/ambari/blob/561c6f2f/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceStateListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceStateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceStateListener.java index da4cbf5..6f6cea8 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceStateListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceStateListener.java @@ -20,6 +20,7 @@ package org.apache.ambari.server.events.listeners.alerts; import java.text.MessageFormat; import java.util.List; import java.util.Set; +import java.util.concurrent.locks.Lock; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.EagerSingleton; @@ -34,7 +35,6 @@ import org.apache.ambari.server.orm.dao.AlertDefinitionDAO; import org.apache.ambari.server.orm.dao.AlertDispatchDAO; import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; import org.apache.ambari.server.orm.entities.AlertGroupEntity; -import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.alert.AlertDefinition; import org.apache.ambari.server.state.alert.AlertDefinitionFactory; @@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory; import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; +import com.google.common.util.concurrent.Striped; import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; @@ -95,7 +96,13 @@ public class AlertServiceStateListener { * Used to retrieve a cluster using clusterId from event. */ @Inject - private Provider<Clusters> clusters; + private Provider<Clusters> m_clusters; + + /** + * Used for ensuring that the concurrent nature of the event handler methods + * don't collide when attempting to perform operations on the same service. + */ + private Striped<Lock> m_locksByService = Striped.lazyWeakLock(20); /** * Constructor. @@ -125,38 +132,46 @@ public class AlertServiceStateListener { String stackVersion = event.getStackVersion(); String serviceName = event.getServiceName(); - // create the default alert group for the new service if absent; this MUST - // be done before adding definitions so that they are properly added to the - // default group - if (null == m_alertDispatchDao.findDefaultServiceGroup(clusterId, serviceName)) { - try { - m_alertDispatchDao.createDefaultGroup(clusterId, serviceName); - } catch (AmbariException ambariException) { - LOG.error("Unable to create a default alert group for {}", - event.getServiceName(), ambariException); - } - } + Lock lock = m_locksByService.get(serviceName); + lock.lock(); - // populate alert definitions for the new service from the database, but - // don't worry about sending down commands to the agents; the host - // components are not yet bound to the hosts so we'd have no way of knowing - // which hosts are invalidated; do that in another impl try { - Set<AlertDefinition> alertDefinitions = m_metaInfoProvider.get().getAlertDefinitions( - stackName, stackVersion, serviceName); + // create the default alert group for the new service if absent; this MUST + // be done before adding definitions so that they are properly added to the + // default group + if (null == m_alertDispatchDao.findDefaultServiceGroup(clusterId, serviceName)) { + try { + m_alertDispatchDao.createDefaultGroup(clusterId, serviceName); + } catch (AmbariException ambariException) { + LOG.error("Unable to create a default alert group for {}", + event.getServiceName(), ambariException); + } + } - for (AlertDefinition definition : alertDefinitions) { - AlertDefinitionEntity entity = m_alertDefinitionFactory.coerce( - clusterId, - definition); + // populate alert definitions for the new service from the database, but + // don't worry about sending down commands to the agents; the host + // components are not yet bound to the hosts so we'd have no way of knowing + // which hosts are invalidated; do that in another impl + try { + Set<AlertDefinition> alertDefinitions = m_metaInfoProvider.get().getAlertDefinitions( + stackName, stackVersion, serviceName); - m_definitionDao.create(entity); + for (AlertDefinition definition : alertDefinitions) { + AlertDefinitionEntity entity = m_alertDefinitionFactory.coerce( + clusterId, + definition); + + m_definitionDao.create(entity); + } + } catch (AmbariException ae) { + String message = MessageFormat.format( + "Unable to populate alert definitions from the database during installation of {0}", + serviceName); + LOG.error(message, ae); } - } catch (AmbariException ae) { - String message = MessageFormat.format( - "Unable to populate alert definitions from the database during installation of {0}", - serviceName); - LOG.error(message, ae); + } + finally { + lock.unlock(); } } @@ -170,43 +185,44 @@ public class AlertServiceStateListener { @AllowConcurrentEvents public void onAmbariEvent(ServiceRemovedEvent event) { LOG.debug("Received event {}", event); - Cluster cluster = null; try { - cluster = clusters.get().getClusterById(event.getClusterId()); + m_clusters.get().getClusterById(event.getClusterId()); } catch (AmbariException e) { - LOG.warn("Unable to retrieve cluster info for id: " + event.getClusterId()); + LOG.warn("Unable to retrieve cluster with id {}", event.getClusterId()); + return; } - if (cluster != null) { - // TODO: Explicit locking used to prevent deadlock situation caused during cluster delete - cluster.getClusterGlobalLock().writeLock().lock(); - try { - List<AlertDefinitionEntity> definitions = m_definitionDao.findByService(event.getClusterId(), + String serviceName = event.getServiceName(); + Lock lock = m_locksByService.get(serviceName); + lock.lock(); + + try { + List<AlertDefinitionEntity> definitions = m_definitionDao.findByService(event.getClusterId(), event.getServiceName()); - for (AlertDefinitionEntity definition : definitions) { - try { - m_definitionDao.remove(definition); - } catch (Exception exception) { - LOG.error("Unable to remove alert definition {}", definition.getDefinitionName(), exception); - } + for (AlertDefinitionEntity definition : definitions) { + try { + m_definitionDao.remove(definition); + } catch (Exception exception) { + LOG.error("Unable to remove alert definition {}", definition.getDefinitionName(), + exception); } + } - // remove the default group for the service - AlertGroupEntity group = m_alertDispatchDao.findGroupByName(event.getClusterId(), + // remove the default group for the service + AlertGroupEntity group = m_alertDispatchDao.findGroupByName(event.getClusterId(), event.getServiceName()); - if (null != group && group.isDefault()) { - try { - m_alertDispatchDao.remove(group); - } catch (Exception exception) { - LOG.error("Unable to remove default alert group {}", group.getGroupName(), exception); - } + if (null != group && group.isDefault()) { + try { + m_alertDispatchDao.remove(group); + } catch (Exception exception) { + LOG.error("Unable to remove default alert group {}", group.getGroupName(), exception); } - } finally { - cluster.getClusterGlobalLock().writeLock().unlock(); } + } finally { + lock.unlock(); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/561c6f2f/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java index 2452df6..d141df8 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.locks.ReadWriteLock; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.controller.ClusterResponse; @@ -522,12 +521,6 @@ public interface Cluster { Service addService(String serviceName) throws AmbariException; /** - * Get lock to control access to cluster structure - * @return cluster-global lock - */ - ReadWriteLock getClusterGlobalLock(); - - /** * Fetch desired configs for list of hosts in cluster * @param hostIds * @return http://git-wip-us.apache.org/repos/asf/ambari/blob/561c6f2f/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java index 7b7a60b..28bcd5f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java @@ -27,9 +27,6 @@ import java.util.Set; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.ambari.annotations.TransactionalLock; -import org.apache.ambari.annotations.TransactionalLock.LockArea; -import org.apache.ambari.annotations.TransactionalLock.LockType; import org.apache.ambari.server.events.ClusterConfigChangedEvent; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.dao.ClusterDAO; @@ -365,69 +362,64 @@ public class ConfigImpl implements Config { @Override @Transactional public void persist(boolean newConfig) { - cluster.getClusterGlobalLock().writeLock().lock(); //null cluster is not expected, NPE anyway later in code + readWriteLock.writeLock().lock(); try { - readWriteLock.writeLock().lock(); - try { - ClusterEntity clusterEntity = clusterDAO.findById(cluster.getClusterId()); - - if (newConfig) { - ClusterConfigEntity entity = new ClusterConfigEntity(); - entity.setClusterEntity(clusterEntity); - entity.setClusterId(cluster.getClusterId()); - entity.setType(getType()); - entity.setVersion(getVersion()); - entity.setTag(getTag()); - entity.setTimestamp(new Date().getTime()); - entity.setStack(clusterEntity.getDesiredStack()); - entity.setData(gson.toJson(getProperties())); + ClusterEntity clusterEntity = clusterDAO.findById(cluster.getClusterId()); + + if (newConfig) { + ClusterConfigEntity entity = new ClusterConfigEntity(); + entity.setClusterEntity(clusterEntity); + entity.setClusterId(cluster.getClusterId()); + entity.setType(getType()); + entity.setVersion(getVersion()); + entity.setTag(getTag()); + entity.setTimestamp(new Date().getTime()); + entity.setStack(clusterEntity.getDesiredStack()); + entity.setData(gson.toJson(getProperties())); + + if (null != getPropertiesAttributes()) { + entity.setAttributes(gson.toJson(getPropertiesAttributes())); + } - if (null != getPropertiesAttributes()) { - entity.setAttributes(gson.toJson(getPropertiesAttributes())); + clusterDAO.createConfig(entity); + clusterEntity.getClusterConfigEntities().add(entity); + + // save the entity, forcing a flush to ensure the refresh picks up the + // newest data + clusterDAO.merge(clusterEntity, true); + } else { + // only supporting changes to the properties + ClusterConfigEntity entity = null; + + // find the existing configuration to update + for (ClusterConfigEntity cfe : clusterEntity.getClusterConfigEntities()) { + if (getTag().equals(cfe.getTag()) && getType().equals(cfe.getType()) + && getVersion().equals(cfe.getVersion())) { + entity = cfe; + break; } + } + + // if the configuration was found, then update it + if (null != entity) { + LOG.debug( + "Updating {} version {} with new configurations; a new version will not be created", + getType(), getVersion()); - clusterDAO.createConfig(entity); - clusterEntity.getClusterConfigEntities().add(entity); + entity.setData(gson.toJson(getProperties())); // save the entity, forcing a flush to ensure the refresh picks up the // newest data clusterDAO.merge(clusterEntity, true); - cluster.refresh(); - } else { - // only supporting changes to the properties - ClusterConfigEntity entity = null; - - // find the existing configuration to update - for (ClusterConfigEntity cfe : clusterEntity.getClusterConfigEntities()) { - if (getTag().equals(cfe.getTag()) && - getType().equals(cfe.getType()) && - getVersion().equals(cfe.getVersion())) { - entity = cfe; - break; - } - } - - // if the configuration was found, then update it - if (null != entity) { - LOG.debug( - "Updating {} version {} with new configurations; a new version will not be created", - getType(), getVersion()); - - entity.setData(gson.toJson(getProperties())); - - // save the entity, forcing a flush to ensure the refresh picks up the - // newest data - clusterDAO.merge(clusterEntity, true); - cluster.refresh(); - } } - } finally { - readWriteLock.writeLock().unlock(); } } finally { - cluster.getClusterGlobalLock().writeLock().unlock(); + readWriteLock.writeLock().unlock(); } + // re-load the entity associations for the cluster + cluster.refresh(); + // broadcast the change event for the configuration ClusterConfigChangedEvent event = new ClusterConfigChangedEvent(cluster.getClusterName(), getType(), getTag(), getVersion()); http://git-wip-us.apache.org/repos/asf/ambari/blob/561c6f2f/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java index 7000574..48ab252 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java @@ -19,7 +19,6 @@ package org.apache.ambari.server.state; import java.util.Map; -import java.util.concurrent.locks.ReadWriteLock; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.controller.ServiceResponse; @@ -99,12 +98,6 @@ public interface Service { void delete() throws AmbariException; /** - * Get lock to control access to cluster structure - * @return cluster-global lock - */ - ReadWriteLock getClusterGlobalLock(); - - /** * Sets the maintenance state for the service * @param state the state */ http://git-wip-us.apache.org/repos/asf/ambari/blob/561c6f2f/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponent.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponent.java index 983cbdf..8387ab8 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponent.java @@ -19,7 +19,6 @@ package org.apache.ambari.server.state; import java.util.Map; -import java.util.concurrent.locks.ReadWriteLock; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.controller.ServiceComponentResponse; @@ -98,10 +97,4 @@ public interface ServiceComponent { String hostName) throws AmbariException; void delete() throws AmbariException; - - /** - * Get lock to control access to cluster structure - * @return cluster-global lock - */ - ReadWriteLock getClusterGlobalLock(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/561c6f2f/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java index 3e805a0..282396d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java @@ -18,12 +18,16 @@ package org.apache.ambari.server.state; -import com.google.inject.Inject; -import com.google.inject.Injector; -import com.google.inject.ProvisionException; -import com.google.inject.assistedinject.Assisted; -import com.google.inject.assistedinject.AssistedInject; -import com.google.inject.persist.Transactional; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.ambari.annotations.Experimental; +import org.apache.ambari.annotations.ExperimentalFeature; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.ObjectNotFoundException; import org.apache.ambari.server.ServiceComponentHostNotFoundException; @@ -46,18 +50,18 @@ import org.apache.ambari.server.state.cluster.ClusterImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.ProvisionException; +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.AssistedInject; +import com.google.inject.persist.Transactional; public class ServiceComponentImpl implements ServiceComponent { private final static Logger LOG = LoggerFactory.getLogger(ServiceComponentImpl.class); private final Service service; - private final ReadWriteLock clusterGlobalLock; private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private final String componentName; private final String displayName; @@ -79,7 +83,7 @@ public class ServiceComponentImpl implements ServiceComponent { private AmbariEventPublisher eventPublisher; ServiceComponentDesiredStateEntity desiredStateEntity; - private Map<String, ServiceComponentHost> hostComponents; + private ConcurrentMap<String, ServiceComponentHost> hostComponents; /** * Data access object used for lookup up stacks. @@ -91,7 +95,6 @@ public class ServiceComponentImpl implements ServiceComponent { public ServiceComponentImpl(@Assisted Service service, @Assisted String componentName, Injector injector) throws AmbariException { injector.injectMembers(this); - clusterGlobalLock = service.getClusterGlobalLock(); this.service = service; desiredStateEntity = new ServiceComponentDesiredStateEntity(); @@ -103,7 +106,7 @@ public class ServiceComponentImpl implements ServiceComponent { desiredStateEntity.setRecoveryEnabled(false); setDesiredStackVersion(service.getDesiredStackVersion()); - hostComponents = new HashMap<String, ServiceComponentHost>(); + hostComponents = new ConcurrentHashMap<String, ServiceComponentHost>(); StackId stackId = service.getDesiredStackVersion(); try { @@ -129,7 +132,6 @@ public class ServiceComponentImpl implements ServiceComponent { @Assisted ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity, Injector injector) throws AmbariException { injector.injectMembers(this); - clusterGlobalLock = service.getClusterGlobalLock(); this.service = service; desiredStateEntity = serviceComponentDesiredStateEntity; @@ -153,7 +155,7 @@ public class ServiceComponentImpl implements ServiceComponent { + ", stackInfo=" + stackId.getStackId()); } - hostComponents = new HashMap<String, ServiceComponentHost>(); + hostComponents = new ConcurrentHashMap<String, ServiceComponentHost>(); for (HostComponentStateEntity hostComponentStateEntity : desiredStateEntity.getHostComponentStateEntities()) { HostComponentDesiredStateEntityPK pk = new HostComponentDesiredStateEntityPK(); pk.setClusterId(hostComponentStateEntity.getClusterId()); @@ -179,11 +181,6 @@ public class ServiceComponentImpl implements ServiceComponent { } @Override - public ReadWriteLock getClusterGlobalLock() { - return clusterGlobalLock; - } - - @Override public String getName() { return componentName; } @@ -254,145 +251,84 @@ public class ServiceComponentImpl implements ServiceComponent { } @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public Map<String, ServiceComponentHost> getServiceComponentHosts() { - clusterGlobalLock.readLock().lock(); - try { - readWriteLock.readLock().lock(); - try { - return new HashMap<String, ServiceComponentHost>(hostComponents); - } finally { - readWriteLock.readLock().unlock(); - } - } finally { - clusterGlobalLock.readLock().unlock(); - } + return new HashMap<String, ServiceComponentHost>(hostComponents); } @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public void addServiceComponentHosts( Map<String, ServiceComponentHost> hostComponents) throws AmbariException { - clusterGlobalLock.writeLock().lock(); + readWriteLock.writeLock().lock(); try { - readWriteLock.writeLock().lock(); - try { - // TODO validation - for (Entry<String, ServiceComponentHost> entry : - hostComponents.entrySet()) { - if (!entry.getKey().equals(entry.getValue().getHostName())) { - throw new AmbariException("Invalid arguments in map" - + ", hostname does not match the key in map"); - } + // TODO validation + for (Entry<String, ServiceComponentHost> entry : + hostComponents.entrySet()) { + if (!entry.getKey().equals(entry.getValue().getHostName())) { + throw new AmbariException("Invalid arguments in map" + + ", hostname does not match the key in map"); } - for (ServiceComponentHost sch : hostComponents.values()) { - addServiceComponentHost(sch); - } - } finally { - readWriteLock.writeLock().unlock(); + } + for (ServiceComponentHost sch : hostComponents.values()) { + addServiceComponentHost(sch); } } finally { - clusterGlobalLock.writeLock().unlock(); + readWriteLock.writeLock().unlock(); } } @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public void addServiceComponentHost( ServiceComponentHost hostComponent) throws AmbariException { - clusterGlobalLock.writeLock().lock(); + readWriteLock.writeLock().lock(); try { - readWriteLock.writeLock().lock(); - try { - // TODO validation - // TODO ensure host belongs to cluster - if (LOG.isDebugEnabled()) { - LOG.debug("Adding a ServiceComponentHost to ServiceComponent" - + ", clusterName=" + service.getCluster().getClusterName() - + ", clusterId=" + service.getCluster().getClusterId() - + ", serviceName=" + service.getName() - + ", serviceComponentName=" + getName() - + ", hostname=" + hostComponent.getHostName() - + ", recoveryEnabled=" + isRecoveryEnabled()); - } - if (hostComponents.containsKey(hostComponent.getHostName())) { - throw new AmbariException("Cannot add duplicate ServiceComponentHost" - + ", clusterName=" + service.getCluster().getClusterName() - + ", clusterId=" + service.getCluster().getClusterId() - + ", serviceName=" + service.getName() - + ", serviceComponentName=" + getName() - + ", hostname=" + hostComponent.getHostName() - + ", recoveryEnabled=" + isRecoveryEnabled()); - } - // FIXME need a better approach of caching components by host - ClusterImpl clusterImpl = (ClusterImpl) service.getCluster(); - clusterImpl.addServiceComponentHost(hostComponent); - hostComponents.put(hostComponent.getHostName(), hostComponent); - } finally { - readWriteLock.writeLock().unlock(); + // TODO validation + // TODO ensure host belongs to cluster + if (LOG.isDebugEnabled()) { + LOG.debug("Adding a ServiceComponentHost to ServiceComponent" + ", clusterName=" + + service.getCluster().getClusterName() + ", clusterId=" + + service.getCluster().getClusterId() + ", serviceName=" + service.getName() + + ", serviceComponentName=" + getName() + ", hostname=" + hostComponent.getHostName() + + ", recoveryEnabled=" + isRecoveryEnabled()); + } + + if (hostComponents.containsKey(hostComponent.getHostName())) { + throw new AmbariException("Cannot add duplicate ServiceComponentHost" + ", clusterName=" + + service.getCluster().getClusterName() + ", clusterId=" + + service.getCluster().getClusterId() + ", serviceName=" + service.getName() + + ", serviceComponentName=" + getName() + ", hostname=" + hostComponent.getHostName() + + ", recoveryEnabled=" + isRecoveryEnabled()); } + // FIXME need a better approach of caching components by host + ClusterImpl clusterImpl = (ClusterImpl) service.getCluster(); + clusterImpl.addServiceComponentHost(hostComponent); + hostComponents.put(hostComponent.getHostName(), hostComponent); } finally { - clusterGlobalLock.writeLock().unlock(); + readWriteLock.writeLock().unlock(); } } @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public ServiceComponentHost addServiceComponentHost(String hostName) throws AmbariException { - clusterGlobalLock.writeLock().lock(); - try { - readWriteLock.writeLock().lock(); - try { - // TODO validation - // TODO ensure host belongs to cluster - if (LOG.isDebugEnabled()) { - LOG.debug("Adding a ServiceComponentHost to ServiceComponent" - + ", clusterName=" + service.getCluster().getClusterName() - + ", clusterId=" + service.getCluster().getClusterId() - + ", serviceName=" + service.getName() - + ", serviceComponentName=" + getName() - + ", recoveryEnabled=" + isRecoveryEnabled() - + ", hostname=" + hostName); - } - if (hostComponents.containsKey(hostName)) { - throw new AmbariException("Cannot add duplicate ServiceComponentHost" - + ", clusterName=" + service.getCluster().getClusterName() - + ", clusterId=" + service.getCluster().getClusterId() - + ", serviceName=" + service.getName() - + ", serviceComponentName=" + getName() - + ", recoveryEnabled=" + isRecoveryEnabled() - + ", hostname=" + hostName); - } - ServiceComponentHost hostComponent = serviceComponentHostFactory.createNew(this, hostName); - // FIXME need a better approach of caching components by host - ClusterImpl clusterImpl = (ClusterImpl) service.getCluster(); - clusterImpl.addServiceComponentHost(hostComponent); - - hostComponents.put(hostComponent.getHostName(), hostComponent); - - return hostComponent; - } finally { - readWriteLock.writeLock().unlock(); - } - } finally { - clusterGlobalLock.writeLock().unlock(); - } + ServiceComponentHost hostComponent = serviceComponentHostFactory.createNew(this, hostName); + addServiceComponentHost(hostComponent); + return hostComponent; } @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public ServiceComponentHost getServiceComponentHost(String hostname) throws AmbariException { - clusterGlobalLock.readLock().lock(); - try { - readWriteLock.readLock().lock(); - try { - if (!hostComponents.containsKey(hostname)) { - throw new ServiceComponentHostNotFoundException(getClusterName(), - getServiceName(), getName(), hostname); - } - return hostComponents.get(hostname); - } finally { - readWriteLock.readLock().unlock(); - } - } finally { - clusterGlobalLock.readLock().unlock(); + + if (!hostComponents.containsKey(hostname)) { + throw new ServiceComponentHostNotFoundException(getClusterName(), + getServiceName(), getName(), hostname); } + + return hostComponents.get(hostname); } @Override @@ -580,38 +516,20 @@ public class ServiceComponentImpl implements ServiceComponent { * transaction is not necessary before this calling this method. */ @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public void persist() { - boolean clusterWriteLockAcquired = false; - if (!persisted) { - clusterGlobalLock.writeLock().lock(); - clusterWriteLockAcquired = true; - } - + readWriteLock.writeLock().lock(); try { - readWriteLock.writeLock().lock(); - try { - if (!persisted) { - // persist the new cluster topology and then release the cluster lock - // as it has no more bearing on the rest of this persist() method - persistEntities(); - clusterGlobalLock.writeLock().unlock(); - clusterWriteLockAcquired = false; - - refresh(); - // There refresh calls are no longer needed with cached references - // not used on getters/setters - // service.refresh(); - persisted = true; - } else { - saveIfPersisted(desiredStateEntity); - } - } finally { - readWriteLock.writeLock().unlock(); + if (!persisted) { + // persist the new cluster topology + persistEntities(); + refresh(); + persisted = true; + } else { + saveIfPersisted(desiredStateEntity); } } finally { - if (clusterWriteLockAcquired) { - clusterGlobalLock.writeLock().unlock(); - } + readWriteLock.writeLock().unlock(); } } @@ -671,123 +589,95 @@ public class ServiceComponentImpl implements ServiceComponent { } @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public boolean canBeRemoved() { - clusterGlobalLock.readLock().lock(); + readWriteLock.readLock().lock(); try { - readWriteLock.readLock().lock(); - try { - // A component can be deleted if all it's host components - // can be removed, irrespective of the state of - // the component itself - for (ServiceComponentHost sch : hostComponents.values()) { - if (!sch.canBeRemoved()) { - LOG.warn("Found non removable hostcomponent when trying to" - + " delete service component" - + ", clusterName=" + getClusterName() - + ", serviceName=" + getServiceName() - + ", componentName=" + getName() - + ", state=" + sch.getState() - + ", hostname=" + sch.getHostName()); - return false; - } + // A component can be deleted if all it's host components + // can be removed, irrespective of the state of + // the component itself + for (ServiceComponentHost sch : hostComponents.values()) { + if (!sch.canBeRemoved()) { + LOG.warn("Found non removable hostcomponent when trying to" + " delete service component" + + ", clusterName=" + getClusterName() + ", serviceName=" + getServiceName() + + ", componentName=" + getName() + ", state=" + sch.getState() + ", hostname=" + + sch.getHostName()); + return false; } - return true; - } finally { - readWriteLock.readLock().unlock(); } + return true; } finally { - clusterGlobalLock.readLock().unlock(); + readWriteLock.readLock().unlock(); } } @Override @Transactional + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public void deleteAllServiceComponentHosts() throws AmbariException { - clusterGlobalLock.writeLock().lock(); + readWriteLock.writeLock().lock(); try { - readWriteLock.writeLock().lock(); - try { - LOG.info("Deleting all servicecomponenthosts for component" - + ", clusterName=" + getClusterName() - + ", serviceName=" + getServiceName() - + ", componentName=" + getName() - + ", recoveryEnabled=" + isRecoveryEnabled()); - for (ServiceComponentHost sch : hostComponents.values()) { - if (!sch.canBeRemoved()) { - throw new AmbariException("Found non removable hostcomponent " - + " when trying to delete" - + " all hostcomponents from servicecomponent" - + ", clusterName=" + getClusterName() - + ", serviceName=" + getServiceName() - + ", componentName=" + getName() - + ", recoveryEnabled=" + isRecoveryEnabled() - + ", hostname=" + sch.getHostName()); - } - } - - for (ServiceComponentHost serviceComponentHost : hostComponents.values()) { - serviceComponentHost.delete(); + LOG.info("Deleting all servicecomponenthosts for component" + ", clusterName=" + + getClusterName() + ", serviceName=" + getServiceName() + ", componentName=" + getName() + + ", recoveryEnabled=" + isRecoveryEnabled()); + for (ServiceComponentHost sch : hostComponents.values()) { + if (!sch.canBeRemoved()) { + throw new AmbariException("Found non removable hostcomponent " + " when trying to delete" + + " all hostcomponents from servicecomponent" + ", clusterName=" + getClusterName() + + ", serviceName=" + getServiceName() + ", componentName=" + getName() + + ", recoveryEnabled=" + isRecoveryEnabled() + ", hostname=" + sch.getHostName()); } + } - hostComponents.clear(); - } finally { - readWriteLock.writeLock().unlock(); + for (ServiceComponentHost serviceComponentHost : hostComponents.values()) { + serviceComponentHost.delete(); } + + hostComponents.clear(); } finally { - clusterGlobalLock.writeLock().unlock(); + readWriteLock.writeLock().unlock(); } } @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public void deleteServiceComponentHosts(String hostname) throws AmbariException { - clusterGlobalLock.writeLock().lock(); + readWriteLock.writeLock().lock(); try { - readWriteLock.writeLock().lock(); - try { - ServiceComponentHost sch = getServiceComponentHost(hostname); - LOG.info("Deleting servicecomponenthost for cluster" + ServiceComponentHost sch = getServiceComponentHost(hostname); + LOG.info("Deleting servicecomponenthost for cluster" + ", clusterName=" + getClusterName() + + ", serviceName=" + getServiceName() + ", componentName=" + getName() + + ", recoveryEnabled=" + isRecoveryEnabled() + ", hostname=" + sch.getHostName()); + if (!sch.canBeRemoved()) { + throw new AmbariException("Could not delete hostcomponent from cluster" + ", clusterName=" + getClusterName() + ", serviceName=" + getServiceName() + ", componentName=" + getName() + ", recoveryEnabled=" + isRecoveryEnabled() + ", hostname=" + sch.getHostName()); - if (!sch.canBeRemoved()) { - throw new AmbariException("Could not delete hostcomponent from cluster" - + ", clusterName=" + getClusterName() - + ", serviceName=" + getServiceName() - + ", componentName=" + getName() - + ", recoveryEnabled=" + isRecoveryEnabled() - + ", hostname=" + sch.getHostName()); - } - sch.delete(); - hostComponents.remove(hostname); - - } finally { - readWriteLock.writeLock().unlock(); } + sch.delete(); + hostComponents.remove(hostname); + } finally { - clusterGlobalLock.writeLock().unlock(); + readWriteLock.writeLock().unlock(); } } @Override @Transactional + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public void delete() throws AmbariException { - clusterGlobalLock.writeLock().lock(); + readWriteLock.writeLock().lock(); try { - readWriteLock.writeLock().lock(); - try { - deleteAllServiceComponentHosts(); + deleteAllServiceComponentHosts(); - if (persisted) { - removeEntities(); - persisted = false; - } - } finally { - readWriteLock.writeLock().unlock(); + if (persisted) { + removeEntities(); + persisted = false; } } finally { - clusterGlobalLock.writeLock().unlock(); + readWriteLock.writeLock().unlock(); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/561c6f2f/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java index 3120b86..36d4902 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java @@ -18,12 +18,15 @@ package org.apache.ambari.server.state; -import com.google.inject.Inject; -import com.google.inject.Injector; -import com.google.inject.ProvisionException; -import com.google.inject.assistedinject.Assisted; -import com.google.inject.assistedinject.AssistedInject; -import com.google.inject.persist.Transactional; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.ambari.annotations.Experimental; +import org.apache.ambari.annotations.ExperimentalFeature; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.ServiceComponentNotFoundException; import org.apache.ambari.server.api.services.AmbariMetaInfo; @@ -43,7 +46,6 @@ import org.apache.ambari.server.orm.entities.ClusterConfigMappingEntity; import org.apache.ambari.server.orm.entities.ClusterEntity; import org.apache.ambari.server.orm.entities.ClusterServiceEntity; import org.apache.ambari.server.orm.entities.ClusterServiceEntityPK; -import org.apache.ambari.server.orm.entities.ConfigGroupEntity; import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity; import org.apache.ambari.server.orm.entities.ServiceConfigEntity; import org.apache.ambari.server.orm.entities.ServiceDesiredStateEntity; @@ -51,19 +53,16 @@ import org.apache.ambari.server.orm.entities.ServiceDesiredStateEntityPK; import org.apache.ambari.server.orm.entities.StackEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; + +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.ProvisionException; +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.AssistedInject; +import com.google.inject.persist.Transactional; public class ServiceImpl implements Service { - private final ReadWriteLock clusterGlobalLock; private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // Cached entity has only 1 getter for name private ClusterServiceEntity serviceEntity; @@ -113,7 +112,6 @@ public class ServiceImpl implements Service { public ServiceImpl(@Assisted Cluster cluster, @Assisted String serviceName, Injector injector) throws AmbariException { injector.injectMembers(this); - clusterGlobalLock = cluster.getClusterGlobalLock(); serviceEntity = new ClusterServiceEntity(); serviceEntity.setClusterId(cluster.getClusterId()); serviceEntity.setServiceName(serviceName); @@ -145,7 +143,6 @@ public class ServiceImpl implements Service { public ServiceImpl(@Assisted Cluster cluster, @Assisted ClusterServiceEntity serviceEntity, Injector injector) throws AmbariException { injector.injectMembers(this); - clusterGlobalLock = cluster.getClusterGlobalLock(); this.serviceEntity = serviceEntity; this.cluster = cluster; @@ -182,11 +179,6 @@ public class ServiceImpl implements Service { } @Override - public ReadWriteLock getClusterGlobalLock() { - return clusterGlobalLock; - } - - @Override public String getName() { return serviceEntity.getServiceName(); } @@ -207,83 +199,35 @@ public class ServiceImpl implements Service { } @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public void addServiceComponents( Map<String, ServiceComponent> components) throws AmbariException { - clusterGlobalLock.writeLock().lock(); - try { - readWriteLock.writeLock().lock(); - try { - for (ServiceComponent sc : components.values()) { - addServiceComponent(sc); - } - } finally { - readWriteLock.writeLock().unlock(); - } - } finally { - clusterGlobalLock.writeLock().unlock(); + for (ServiceComponent sc : components.values()) { + addServiceComponent(sc); } } @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public void addServiceComponent(ServiceComponent component) throws AmbariException { - clusterGlobalLock.writeLock().lock(); - try { - readWriteLock.writeLock().lock(); - try { - // TODO validation - if (LOG.isDebugEnabled()) { - LOG.debug("Adding a ServiceComponent to Service" - + ", clusterName=" + cluster.getClusterName() - + ", clusterId=" + cluster.getClusterId() - + ", serviceName=" + getName() - + ", serviceComponentName=" + component.getName()); - } - if (components.containsKey(component.getName())) { - throw new AmbariException("Cannot add duplicate ServiceComponent" - + ", clusterName=" + cluster.getClusterName() - + ", clusterId=" + cluster.getClusterId() - + ", serviceName=" + getName() - + ", serviceComponentName=" + component.getName()); - } - components.put(component.getName(), component); - } finally { - readWriteLock.writeLock().unlock(); - } - } finally { - clusterGlobalLock.writeLock().unlock(); + if (components.containsKey(component.getName())) { + throw new AmbariException("Cannot add duplicate ServiceComponent" + + ", clusterName=" + cluster.getClusterName() + + ", clusterId=" + cluster.getClusterId() + + ", serviceName=" + getName() + + ", serviceComponentName=" + component.getName()); } + + components.put(component.getName(), component); } @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public ServiceComponent addServiceComponent(String serviceComponentName) throws AmbariException { - clusterGlobalLock.writeLock().lock(); - try { - readWriteLock.writeLock().lock(); - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding a ServiceComponent to Service" - + ", clusterName=" + cluster.getClusterName() - + ", clusterId=" + cluster.getClusterId() - + ", serviceName=" + getName() - + ", serviceComponentName=" + serviceComponentName); - } - if (components.containsKey(serviceComponentName)) { - throw new AmbariException("Cannot add duplicate ServiceComponent" - + ", clusterName=" + cluster.getClusterName() - + ", clusterId=" + cluster.getClusterId() - + ", serviceName=" + getName() - + ", serviceComponentName=" + serviceComponentName); - } - ServiceComponent component = serviceComponentFactory.createNew(this, serviceComponentName); - components.put(component.getName(), component); - return component; - } finally { - readWriteLock.writeLock().unlock(); - } - } finally { - clusterGlobalLock.writeLock().unlock(); - } + ServiceComponent component = serviceComponentFactory.createNew(this, serviceComponentName); + addServiceComponent(component); + return component; } @Override @@ -460,36 +404,30 @@ public class ServiceImpl implements Service { * transaction is not necessary before this calling this method. */ @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public void persist() { - clusterGlobalLock.writeLock().lock(); + readWriteLock.writeLock().lock(); try { - readWriteLock.writeLock().lock(); - try { - if (!persisted) { - persistEntities(); - refresh(); - // There refresh calls are no longer needed with cached references - // not used on getters/setters - // cluster.refresh(); - persisted = true; - - // publish the service installed event - StackId stackId = cluster.getDesiredStackVersion(); - cluster.addService(this); - - ServiceInstalledEvent event = new ServiceInstalledEvent( - getClusterId(), stackId.getStackName(), - stackId.getStackVersion(), getName()); - - eventPublisher.publish(event); - } else { - saveIfPersisted(); - } - } finally { - readWriteLock.writeLock().unlock(); + if (!persisted) { + persistEntities(); + refresh(); + + persisted = true; + + // publish the service installed event + StackId stackId = cluster.getDesiredStackVersion(); + cluster.addService(this); + + ServiceInstalledEvent event = new ServiceInstalledEvent( + getClusterId(), stackId.getStackName(), + stackId.getStackVersion(), getName()); + + eventPublisher.publish(event); + } else { + saveIfPersisted(); } } finally { - clusterGlobalLock.writeLock().unlock(); + readWriteLock.writeLock().unlock(); } } @@ -535,31 +473,26 @@ public class ServiceImpl implements Service { } @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public boolean canBeRemoved() { - clusterGlobalLock.readLock().lock(); + readWriteLock.readLock().lock(); try { - readWriteLock.readLock().lock(); - try { - // - // A service can be deleted if all it's components - // can be removed, irrespective of the state of - // the service itself. - // - for (ServiceComponent sc : components.values()) { - if (!sc.canBeRemoved()) { - LOG.warn("Found non removable component when trying to delete service" - + ", clusterName=" + cluster.getClusterName() - + ", serviceName=" + getName() - + ", componentName=" + sc.getName()); - return false; - } + // + // A service can be deleted if all it's components + // can be removed, irrespective of the state of + // the service itself. + // + for (ServiceComponent sc : components.values()) { + if (!sc.canBeRemoved()) { + LOG.warn("Found non removable component when trying to delete service" + ", clusterName=" + + cluster.getClusterName() + ", serviceName=" + getName() + ", componentName=" + + sc.getName()); + return false; } - return true; - } finally { - readWriteLock.readLock().unlock(); } + return true; } finally { - clusterGlobalLock.readLock().unlock(); + readWriteLock.readLock().unlock(); } } @@ -599,71 +532,56 @@ public class ServiceImpl implements Service { serviceConfigDAO.remove(serviceConfigEntity); } } - + @Override @Transactional + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public void deleteAllComponents() throws AmbariException { - clusterGlobalLock.writeLock().lock(); + readWriteLock.writeLock().lock(); try { - readWriteLock.writeLock().lock(); - try { - LOG.info("Deleting all components for service" - + ", clusterName=" + cluster.getClusterName() - + ", serviceName=" + getName()); - // FIXME check dependencies from meta layer - for (ServiceComponent component : components.values()) { - if (!component.canBeRemoved()) { - throw new AmbariException("Found non removable component when trying to" - + " delete all components from service" - + ", clusterName=" + cluster.getClusterName() - + ", serviceName=" + getName() - + ", componentName=" + component.getName()); - } - } - - for (ServiceComponent serviceComponent : components.values()) { - serviceComponent.delete(); + LOG.info("Deleting all components for service" + ", clusterName=" + cluster.getClusterName() + + ", serviceName=" + getName()); + // FIXME check dependencies from meta layer + for (ServiceComponent component : components.values()) { + if (!component.canBeRemoved()) { + throw new AmbariException("Found non removable component when trying to" + + " delete all components from service" + ", clusterName=" + cluster.getClusterName() + + ", serviceName=" + getName() + ", componentName=" + component.getName()); } + } - components.clear(); - } finally { - readWriteLock.writeLock().unlock(); + for (ServiceComponent serviceComponent : components.values()) { + serviceComponent.delete(); } + + components.clear(); } finally { - clusterGlobalLock.writeLock().unlock(); + readWriteLock.writeLock().unlock(); } } @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public void deleteServiceComponent(String componentName) throws AmbariException { - clusterGlobalLock.writeLock().lock(); + readWriteLock.writeLock().lock(); try { - readWriteLock.writeLock().lock(); - try { - ServiceComponent component = getServiceComponent(componentName); - LOG.info("Deleting servicecomponent for cluster" + ServiceComponent component = getServiceComponent(componentName); + LOG.info("Deleting servicecomponent for cluster" + ", clusterName=" + cluster.getClusterName() + + ", serviceName=" + getName() + ", componentName=" + componentName); + // FIXME check dependencies from meta layer + if (!component.canBeRemoved()) { + throw new AmbariException("Could not delete component from cluster" + ", clusterName=" + cluster.getClusterName() + ", serviceName=" + getName() + ", componentName=" + componentName); - // FIXME check dependencies from meta layer - if (!component.canBeRemoved()) { - throw new AmbariException("Could not delete component from cluster" - + ", clusterName=" + cluster.getClusterName() - + ", serviceName=" + getName() - + ", componentName=" + componentName); - } - - component.delete(); - components.remove(componentName); - } finally { - readWriteLock.writeLock().unlock(); } + + component.delete(); + components.remove(componentName); } finally { - clusterGlobalLock.writeLock().unlock(); + readWriteLock.writeLock().unlock(); } - - } @Override @@ -673,34 +591,28 @@ public class ServiceImpl implements Service { @Override @Transactional + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public void delete() throws AmbariException { - clusterGlobalLock.writeLock().lock(); + readWriteLock.writeLock().lock(); try { - readWriteLock.writeLock().lock(); - try { - deleteAllComponents(); - deleteAllServiceConfigs(); + deleteAllComponents(); + deleteAllServiceConfigs(); - if (persisted) { - removeEntities(); - persisted = false; + if (persisted) { + removeEntities(); + persisted = false; - // publish the service removed event - StackId stackId = cluster.getDesiredStackVersion(); + // publish the service removed event + StackId stackId = cluster.getDesiredStackVersion(); - ServiceRemovedEvent event = new ServiceRemovedEvent(getClusterId(), - stackId.getStackName(), stackId.getStackVersion(), getName()); + ServiceRemovedEvent event = new ServiceRemovedEvent(getClusterId(), stackId.getStackName(), + stackId.getStackVersion(), getName()); - eventPublisher.publish(event); - } - } finally { - readWriteLock.writeLock().unlock(); + eventPublisher.publish(event); } } finally { - clusterGlobalLock.writeLock().unlock(); + readWriteLock.writeLock().unlock(); } - - } @Transactional http://git-wip-us.apache.org/repos/asf/ambari/blob/561c6f2f/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index 2f7d6b9..a6f0a3b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -336,12 +336,6 @@ public class ClusterImpl implements Cluster { this.eventPublisher = eventPublisher; } - - @Override - public ReadWriteLock getClusterGlobalLock() { - return clusterGlobalLock; - } - private void loadServiceConfigTypes() throws AmbariException { try { serviceConfigTypes = collectServiceConfigTypesMapping(); http://git-wip-us.apache.org/repos/asf/ambari/blob/561c6f2f/ambari-server/src/main/java/org/apache/ambari/server/state/configgroup/ConfigGroupImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/configgroup/ConfigGroupImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/configgroup/ConfigGroupImpl.java index 1d6b1e8..9917720 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/configgroup/ConfigGroupImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/configgroup/ConfigGroupImpl.java @@ -44,7 +44,6 @@ import org.apache.ambari.server.orm.entities.HostEntity; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Config; -import org.apache.ambari.server.state.ConfigFactory; import org.apache.ambari.server.state.Host; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,8 +79,6 @@ public class ConfigGroupImpl implements ConfigGroup { private ClusterDAO clusterDAO; @Inject Clusters clusters; - @Inject - private ConfigFactory configFactory; @AssistedInject public ConfigGroupImpl(@Assisted("cluster") Cluster cluster, @@ -317,23 +314,18 @@ public class ConfigGroupImpl implements ConfigGroup { @Override public void persist() { - cluster.getClusterGlobalLock().writeLock().lock(); + readWriteLock.writeLock().lock(); try { - readWriteLock.writeLock().lock(); - try { - if (!isPersisted) { - persistEntities(); - refresh(); - cluster.refresh(); - isPersisted = true; - } else { - saveIfPersisted(); - } - } finally { - readWriteLock.writeLock().unlock(); + if (!isPersisted) { + persistEntities(); + refresh(); + cluster.refresh(); + isPersisted = true; + } else { + saveIfPersisted(); } } finally { - cluster.getClusterGlobalLock().writeLock().unlock(); + readWriteLock.writeLock().unlock(); } } @@ -465,20 +457,15 @@ public class ConfigGroupImpl implements ConfigGroup { @Override public void delete() { - cluster.getClusterGlobalLock().writeLock().lock(); + readWriteLock.writeLock().lock(); try { - readWriteLock.writeLock().lock(); - try { - configGroupConfigMappingDAO.removeAllByGroup(configGroupEntity.getGroupId()); - configGroupHostMappingDAO.removeAllByGroup(configGroupEntity.getGroupId()); - configGroupDAO.removeByPK(configGroupEntity.getGroupId()); - cluster.refresh(); - isPersisted = false; - } finally { - readWriteLock.writeLock().unlock(); - } + configGroupConfigMappingDAO.removeAllByGroup(configGroupEntity.getGroupId()); + configGroupHostMappingDAO.removeAllByGroup(configGroupEntity.getGroupId()); + configGroupDAO.removeByPK(configGroupEntity.getGroupId()); + cluster.refresh(); + isPersisted = false; } finally { - cluster.getClusterGlobalLock().writeLock().unlock(); + readWriteLock.writeLock().unlock(); } } @@ -526,40 +513,33 @@ public class ConfigGroupImpl implements ConfigGroup { @Override public ConfigGroupResponse convertToResponse() throws AmbariException { - cluster.getClusterGlobalLock().readLock().lock(); + readWriteLock.readLock().lock(); try { - readWriteLock.readLock().lock(); - try { - Set<Map<String, Object>> hostnames = new HashSet<Map<String, Object>>(); - for (Host host : hosts.values()) { - Map<String, Object> hostMap = new HashMap<String, Object>(); - hostMap.put("host_name", host.getHostName()); - hostnames.add(hostMap); - } + Set<Map<String, Object>> hostnames = new HashSet<Map<String, Object>>(); + for (Host host : hosts.values()) { + Map<String, Object> hostMap = new HashMap<String, Object>(); + hostMap.put("host_name", host.getHostName()); + hostnames.add(hostMap); + } - Set<Map<String, Object>> configObjMap = new HashSet<Map<String, - Object>>(); + Set<Map<String, Object>> configObjMap = new HashSet<Map<String, Object>>(); - for (Config config : configurations.values()) { - Map<String, Object> configMap = new HashMap<String, Object>(); - configMap.put(ConfigurationResourceProvider - .CONFIGURATION_CONFIG_TYPE_PROPERTY_ID, config.getType()); - configMap.put(ConfigurationResourceProvider - .CONFIGURATION_CONFIG_TAG_PROPERTY_ID, config.getTag()); - configObjMap.add(configMap); - } + for (Config config : configurations.values()) { + Map<String, Object> configMap = new HashMap<String, Object>(); + configMap.put(ConfigurationResourceProvider.CONFIGURATION_CONFIG_TYPE_PROPERTY_ID, + config.getType()); + configMap.put(ConfigurationResourceProvider.CONFIGURATION_CONFIG_TAG_PROPERTY_ID, + config.getTag()); + configObjMap.add(configMap); + } - ConfigGroupResponse configGroupResponse = new ConfigGroupResponse( + ConfigGroupResponse configGroupResponse = new ConfigGroupResponse( configGroupEntity.getGroupId(), cluster.getClusterName(), configGroupEntity.getGroupName(), configGroupEntity.getTag(), - configGroupEntity.getDescription(), - hostnames, configObjMap); - return configGroupResponse; - } finally { - readWriteLock.readLock().unlock(); - } + configGroupEntity.getDescription(), hostnames, configObjMap); + return configGroupResponse; } finally { - cluster.getClusterGlobalLock().readLock().unlock(); + readWriteLock.readLock().unlock(); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/561c6f2f/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java index 3b5ed28..7e345e5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java @@ -30,6 +30,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ambari.annotations.Experimental; +import org.apache.ambari.annotations.ExperimentalFeature; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.agent.AlertDefinitionCommand; import org.apache.ambari.server.api.services.AmbariMetaInfo; @@ -92,7 +94,6 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { private static final Logger LOG = LoggerFactory.getLogger(ServiceComponentHostImpl.class); - private final ReadWriteLock clusterGlobalLock; private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private final Lock readLock = readWriteLock.readLock(); private final Lock writeLock = readWriteLock.writeLock(); @@ -751,7 +752,6 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { } this.serviceComponent = serviceComponent; - clusterGlobalLock = serviceComponent.getClusterGlobalLock(); HostEntity hostEntity = null; try { @@ -805,7 +805,6 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { Injector injector) { injector.injectMembers(this); this.serviceComponent = serviceComponent; - clusterGlobalLock = serviceComponent.getClusterGlobalLock(); this.desiredStateEntity = desiredStateEntity; this.stateEntity = stateEntity; @@ -1029,6 +1028,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public void handleEvent(ServiceComponentHostEvent event) throws InvalidStateTransitionException { if (LOG.isDebugEnabled()) { @@ -1037,30 +1037,25 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { + ", event=" + event.toString()); } State oldState = getState(); - clusterGlobalLock.readLock().lock(); try { + writeLock.lock(); try { - writeLock.lock(); - try { - stateMachine.doTransition(event.getType(), event); - getStateEntity().setCurrentState(stateMachine.getCurrentState()); - saveComponentStateEntityIfPersisted(); - // TODO Audit logs - } catch (InvalidStateTransitionException e) { - LOG.error("Can't handle ServiceComponentHostEvent event at" - + " current state" - + ", serviceComponentName=" + getServiceComponentName() - + ", hostName=" + getHostName() - + ", currentState=" + oldState - + ", eventType=" + event.getType() - + ", event=" + event); - throw e; - } - } finally { - writeLock.unlock(); + stateMachine.doTransition(event.getType(), event); + getStateEntity().setCurrentState(stateMachine.getCurrentState()); + saveComponentStateEntityIfPersisted(); + // TODO Audit logs + } catch (InvalidStateTransitionException e) { + LOG.error("Can't handle ServiceComponentHostEvent event at" + + " current state" + + ", serviceComponentName=" + getServiceComponentName() + + ", hostName=" + getHostName() + + ", currentState=" + oldState + + ", eventType=" + event.getType() + + ", event=" + event); + throw e; } } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } if (!oldState.equals(getState())) { @@ -1349,58 +1344,56 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { } @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public ServiceComponentHostResponse convertToResponse(Map<String, DesiredConfig> desiredConfigs) { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - HostComponentStateEntity hostComponentStateEntity = getStateEntity(); - if (null == hostComponentStateEntity) { - LOG.warn("Could not convert ServiceComponentHostResponse to a response. It's possible that Host " + getHostName() + " was deleted."); - return null; - } - - String clusterName = serviceComponent.getClusterName(); - String serviceName = serviceComponent.getServiceName(); - String serviceComponentName = serviceComponent.getName(); - String hostName = getHostName(); - String state = getState().toString(); - String stackId = getStackVersion().getStackId(); - String desiredState = getDesiredState().toString(); - String desiredStackId = getDesiredStackVersion().getStackId(); - HostComponentAdminState componentAdminState = getComponentAdminState(); - UpgradeState upgradeState = hostComponentStateEntity.getUpgradeState(); - - String displayName = null; - try { - ComponentInfo compInfo = ambariMetaInfo.getComponent(getStackVersion().getStackName(), - getStackVersion().getStackVersion(), serviceName, serviceComponentName); - displayName = compInfo.getDisplayName(); - } catch (AmbariException e) { - displayName = serviceComponentName; - } + HostComponentStateEntity hostComponentStateEntity = getStateEntity(); + if (null == hostComponentStateEntity) { + LOG.warn( + "Could not convert ServiceComponentHostResponse to a response. It's possible that Host {} was deleted.", + getHostName()); + return null; + } - ServiceComponentHostResponse r = new ServiceComponentHostResponse( - clusterName, serviceName, - serviceComponentName, displayName, hostName, state, - stackId, desiredState, - desiredStackId, componentAdminState); + String clusterName = serviceComponent.getClusterName(); + String serviceName = serviceComponent.getServiceName(); + String serviceComponentName = serviceComponent.getName(); + String hostName = getHostName(); + String state = getState().toString(); + String stackId = getStackVersion().getStackId(); + String desiredState = getDesiredState().toString(); + String desiredStackId = getDesiredStackVersion().getStackId(); + HostComponentAdminState componentAdminState = getComponentAdminState(); + UpgradeState upgradeState = hostComponentStateEntity.getUpgradeState(); + + String displayName = null; + try { + ComponentInfo compInfo = ambariMetaInfo.getComponent(getStackVersion().getStackName(), + getStackVersion().getStackVersion(), serviceName, serviceComponentName); + displayName = compInfo.getDisplayName(); + } catch (AmbariException e) { + displayName = serviceComponentName; + } - r.setActualConfigs(actualConfigs); - r.setUpgradeState(upgradeState); + ServiceComponentHostResponse r = new ServiceComponentHostResponse( + clusterName, serviceName, + serviceComponentName, displayName, hostName, state, + stackId, desiredState, + desiredStackId, componentAdminState); - try { - r.setStaleConfig(helper.isStaleConfigs(this, desiredConfigs)); - } catch (Exception e) { - LOG.error("Could not determine stale config", e); - } + r.setActualConfigs(actualConfigs); + r.setUpgradeState(upgradeState); - return r; - } finally { - readLock.unlock(); + try { + r.setStaleConfig(helper.isStaleConfigs(this, desiredConfigs)); + } catch (Exception e) { + LOG.error("Could not determine stale config", e); } + + return r; } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @@ -1448,52 +1441,29 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { */ @Override public void persist() { - boolean clusterWriteLockAcquired = false; - if (!persisted) { - clusterGlobalLock.writeLock().lock(); - clusterWriteLockAcquired = true; - } - + writeLock.lock(); try { - writeLock.lock(); - try { - if (!persisted) { - // persist the new cluster topology and then release the cluster lock - // as it has no more bearing on the rest of this persist() method - persistEntities(); - persisted = true; - - clusterGlobalLock.writeLock().unlock(); - clusterWriteLockAcquired = false; - - // these should still be done with the internal lock - refresh(); - // There refresh calls are no longer needed with cached references - // not used on getters/setters - // NOTE: Refreshing parents is a bad pattern. - //host.refresh(); - //serviceComponent.refresh(); - - // publish the service component installed event - StackId stackId = getDesiredStackVersion(); - - ServiceComponentInstalledEvent event = new ServiceComponentInstalledEvent( - getClusterId(), stackId.getStackName(), - stackId.getStackVersion(), getServiceName(), getServiceComponentName(), getHostName(), - isRecoveryEnabled()); - - eventPublisher.publish(event); - } else { - saveComponentStateEntityIfPersisted(); - saveComponentDesiredStateEntityIfPersisted(); - } - } finally { - writeLock.unlock(); + if (!persisted) { + // persist the new cluster topology + persistEntities(); + persisted = true; + + refresh(); + + // publish the service component installed event + StackId stackId = getDesiredStackVersion(); + + ServiceComponentInstalledEvent event = new ServiceComponentInstalledEvent(getClusterId(), + stackId.getStackName(), stackId.getStackVersion(), getServiceName(), + getServiceComponentName(), getHostName(), isRecoveryEnabled()); + + eventPublisher.publish(event); + } else { + saveComponentStateEntityIfPersisted(); + saveComponentDesiredStateEntityIfPersisted(); } } finally { - if (clusterWriteLockAcquired) { - clusterGlobalLock.writeLock().unlock(); - } + writeLock.unlock(); } } @@ -1568,8 +1538,8 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public boolean canBeRemoved() { - clusterGlobalLock.readLock().lock(); boolean schLockAcquired = false; try { // if unable to read, then writers are writing; cannot remove SCH @@ -1581,38 +1551,33 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { if (schLockAcquired) { readLock.unlock(); } - clusterGlobalLock.readLock().unlock(); } } @Override + @Experimental(feature = ExperimentalFeature.CLUSTER_GLOBAL_LOCK_REMOVAL) public void delete() { boolean fireRemovalEvent = false; - clusterGlobalLock.writeLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - if (persisted) { - removeEntities(); - - // host must be re-loaded from db to refresh the cached JPA HostEntity - // that references HostComponentDesiredStateEntity - // and HostComponentStateEntity JPA entities - host.refresh(); + if (persisted) { + removeEntities(); - persisted = false; - fireRemovalEvent = true; - } + // host must be re-loaded from db to refresh the cached JPA HostEntity + // that references HostComponentDesiredStateEntity + // and HostComponentStateEntity JPA entities + host.refresh(); - clusters.getCluster(getClusterName()).removeServiceComponentHost(this); - } catch (AmbariException ex) { - LOG.error("Unable to remove a service component from a host", ex); - } finally { - writeLock.unlock(); + persisted = false; + fireRemovalEvent = true; } + + clusters.getCluster(getClusterName()).removeServiceComponentHost(this); + } catch (AmbariException ex) { + LOG.error("Unable to remove a service component from a host", ex); } finally { - clusterGlobalLock.writeLock().unlock(); + writeLock.unlock(); } // publish event for the removal of the SCH after the removal is http://git-wip-us.apache.org/repos/asf/ambari/blob/561c6f2f/ambari-server/src/test/java/org/apache/ambari/server/update/HostUpdateHelperTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/update/HostUpdateHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/update/HostUpdateHelperTest.java index 387205d..f9dd5d1 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/update/HostUpdateHelperTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/update/HostUpdateHelperTest.java @@ -18,12 +18,18 @@ package org.apache.ambari.server.update; -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Injector; -import junit.framework.Assert; +import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.persistence.EntityManager; + import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariManagementController; @@ -51,18 +57,13 @@ import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.junit.Test; -import javax.persistence.EntityManager; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; -import static org.easymock.EasyMock.anyString; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; +import junit.framework.Assert; public class HostUpdateHelperTest { @@ -217,8 +218,6 @@ public class HostUpdateHelperTest { ClusterConfigEntity mockClusterConfigEntity3 = easyMockSupport.createNiceMock(ClusterConfigEntity.class); ClusterConfigEntity mockClusterConfigEntity4 = easyMockSupport.createNiceMock(ClusterConfigEntity.class); StackEntity mockStackEntity = easyMockSupport.createNiceMock(StackEntity.class); - ReadWriteLock mockReadWriteLock = easyMockSupport.createNiceMock(ReadWriteLock.class); - Lock mockLock = easyMockSupport.createNiceMock(Lock.class); Map<String, Map<String, String>> clusterHostsToChange = new HashMap<>(); Map<String, String> hosts = new HashMap<>(); List<ClusterConfigEntity> clusterConfigEntities1 = new ArrayList<>(); @@ -254,11 +253,8 @@ public class HostUpdateHelperTest { expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).once(); expect(mockClusters.getCluster("cl1")).andReturn(mockCluster).once(); - expect(mockCluster.getClusterGlobalLock()).andReturn(mockReadWriteLock).atLeastOnce(); expect(mockCluster.getClusterId()).andReturn(1L).atLeastOnce(); - expect(mockReadWriteLock.writeLock()).andReturn(mockLock).atLeastOnce(); - expect(mockClusterEntity1.getClusterConfigEntities()).andReturn(clusterConfigEntities1).atLeastOnce(); expect(mockClusterEntity2.getClusterConfigEntities()).andReturn(clusterConfigEntities2).atLeastOnce();
