Repository: ambari Updated Branches: refs/heads/trunk 86dc9394c -> 23d506e2a
http://git-wip-us.apache.org/repos/asf/ambari/blob/23d506e2/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java index dbdef2b..55a6ce7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -47,26 +46,22 @@ import org.apache.ambari.server.orm.dao.HostDAO; import org.apache.ambari.server.orm.dao.HostVersionDAO; import org.apache.ambari.server.orm.dao.RepositoryVersionDAO; import org.apache.ambari.server.orm.dao.ServiceComponentDesiredStateDAO; -import org.apache.ambari.server.orm.entities.ClusterVersionEntity; import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity; import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntityPK; import org.apache.ambari.server.orm.entities.HostComponentStateEntity; import org.apache.ambari.server.orm.entities.HostComponentStateEntityPK; import org.apache.ambari.server.orm.entities.HostEntity; -import org.apache.ambari.server.orm.entities.HostVersionEntity; import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity; import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntityPK; import org.apache.ambari.server.state.Cluster; import 
org.apache.ambari.server.state.Clusters; -import org.apache.ambari.server.state.ComponentInfo; import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.HostComponentAdminState; import org.apache.ambari.server.state.HostConfig; import org.apache.ambari.server.state.HostState; import org.apache.ambari.server.state.MaintenanceState; -import org.apache.ambari.server.state.RepositoryVersionState; import org.apache.ambari.server.state.SecurityState; import org.apache.ambari.server.state.ServiceComponent; import org.apache.ambari.server.state.ServiceComponentHost; @@ -83,7 +78,6 @@ import org.apache.ambari.server.state.fsm.SingleArcTransition; import org.apache.ambari.server.state.fsm.StateMachine; import org.apache.ambari.server.state.fsm.StateMachineFactory; import org.apache.ambari.server.state.stack.upgrade.RepositoryVersionHelper; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -639,58 +633,45 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { } } - private void resetLastOpInfo() { - clusterGlobalLock.readLock().lock(); try { - try { - writeLock.lock(); - setLastOpStartTime(-1); - setLastOpLastUpdateTime(-1); - setLastOpEndTime(-1); - } finally { - writeLock.unlock(); - } + writeLock.lock(); + setLastOpStartTime(-1); + setLastOpLastUpdateTime(-1); + setLastOpEndTime(-1); } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } - } private void updateLastOpInfo(ServiceComponentHostEventType eventType, long time) { - clusterGlobalLock.readLock().lock(); try { - try { - writeLock.lock(); - switch (eventType) { - case HOST_SVCCOMP_INSTALL: - case HOST_SVCCOMP_START: - case HOST_SVCCOMP_STOP: - case HOST_SVCCOMP_UNINSTALL: - case HOST_SVCCOMP_WIPEOUT: - case HOST_SVCCOMP_OP_RESTART: - resetLastOpInfo(); - setLastOpStartTime(time); - break; - 
case HOST_SVCCOMP_OP_FAILED: - case HOST_SVCCOMP_OP_SUCCEEDED: - case HOST_SVCCOMP_STOPPED: - case HOST_SVCCOMP_STARTED: - setLastOpLastUpdateTime(time); - setLastOpEndTime(time); - break; - case HOST_SVCCOMP_OP_IN_PROGRESS: - setLastOpLastUpdateTime(time); - break; - } - } finally { - writeLock.unlock(); + writeLock.lock(); + switch (eventType) { + case HOST_SVCCOMP_INSTALL: + case HOST_SVCCOMP_START: + case HOST_SVCCOMP_STOP: + case HOST_SVCCOMP_UNINSTALL: + case HOST_SVCCOMP_WIPEOUT: + case HOST_SVCCOMP_OP_RESTART: + resetLastOpInfo(); + setLastOpStartTime(time); + break; + case HOST_SVCCOMP_OP_FAILED: + case HOST_SVCCOMP_OP_SUCCEEDED: + case HOST_SVCCOMP_STOPPED: + case HOST_SVCCOMP_STARTED: + setLastOpLastUpdateTime(time); + setLastOpEndTime(time); + break; + case HOST_SVCCOMP_OP_IN_PROGRESS: + setLastOpLastUpdateTime(time); + break; } } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } - } @AssistedInject @@ -777,16 +758,12 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { public State getState() { clusterGlobalLock.readLock().lock(); try { - readLock.lock(); - try { - return stateMachine.getCurrentState(); - } finally { - readLock.unlock(); - } + // there's no reason to lock around the state machine for this SCH since + // the state machine is synchronized + return stateMachine.getCurrentState(); } finally { clusterGlobalLock.readLock().unlock(); } - } @Override @@ -808,78 +785,53 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { @Override public String getVersion() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - return stateEntity.getVersion(); - } finally { - readLock.unlock(); - } + return stateEntity.getVersion(); } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @Override public void setVersion(String version) { - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - 
stateEntity.setVersion(version); - saveIfPersisted(); - } finally { - writeLock.unlock(); - } + stateEntity.setVersion(version); + saveIfPersisted(); } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } } @Override public SecurityState getSecurityState() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - return stateEntity.getSecurityState(); - } finally { - readLock.unlock(); - } + return stateEntity.getSecurityState(); } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @Override public void setSecurityState(SecurityState securityState) { - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - stateEntity.setSecurityState(securityState); - saveIfPersisted(); - } finally { - writeLock.unlock(); - } + stateEntity.setSecurityState(securityState); + saveIfPersisted(); } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } } @Override public SecurityState getDesiredSecurityState() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - return desiredStateEntity.getSecurityState(); - } finally { - readLock.unlock(); - } + return desiredStateEntity.getSecurityState(); } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @@ -889,17 +841,12 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { throw new AmbariException("The security state must be an endpoint state"); } - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - desiredStateEntity.setSecurityState(securityState); - saveIfPersisted(); - } finally { - writeLock.unlock(); - } + desiredStateEntity.setSecurityState(securityState); + saveIfPersisted(); } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } } @@ -913,17 +860,12 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { */ @Override public 
void setUpgradeState(UpgradeState upgradeState) { - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - stateEntity.setUpgradeState(upgradeState); - saveIfPersisted(); - } finally { - writeLock.unlock(); - } + stateEntity.setUpgradeState(upgradeState); + saveIfPersisted(); } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } } @@ -978,48 +920,23 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { @Override public String getServiceComponentName() { - clusterGlobalLock.readLock().lock(); - try { - readLock.lock(); - try { - return serviceComponent.getName(); - } finally { - readLock.unlock(); - } - } finally { - clusterGlobalLock.readLock().unlock(); - } + return serviceComponent.getName(); } @Override public String getHostName() { - clusterGlobalLock.readLock().lock(); - try { - readLock.lock(); - try { - return host.getHostName(); - } finally { - readLock.unlock(); - } - } finally { - clusterGlobalLock.readLock().unlock(); - } + return host.getHostName(); } /** * @return the lastOpStartTime */ public long getLastOpStartTime() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - return lastOpStartTime; - } finally { - readLock.unlock(); - } + return lastOpStartTime; } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @@ -1027,16 +944,11 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { * @param lastOpStartTime the lastOpStartTime to set */ public void setLastOpStartTime(long lastOpStartTime) { - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - this.lastOpStartTime = lastOpStartTime; - } finally { - writeLock.unlock(); - } + this.lastOpStartTime = lastOpStartTime; } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } } @@ -1044,16 +956,11 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { * @return 
the lastOpEndTime */ public long getLastOpEndTime() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - return lastOpEndTime; - } finally { - readLock.unlock(); - } + return lastOpEndTime; } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @@ -1061,16 +968,11 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { * @param lastOpEndTime the lastOpEndTime to set */ public void setLastOpEndTime(long lastOpEndTime) { - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - this.lastOpEndTime = lastOpEndTime; - } finally { - writeLock.unlock(); - } + this.lastOpEndTime = lastOpEndTime; } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } } @@ -1078,16 +980,11 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { * @return the lastOpLastUpdateTime */ public long getLastOpLastUpdateTime() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - return lastOpLastUpdateTime; - } finally { - readLock.unlock(); - } + return lastOpLastUpdateTime; } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @@ -1095,252 +992,160 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { * @param lastOpLastUpdateTime the lastOpLastUpdateTime to set */ public void setLastOpLastUpdateTime(long lastOpLastUpdateTime) { - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - this.lastOpLastUpdateTime = lastOpLastUpdateTime; - } finally { - writeLock.unlock(); - } + this.lastOpLastUpdateTime = lastOpLastUpdateTime; } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } } @Override public long getClusterId() { - clusterGlobalLock.readLock().lock(); - try { - readLock.lock(); - try { - return serviceComponent.getClusterId(); - } finally { - readLock.unlock(); - } - } finally { - 
clusterGlobalLock.readLock().unlock(); - } + return serviceComponent.getClusterId(); } @Override public String getServiceName() { - clusterGlobalLock.readLock().lock(); - try { - readLock.lock(); - try { - return serviceComponent.getServiceName(); - } finally { - readLock.unlock(); - } - } finally { - clusterGlobalLock.readLock().unlock(); - } + return serviceComponent.getServiceName(); } @Override public StackId getStackVersion() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - return gson.fromJson(stateEntity.getCurrentStackVersion(), StackId.class); - } finally { - readLock.unlock(); - } + return gson.fromJson(stateEntity.getCurrentStackVersion(), StackId.class); } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @Override public void setStackVersion(StackId stackVersion) { - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - stateEntity.setCurrentStackVersion(gson.toJson(stackVersion)); - saveIfPersisted(); - } finally { - writeLock.unlock(); - } + stateEntity.setCurrentStackVersion(gson.toJson(stackVersion)); + saveIfPersisted(); } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } } @Override public State getDesiredState() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - return desiredStateEntity.getDesiredState(); - } finally { - readLock.unlock(); - } + return desiredStateEntity.getDesiredState(); } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @Override public void setDesiredState(State state) { - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - desiredStateEntity.setDesiredState(state); - saveIfPersisted(); - } finally { - writeLock.unlock(); - } + desiredStateEntity.setDesiredState(state); + saveIfPersisted(); } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } } @Override 
public StackId getDesiredStackVersion() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - return gson.fromJson(desiredStateEntity.getDesiredStackVersion(), StackId.class); - } finally { - readLock.unlock(); - } + return gson.fromJson(desiredStateEntity.getDesiredStackVersion(), + StackId.class); } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @Override public void setDesiredStackVersion(StackId stackVersion) { - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - desiredStateEntity.setDesiredStackVersion(gson.toJson(stackVersion)); - saveIfPersisted(); - } finally { - writeLock.unlock(); - } + desiredStateEntity.setDesiredStackVersion(gson.toJson(stackVersion)); + saveIfPersisted(); } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } } @Override public HostComponentAdminState getComponentAdminState() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - HostComponentAdminState adminState = desiredStateEntity.getAdminState(); - if (adminState == null - && !serviceComponent.isClientComponent() && !serviceComponent.isMasterComponent()) { - adminState = HostComponentAdminState.INSERVICE; - } - return adminState; - } finally { - readLock.unlock(); + HostComponentAdminState adminState = desiredStateEntity.getAdminState(); + if (adminState == null && !serviceComponent.isClientComponent() + && !serviceComponent.isMasterComponent()) { + adminState = HostComponentAdminState.INSERVICE; } + return adminState; } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @Override public void setComponentAdminState(HostComponentAdminState attribute) { - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - desiredStateEntity.setAdminState(attribute); - saveIfPersisted(); - } finally { - writeLock.unlock(); - } + 
desiredStateEntity.setAdminState(attribute); + saveIfPersisted(); } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } } @Override public ServiceComponentHostResponse convertToResponse() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - ServiceComponentHostResponse r = new ServiceComponentHostResponse( - serviceComponent.getClusterName(), - serviceComponent.getServiceName(), - serviceComponent.getName(), - getHostName(), - getState().toString(), - getStackVersion().getStackId(), - getDesiredState().toString(), - getDesiredStackVersion().getStackId(), - getComponentAdminState()); - - r.setActualConfigs(actualConfigs); + ServiceComponentHostResponse r = new ServiceComponentHostResponse( + serviceComponent.getClusterName(), serviceComponent.getServiceName(), + serviceComponent.getName(), getHostName(), getState().toString(), + getStackVersion().getStackId(), getDesiredState().toString(), + getDesiredStackVersion().getStackId(), getComponentAdminState()); - try { - r.setStaleConfig(helper.isStaleConfigs(this)); - } catch (Exception e) { - LOG.error("Could not determine stale config", e); - } + r.setActualConfigs(actualConfigs); - return r; - } finally { - readLock.unlock(); + try { + r.setStaleConfig(helper.isStaleConfigs(this)); + } catch (Exception e) { + LOG.error("Could not determine stale config", e); } + + return r; } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @Override public String getClusterName() { - clusterGlobalLock.readLock().lock(); - try { - readLock.lock(); - try { - return serviceComponent.getClusterName(); - } finally { - readLock.unlock(); - } - } finally { - clusterGlobalLock.readLock().unlock(); - } - + return serviceComponent.getClusterName(); } @Override public void debugDump(StringBuilder sb) { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - sb.append("ServiceComponentHost={ 
hostname=").append(getHostName()) - .append(", serviceComponentName=").append(serviceComponent.getName()) - .append(", clusterName=").append(serviceComponent.getClusterName()) - .append(", serviceName=").append(serviceComponent.getServiceName()) - .append(", desiredStackVersion=").append(getDesiredStackVersion()) - .append(", desiredState=").append(getDesiredState()) - .append(", stackVersion=").append(getStackVersion()) - .append(", state=").append(getState()) - .append(", securityState=").append(getSecurityState()) - .append(", desiredSecurityState=").append(getDesiredSecurityState()) - .append(" }"); - } finally { - readLock.unlock(); - } + sb.append("ServiceComponentHost={ hostname=").append(getHostName()).append( + ", serviceComponentName=").append(serviceComponent.getName()).append( + ", clusterName=").append(serviceComponent.getClusterName()).append( + ", serviceName=").append(serviceComponent.getServiceName()).append( + ", desiredStackVersion=").append(getDesiredStackVersion()).append( + ", desiredState=").append(getDesiredState()).append(", stackVersion=").append( + getStackVersion()).append(", state=").append(getState()).append( + ", securityState=").append(getSecurityState()).append( + ", desiredSecurityState=").append(getDesiredSecurityState()).append( + " }"); } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } - } @Override @@ -1361,12 +1166,23 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { @Override public void persist() { - clusterGlobalLock.readLock().lock(); + boolean clusterWriteLockAcquired = false; + if (!persisted) { + clusterGlobalLock.writeLock().lock(); + clusterWriteLockAcquired = true; + } + try { writeLock.lock(); try { if (!persisted) { + // persist the new cluster topology and then release the cluster lock + // as it has no more bearing on the rest of this persist() method persistEntities(); + clusterGlobalLock.writeLock().unlock(); + clusterWriteLockAcquired = false; + + // these 
should still be done with the internal lock refresh(); host.refresh(); serviceComponent.refresh(); @@ -1387,9 +1203,10 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { writeLock.unlock(); } } finally { - clusterGlobalLock.readLock().unlock(); + if (clusterWriteLockAcquired) { + clusterGlobalLock.writeLock().unlock(); + } } - } @Transactional @@ -1421,33 +1238,27 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { @Override @Transactional public void refresh() { - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - if (isPersisted()) { - HostComponentStateEntityPK pk = new HostComponentStateEntityPK(); - HostComponentDesiredStateEntityPK dpk = new HostComponentDesiredStateEntityPK(); - pk.setClusterId(getClusterId()); - pk.setComponentName(getServiceComponentName()); - pk.setServiceName(getServiceName()); - pk.setHostName(getHostName()); - dpk.setClusterId(getClusterId()); - dpk.setComponentName(getServiceComponentName()); - dpk.setServiceName(getServiceName()); - dpk.setHostName(getHostName()); - stateEntity = hostComponentStateDAO.findByPK(pk); - desiredStateEntity = hostComponentDesiredStateDAO.findByPK(dpk); - hostComponentStateDAO.refresh(stateEntity); - hostComponentDesiredStateDAO.refresh(desiredStateEntity); - } - } finally { - writeLock.unlock(); + if (isPersisted()) { + HostComponentStateEntityPK pk = new HostComponentStateEntityPK(); + HostComponentDesiredStateEntityPK dpk = new HostComponentDesiredStateEntityPK(); + pk.setClusterId(getClusterId()); + pk.setComponentName(getServiceComponentName()); + pk.setServiceName(getServiceName()); + pk.setHostName(getHostName()); + dpk.setClusterId(getClusterId()); + dpk.setComponentName(getServiceComponentName()); + dpk.setServiceName(getServiceName()); + dpk.setHostName(getHostName()); + stateEntity = hostComponentStateDAO.findByPK(pk); + desiredStateEntity = hostComponentDesiredStateDAO.findByPK(dpk); + 
hostComponentStateDAO.refresh(stateEntity); + hostComponentDesiredStateDAO.refresh(desiredStateEntity); } } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } - } @Transactional @@ -1461,21 +1272,24 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { @Override public boolean canBeRemoved() { clusterGlobalLock.readLock().lock(); + boolean schLockAcquired = false; try { - readLock.lock(); - try { - - return (getState().isRemovableState()); + // if unable to read, then writers are writing; cannot remove SCH + schLockAcquired = readLock.tryLock(); + if (!schLockAcquired) { + return false; + } - } finally { + return (getState().isRemovableState()); + } finally { + if (schLockAcquired) { readLock.unlock(); } - } finally { clusterGlobalLock.readLock().unlock(); } - } + @Override public void delete() { boolean fireRemovalEvent = false; @@ -1550,170 +1364,123 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { return; } - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - LOG.debug("Updating actual config tags: " + configTags); - actualConfigs = new HashMap<String, HostConfig>(); - - for (Entry<String, Map<String, String>> entry : configTags.entrySet()) { - String type = entry.getKey(); - Map<String, String> values = new HashMap<String, String>(entry.getValue()); - - String tag = values.get(ConfigHelper.CLUSTER_DEFAULT_TAG); - values.remove(ConfigHelper.CLUSTER_DEFAULT_TAG); - - HostConfig hc = new HostConfig(); - hc.setDefaultVersionTag(tag); - actualConfigs.put(type, hc); - - if (!values.isEmpty()) { - for (Entry<String, String> overrideEntry : values.entrySet()) { - Long groupId = Long.parseLong(overrideEntry.getKey()); - hc.getConfigGroupOverrides().put(groupId, overrideEntry.getValue()); - if (!configGroupMap.containsKey(groupId)) { - LOG.debug("Config group does not exist, id = " + groupId); - } + LOG.debug("Updating actual config tags: " + configTags); + 
actualConfigs = new HashMap<String, HostConfig>(); + + for (Entry<String, Map<String, String>> entry : configTags.entrySet()) { + String type = entry.getKey(); + Map<String, String> values = new HashMap<String, String>( + entry.getValue()); + + String tag = values.get(ConfigHelper.CLUSTER_DEFAULT_TAG); + values.remove(ConfigHelper.CLUSTER_DEFAULT_TAG); + + HostConfig hc = new HostConfig(); + hc.setDefaultVersionTag(tag); + actualConfigs.put(type, hc); + + if (!values.isEmpty()) { + for (Entry<String, String> overrideEntry : values.entrySet()) { + Long groupId = Long.parseLong(overrideEntry.getKey()); + hc.getConfigGroupOverrides().put(groupId, overrideEntry.getValue()); + if (!configGroupMap.containsKey(groupId)) { + LOG.debug("Config group does not exist, id = " + groupId); } } } - } finally { - writeLock.unlock(); } } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } } - - @Override public Map<String, HostConfig> getActualConfigs() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - return actualConfigs; - } finally { - readLock.unlock(); - } + return actualConfigs; } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } - } @Override public HostState getHostState() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - return host.getState(); - } finally { - readLock.unlock(); - } + return host.getState(); } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @Override public void setMaintenanceState(MaintenanceState state) { - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - desiredStateEntity.setMaintenanceState(state); - saveIfPersisted(); + desiredStateEntity.setMaintenanceState(state); + saveIfPersisted(); - // broadcast the maintenance mode change - MaintenanceModeEvent event = new MaintenanceModeEvent(state, this); - eventPublisher.publish(event); + // broadcast 
the maintenance mode change + MaintenanceModeEvent event = new MaintenanceModeEvent(state, this); + eventPublisher.publish(event); - } finally { - writeLock.unlock(); - } } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } } @Override public MaintenanceState getMaintenanceState() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - return desiredStateEntity.getMaintenanceState(); - } finally { - readLock.unlock(); - } + return desiredStateEntity.getMaintenanceState(); } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @Override public void setProcesses(List<Map<String, String>> procs) { - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - processes = Collections.unmodifiableList(procs); - } finally { - writeLock.unlock(); - } + processes = Collections.unmodifiableList(procs); } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } } @Override public List<Map<String, String>> getProcesses() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - return processes; - } finally { - readLock.unlock(); - } + return processes; } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @Override public boolean isRestartRequired() { - clusterGlobalLock.readLock().lock(); + readLock.lock(); try { - readLock.lock(); - try { - return desiredStateEntity.isRestartRequired(); - } finally { - readLock.unlock(); - } + return desiredStateEntity.isRestartRequired(); } finally { - clusterGlobalLock.readLock().unlock(); + readLock.unlock(); } } @Override public void setRestartRequired(boolean restartRequired) { - clusterGlobalLock.readLock().lock(); + writeLock.lock(); try { - writeLock.lock(); - try { - desiredStateEntity.setRestartRequired(restartRequired); - saveIfPersisted(); - helper.invalidateStaleConfigsCache(this); - } finally { - writeLock.unlock(); - } + 
desiredStateEntity.setRestartRequired(restartRequired); + saveIfPersisted(); + helper.invalidateStaleConfigsCache(this); } finally { - clusterGlobalLock.readLock().unlock(); + writeLock.unlock(); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/23d506e2/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java new file mode 100644 index 0000000..7f9248b --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java @@ -0,0 +1,363 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.ambari.server.state.cluster; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.ServiceComponentNotFoundException; +import org.apache.ambari.server.ServiceNotFoundException; +import org.apache.ambari.server.api.services.AmbariMetaInfo; +import org.apache.ambari.server.orm.GuiceJpaInitializer; +import org.apache.ambari.server.orm.InMemoryDefaultTestModule; +import org.apache.ambari.server.orm.OrmTestHelper; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.Host; +import org.apache.ambari.server.state.MaintenanceState; +import org.apache.ambari.server.state.RepositoryVersionState; +import org.apache.ambari.server.state.Service; +import org.apache.ambari.server.state.ServiceComponent; +import org.apache.ambari.server.state.ServiceComponentFactory; +import org.apache.ambari.server.state.ServiceComponentHost; +import org.apache.ambari.server.state.ServiceComponentHostFactory; +import org.apache.ambari.server.state.ServiceFactory; +import org.apache.ambari.server.state.StackId; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.inject.Guice; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.persist.PersistService; + +/** + * Tests AMBARI-9368 which produced a deadlock during read and writes of some of + * the impl classes. 
+ */ +public class ClusterDeadlockTest { + private final AtomicInteger hostNameCounter = new AtomicInteger(0); + + @Inject + private Injector injector; + + @Inject + private Clusters clusters; + + @Inject + private ServiceFactory serviceFactory; + + @Inject + private ServiceComponentFactory serviceComponentFactory; + + @Inject + private ServiceComponentHostFactory serviceComponentHostFactory; + + @Inject + private AmbariMetaInfo metaInfo; + + @Inject + private OrmTestHelper helper; + + @Before + public void setup() throws Exception { + injector = Guice.createInjector(new InMemoryDefaultTestModule()); + injector.getInstance(GuiceJpaInitializer.class); + injector.injectMembers(this); + clusters.addCluster("c1"); + + StackId stackId = new StackId("HDP-0.1"); + Cluster c1 = clusters.getCluster("c1"); + c1.setDesiredStackVersion(stackId); + helper.getOrCreateRepositoryVersion(stackId.getStackName(), stackId.getStackVersion()); + c1.createClusterVersion(stackId.getStackName(), stackId.getStackVersion(), "admin", RepositoryVersionState.UPGRADING); + metaInfo.init(); + + // 100 hosts + for (int i = 0; i < 100; i++) { + String hostName = "c64-" + i; + clusters.addHost(hostName); + setOsFamily(clusters.getHost(hostName), "redhat", "6.4"); + clusters.getHost(hostName).persist(); + clusters.mapHostToCluster(hostName, "c1"); + } + + // force creation of the service and the components on the last host + createNewServiceComponentHost("HDFS", "NAMENODE", "c64-99", false); + createNewServiceComponentHost("HDFS", "HDFS_CLIENT", "c64-99", true); + } + + @After + public void teardown() { + injector.getInstance(PersistService.class).stop(); + } + + /** + * Tests that concurrent impl serialization and impl writing doesn't cause a + * deadlock. 
+ * + * @throws Exception + */ + @Test(timeout = 15000) + public void testDeadlockBetweenImplementations() throws Exception { + Cluster cluster = clusters.getCluster("c1"); + Service service = cluster.getService("HDFS"); + ServiceComponent namenodeComponent = service.getServiceComponent("NAMENODE"); + ServiceComponent hdfsClientComponent = service.getServiceComponent("HDFS_CLIENT"); + + ServiceComponentHost namenodeSCH = createNewServiceComponentHost("HDFS", + "NAMENODE", "c64-0", false); + + ServiceComponentHost hdfsClientSCH = createNewServiceComponentHost("HDFS", + "HDFS_CLIENT", "c64-0", true); + + List<Thread> threads = new ArrayList<Thread>(); + for (int i = 0; i < 5; i++) { + DeadlockExerciserThread thread = new DeadlockExerciserThread(); + thread.setCluster(cluster); + thread.setService(service); + thread.setHdfsClientComponent(hdfsClientComponent); + thread.setNamenodeComponent(namenodeComponent); + thread.setNamenodeSCH(namenodeSCH); + thread.setHdfsClientSCH(hdfsClientSCH); + thread.start(); + threads.add(thread); + } + + for (Thread thread : threads) { + thread.join(); + } + } + + /** + * Tests that while serializing a service component, writes to that service + * component do not cause a deadlock with the global cluster lock. 
+ * + * @throws Exception + */ + @Test(timeout = 15000) + public void testAddingHostComponentsWhileReading() throws Exception { + Cluster cluster = clusters.getCluster("c1"); + Service service = cluster.getService("HDFS"); + ServiceComponent namenodeComponent = service.getServiceComponent("NAMENODE"); + ServiceComponent hdfsClientComponent = service.getServiceComponent("HDFS_CLIENT"); + + List<Thread> threads = new ArrayList<Thread>(); + for (int i = 0; i < 5; i++) { + ServiceComponentDeadlockThread thread = new ServiceComponentDeadlockThread(); + thread.setHdfsClientComponent(hdfsClientComponent); + thread.setNamenodeComponent(namenodeComponent); + thread.start(); + threads.add(thread); + } + + for (Thread thread : threads) { + thread.join(); + } + } + + /** + * Tests AMBARI-9368 which saw a deadlock when adding a service component host + * while reading a service component. + */ + private final class ServiceComponentDeadlockThread extends Thread { + private ServiceComponent namenodeComponent; + private ServiceComponent hdfsClientComponent; + + /** + * @param namenodeComponent + * the namenodeComponent to set + */ + public void setNamenodeComponent(ServiceComponent namenodeComponent) { + this.namenodeComponent = namenodeComponent; + } + + /** + * @param hdfsClientComponent + * the hdfsClientComponent to set + */ + public void setHdfsClientComponent(ServiceComponent hdfsClientComponent) { + this.hdfsClientComponent = hdfsClientComponent; + } + + /** + * {@inheritDoc} + */ + @Override + public void run() { + try { + for (int i = 0; i < 15; i++) { + int hostNumeric = hostNameCounter.getAndIncrement(); + + namenodeComponent.convertToResponse(); + createNewServiceComponentHost("HDFS", "NAMENODE", "c64-" + + hostNumeric, false); + + hdfsClientComponent.convertToResponse(); + createNewServiceComponentHost("HDFS", "HDFS_CLIENT", "c64-" + + hostNumeric, true); + + Thread.sleep(10); + } + } catch (Exception exception) { + throw new RuntimeException(exception); + } + } + } + 
+ /** + * Tests AMBARI-9368 which produced a deadlock during read and writes of some + * of the impl classes. + */ + private static final class DeadlockExerciserThread extends Thread { + private Cluster cluster; + private Service service; + private ServiceComponent namenodeComponent; + private ServiceComponent hdfsClientComponent; + private ServiceComponentHost namenodeSCH; + private ServiceComponentHost hdfsClientSCH; + + /** + * @param cluster + * the cluster to set + */ + public void setCluster(Cluster cluster) { + this.cluster = cluster; + } + + /** + * @param service + * the service to set + */ + public void setService(Service service) { + this.service = service; + } + + /** + * @param namenodeComponent + * the namenodeComponent to set + */ + public void setNamenodeComponent(ServiceComponent namenodeComponent) { + this.namenodeComponent = namenodeComponent; + } + + /** + * @param hdfsClientComponent + * the hdfsClientComponent to set + */ + public void setHdfsClientComponent(ServiceComponent hdfsClientComponent) { + this.hdfsClientComponent = hdfsClientComponent; + } + + /** + * @param namenodeSCH + * the namenodeSCH to set + */ + public void setNamenodeSCH(ServiceComponentHost namenodeSCH) { + this.namenodeSCH = namenodeSCH; + } + + /** + * @param hdfsClientSCH + * the hdfsClientSCH to set + */ + public void setHdfsClientSCH(ServiceComponentHost hdfsClientSCH) { + this.hdfsClientSCH = hdfsClientSCH; + } + + /** + * {@inheritDoc} + */ + @Override + public void run() { + try { + for (int i = 0; i < 100; i++) { + cluster.convertToResponse(); + service.convertToResponse(); + namenodeComponent.convertToResponse(); + hdfsClientComponent.convertToResponse(); + namenodeSCH.convertToResponse(); + hdfsClientSCH.convertToResponse(); + + cluster.setProvisioningState(org.apache.ambari.server.state.State.INIT); + service.setMaintenanceState(MaintenanceState.OFF); + namenodeComponent.setDesiredState(org.apache.ambari.server.state.State.STARTED); + 
hdfsClientComponent.setDesiredState(org.apache.ambari.server.state.State.INSTALLED); + + namenodeSCH.setState(org.apache.ambari.server.state.State.STARTED); + hdfsClientSCH.setState(org.apache.ambari.server.state.State.INSTALLED); + + Thread.sleep(10); + } + } catch (Exception exception) { + throw new RuntimeException(exception); + } + } + } + + private void setOsFamily(Host host, String osFamily, String osVersion) { + Map<String, String> hostAttributes = new HashMap<String, String>(2); + hostAttributes.put("os_family", osFamily); + hostAttributes.put("os_release_version", osVersion); + host.setHostAttributes(hostAttributes); + } + + private ServiceComponentHost createNewServiceComponentHost(String svc, + String svcComponent, String hostName, boolean isClient) + throws AmbariException { + Cluster c = clusters.getCluster("c1"); + Assert.assertNotNull(c.getConfigGroups()); + return createNewServiceComponentHost(c, svc, svcComponent, hostName); + } + + private ServiceComponentHost createNewServiceComponentHost(Cluster c, + String svc, String svcComponent, String hostName) throws AmbariException { + + Service s = null; + + try { + s = c.getService(svc); + } catch (ServiceNotFoundException e) { + s = serviceFactory.createNew(c, svc); + c.addService(s); + s.persist(); + } + + ServiceComponent sc = null; + try { + sc = s.getServiceComponent(svcComponent); + } catch (ServiceComponentNotFoundException e) { + sc = serviceComponentFactory.createNew(s, svcComponent); + s.addServiceComponent(sc); + sc.persist(); + } + + ServiceComponentHost impl = serviceComponentHostFactory.createNew(sc, + hostName); + + impl.persist(); + return impl; + } +}
