Modified: incubator/ambari/branches/branch-1.2.4/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java URL: http://svn.apache.org/viewvc/incubator/ambari/branches/branch-1.2.4/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java?rev=1489556&r1=1489555&r2=1489556&view=diff ============================================================================== --- incubator/ambari/branches/branch-1.2.4/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java (original) +++ incubator/ambari/branches/branch-1.2.4/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java Tue Jun 4 18:46:41 2013 @@ -64,8 +64,10 @@ public class ServiceComponentHostImpl im // FIXME need more debug logs - private final Lock readLock; - private final Lock writeLock; + private final ReadWriteLock clusterGlobalLock; + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); private final ServiceComponent serviceComponent; private final Host host; @@ -547,46 +549,56 @@ public class ServiceComponentHostImpl im private void resetLastOpInfo() { + clusterGlobalLock.readLock().lock(); try { - writeLock.lock(); - setLastOpStartTime(-1); - setLastOpLastUpdateTime(-1); - setLastOpEndTime(-1); - } - finally { - writeLock.unlock(); + try { + writeLock.lock(); + setLastOpStartTime(-1); + setLastOpLastUpdateTime(-1); + setLastOpEndTime(-1); + } finally { + writeLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } private void updateLastOpInfo(ServiceComponentHostEventType eventType, long time) { + clusterGlobalLock.readLock().lock(); try { - writeLock.lock(); - switch (eventType) { - case HOST_SVCCOMP_INSTALL: - case HOST_SVCCOMP_START: - case HOST_SVCCOMP_STOP: - case HOST_SVCCOMP_UNINSTALL: - case HOST_SVCCOMP_WIPEOUT: - case HOST_SVCCOMP_OP_RESTART: - resetLastOpInfo(); - setLastOpStartTime(time); - break; - case HOST_SVCCOMP_OP_FAILED: - case HOST_SVCCOMP_OP_SUCCEEDED: - case HOST_SVCCOMP_STOPPED: - case HOST_SVCCOMP_STARTED: - setLastOpLastUpdateTime(time); - setLastOpEndTime(time); - break; - case HOST_SVCCOMP_OP_IN_PROGRESS: - setLastOpLastUpdateTime(time); - break; + try { + writeLock.lock(); + switch (eventType) { + case HOST_SVCCOMP_INSTALL: + case HOST_SVCCOMP_START: + case HOST_SVCCOMP_STOP: + case HOST_SVCCOMP_UNINSTALL: + case HOST_SVCCOMP_WIPEOUT: + case HOST_SVCCOMP_OP_RESTART: + resetLastOpInfo(); + setLastOpStartTime(time); + break; + case HOST_SVCCOMP_OP_FAILED: + case HOST_SVCCOMP_OP_SUCCEEDED: + case HOST_SVCCOMP_STOPPED: + case HOST_SVCCOMP_STARTED: + setLastOpLastUpdateTime(time); + setLastOpEndTime(time); + break; + case HOST_SVCCOMP_OP_IN_PROGRESS: + setLastOpLastUpdateTime(time); + break; + } + } finally { + writeLock.unlock(); } + } finally { + clusterGlobalLock.readLock().unlock(); } - finally { - writeLock.unlock(); - } + } @AssistedInject @@ -600,10 +612,8 @@ public class ServiceComponentHostImpl im this.stateMachine = daemonStateMachineFactory.make(this); } - ReadWriteLock rwLock = new ReentrantReadWriteLock(); - this.readLock = rwLock.readLock(); - this.writeLock = rwLock.writeLock(); this.serviceComponent = serviceComponent; + this.clusterGlobalLock = serviceComponent.getClusterGlobalLock(); stateEntity = new HostComponentStateEntity(); stateEntity.setClusterId(serviceComponent.getClusterId()); @@ -641,11 +651,8 @@ public class ServiceComponentHostImpl im @Assisted HostComponentDesiredStateEntity desiredStateEntity, Injector injector) { injector.injectMembers(this); - ReadWriteLock rwLock = new ReentrantReadWriteLock(); - this.readLock = rwLock.readLock(); - this.writeLock = rwLock.writeLock(); this.serviceComponent = serviceComponent; - + this.clusterGlobalLock = serviceComponent.getClusterGlobalLock(); this.desiredStateEntity = desiredStateEntity; this.stateEntity = stateEntity; @@ -678,26 +685,36 @@ public class ServiceComponentHostImpl im @Override public State getState() { + clusterGlobalLock.readLock().lock(); try { readLock.lock(); - return stateMachine.getCurrentState(); - } - finally { - readLock.unlock(); + try { + return stateMachine.getCurrentState(); + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Override public void setState(State state) { + clusterGlobalLock.readLock().lock(); try { writeLock.lock(); - stateMachine.setCurrentState(state); - stateEntity.setCurrentState(state); - saveIfPersisted(); - } - finally { - writeLock.unlock(); + try { + stateMachine.setCurrentState(state); + stateEntity.setCurrentState(state); + saveIfPersisted(); + } finally { + writeLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Override @@ -710,27 +727,32 @@ public class ServiceComponentHostImpl im + ", event=" + event.toString()); } State oldState = getState(); + clusterGlobalLock.readLock().lock(); try { - writeLock.lock(); try { - stateMachine.doTransition(event.getType(), event); - stateEntity.setCurrentState(stateMachine.getCurrentState()); - saveIfPersisted(); - // TODO Audit logs - } catch (InvalidStateTransitionException e) { - LOG.error("Can't handle ServiceComponentHostEvent event at" - + " current state" - + ", serviceComponentName=" + this.getServiceComponentName() - + ", hostName=" + this.getHostName() - + ", currentState=" + oldState - + ", eventType=" + event.getType() - + ", event=" + event); - throw e; + writeLock.lock(); + try { + stateMachine.doTransition(event.getType(), event); + stateEntity.setCurrentState(stateMachine.getCurrentState()); + saveIfPersisted(); + // TODO Audit logs + } catch (InvalidStateTransitionException e) { + LOG.error("Can't handle ServiceComponentHostEvent event at" + + " current state" + + ", serviceComponentName=" + this.getServiceComponentName() + + ", hostName=" + this.getHostName() + + ", currentState=" + oldState + + ", eventType=" + event.getType() + + ", event=" + event); + throw e; + } + } finally { + writeLock.unlock(); } + } finally { + clusterGlobalLock.readLock().unlock(); } - finally { - writeLock.unlock(); - } + if (!oldState.equals(getState())) { if (LOG.isDebugEnabled()) { LOG.debug("ServiceComponentHost transitioned to a new state" @@ -746,158 +768,276 @@ public class ServiceComponentHostImpl im @Override public String getServiceComponentName() { - return serviceComponent.getName(); + clusterGlobalLock.readLock().lock(); + try { + readLock.lock(); + try { + return serviceComponent.getName(); + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); + } + + } @Override public String getHostName() { - return host.getHostName(); + clusterGlobalLock.readLock().lock(); + try { + readLock.lock(); + try { + return host.getHostName(); + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); + } + + } /** * @return the lastOpStartTime */ public long getLastOpStartTime() { + clusterGlobalLock.readLock().lock(); try { readLock.lock(); - return lastOpStartTime; - } - finally { - readLock.unlock(); + try { + return lastOpStartTime; + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } /** * @param lastOpStartTime the lastOpStartTime to set */ public void setLastOpStartTime(long lastOpStartTime) { + clusterGlobalLock.readLock().lock(); try { writeLock.lock(); - this.lastOpStartTime = lastOpStartTime; - } - finally { - writeLock.unlock(); + try { + this.lastOpStartTime = lastOpStartTime; + } finally { + writeLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } /** * @return the lastOpEndTime */ public long getLastOpEndTime() { + clusterGlobalLock.readLock().lock(); try { readLock.lock(); - return lastOpEndTime; - } - finally { - readLock.unlock(); + try { + return lastOpEndTime; + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } /** * @param lastOpEndTime the lastOpEndTime to set */ public void setLastOpEndTime(long lastOpEndTime) { + clusterGlobalLock.readLock().lock(); try { writeLock.lock(); - this.lastOpEndTime = lastOpEndTime; - } - finally { - writeLock.unlock(); + try { + this.lastOpEndTime = lastOpEndTime; + } finally { + writeLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } /** * @return the lastOpLastUpdateTime */ public long getLastOpLastUpdateTime() { + clusterGlobalLock.readLock().lock(); try { readLock.lock(); - return lastOpLastUpdateTime; - } - finally { - readLock.unlock(); + try { + return lastOpLastUpdateTime; + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } /** * @param lastOpLastUpdateTime the lastOpLastUpdateTime to set */ public void setLastOpLastUpdateTime(long lastOpLastUpdateTime) { + clusterGlobalLock.readLock().lock(); try { writeLock.lock(); - this.lastOpLastUpdateTime = lastOpLastUpdateTime; - } - finally { - writeLock.unlock(); + try { + this.lastOpLastUpdateTime = lastOpLastUpdateTime; + } finally { + writeLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Override public long getClusterId() { - return serviceComponent.getClusterId(); + clusterGlobalLock.readLock().lock(); + try { + readLock.lock(); + try { + return serviceComponent.getClusterId(); + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); + } + + } @Override public String getServiceName() { - return serviceComponent.getServiceName(); + clusterGlobalLock.readLock().lock(); + try { + readLock.lock(); + try { + return serviceComponent.getServiceName(); + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); + } + + } Map<String, String> getConfigVersions() { + clusterGlobalLock.readLock().lock(); try { readLock.lock(); - if (this.configs != null) { - return Collections.unmodifiableMap(configs); - } else { - return new HashMap<String, String>(); + try { + if (this.configs != null) { + return Collections.unmodifiableMap(configs); + } else { + return new HashMap<String, String>(); + } + } finally { + readLock.unlock(); } + } finally { + clusterGlobalLock.readLock().unlock(); } - finally { - readLock.unlock(); - } + } @Override public Map<String, Config> getConfigs() throws AmbariException { + clusterGlobalLock.readLock().lock(); try { readLock.lock(); - Map<String, Config> map = new HashMap<String, Config>(); - Cluster cluster = clusters.getClusterById(getClusterId()); - for (Entry<String, String> entry : configs.entrySet()) { - Config config = cluster.getConfig( - entry.getKey(), entry.getValue()); - if (null != config) { - map.put(entry.getKey(), config); + try { + Map<String, Config> map = new HashMap<String, Config>(); + Cluster cluster = clusters.getClusterById(getClusterId()); + for (Entry<String, String> entry : configs.entrySet()) { + Config config = cluster.getConfig( + entry.getKey(), entry.getValue()); + if (null != config) { + map.put(entry.getKey(), config); + } } + return map; + } finally { + readLock.unlock(); } - return map; - } - finally { - readLock.unlock(); + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Transactional void setConfigs(Map<String, String> configs) { + clusterGlobalLock.readLock().lock(); try { writeLock.lock(); + try { - Set<String> deletedTypes = new HashSet<String>(); - for (String type : this.configs.keySet()) { - if (!configs.containsKey(type)) { - deletedTypes.add(type); + Set<String> deletedTypes = new HashSet<String>(); + for (String type : this.configs.keySet()) { + if (!configs.containsKey(type)) { + deletedTypes.add(type); + } } - } - long now = Long.valueOf(new java.util.Date().getTime()); + long now = Long.valueOf(new Date().getTime()); - for (Entry<String,String> entry : configs.entrySet()) { + for (Entry<String, String> entry : configs.entrySet()) { + + boolean contains = false; + for (HostComponentConfigMappingEntity mappingEntity : stateEntity.getHostComponentConfigMappingEntities()) { + if (entry.getKey().equals(mappingEntity.getConfigType())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Updating live config to ServiceComponentHost" + + ", clusterId=" + stateEntity.getClusterId() + + ", serviceName=" + stateEntity.getServiceName() + + ", componentName=" + stateEntity.getComponentName() + + ", hostname=" + stateEntity.getHostName() + + ", configType=" + entry.getKey() + + ", configVersionTag=" + entry.getValue()); + } + contains = true; + mappingEntity.setVersionTag(entry.getValue()); + mappingEntity.setTimestamp(now); + hostComponentConfigMappingDAO.merge(mappingEntity); + break; + } + } + + if (!contains) { + HostComponentConfigMappingEntity newEntity = + new HostComponentConfigMappingEntity(); + newEntity.setClusterId(stateEntity.getClusterId()); + newEntity.setServiceName(stateEntity.getServiceName()); + newEntity.setComponentName(stateEntity.getComponentName()); + newEntity.setHostName(stateEntity.getHostName()); + newEntity.setConfigType(entry.getKey()); + newEntity.setVersionTag(entry.getValue()); + newEntity.setTimestamp(now); - boolean contains = false; - for (HostComponentConfigMappingEntity mappingEntity : stateEntity.getHostComponentConfigMappingEntities()) { - if (entry.getKey().equals(mappingEntity.getConfigType())) { if (LOG.isDebugEnabled()) { - LOG.debug("Updating live config to ServiceComponentHost" + LOG.debug("Adding new live config to ServiceComponentHost" + ", clusterId=" + stateEntity.getClusterId() + ", serviceName=" + stateEntity.getServiceName() + ", componentName=" + stateEntity.getComponentName() @@ -905,306 +1045,359 @@ public class ServiceComponentHostImpl im + ", configType=" + entry.getKey() + ", configVersionTag=" + entry.getValue()); } - contains = true; - mappingEntity.setVersionTag(entry.getValue()); - mappingEntity.setTimestamp(now); - hostComponentConfigMappingDAO.merge(mappingEntity); - break; - } - } - - if (!contains) { - HostComponentConfigMappingEntity newEntity = - new HostComponentConfigMappingEntity(); - newEntity.setClusterId(stateEntity.getClusterId()); - newEntity.setServiceName(stateEntity.getServiceName()); - newEntity.setComponentName(stateEntity.getComponentName()); - newEntity.setHostName(stateEntity.getHostName()); - newEntity.setConfigType(entry.getKey()); - newEntity.setVersionTag(entry.getValue()); - newEntity.setTimestamp(now); - - if (LOG.isDebugEnabled()) { - LOG.debug("Adding new live config to ServiceComponentHost" - + ", clusterId=" + stateEntity.getClusterId() - + ", serviceName=" + stateEntity.getServiceName() - + ", componentName=" + stateEntity.getComponentName() - + ", hostname=" + stateEntity.getHostName() - + ", configType=" + entry.getKey() - + ", configVersionTag=" + entry.getValue()); + stateEntity.getHostComponentConfigMappingEntities().add(newEntity); + newEntity.setHostComponentStateEntity(stateEntity); + hostComponentConfigMappingDAO.create(newEntity); } - stateEntity.getHostComponentConfigMappingEntities().add(newEntity); - newEntity.setHostComponentStateEntity(stateEntity); - hostComponentConfigMappingDAO.create(newEntity); } - } - if (!deletedTypes.isEmpty()) { - List<HostComponentConfigMappingEntity> deleteEntities = - hostComponentConfigMappingDAO.findByHostComponentAndType( - stateEntity.getClusterId(), stateEntity.getServiceName(), - stateEntity.getComponentName(), - stateEntity.getHostName(), deletedTypes); - for (HostComponentConfigMappingEntity deleteEntity : deleteEntities) { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting live config to ServiceComponentHost" - + ", clusterId=" + stateEntity.getClusterId() - + ", serviceName=" + stateEntity.getServiceName() - + ", componentName=" + stateEntity.getComponentName() - + ", hostname=" + stateEntity.getHostName() - + ", configType=" + deleteEntity.getConfigType() - + ", configVersionTag=" + deleteEntity.getVersionTag()); - } - stateEntity.getHostComponentConfigMappingEntities().remove( - deleteEntity); - if (persisted) { - hostComponentConfigMappingDAO.remove(deleteEntity); + if (!deletedTypes.isEmpty()) { + List<HostComponentConfigMappingEntity> deleteEntities = + hostComponentConfigMappingDAO.findByHostComponentAndType( + stateEntity.getClusterId(), stateEntity.getServiceName(), + stateEntity.getComponentName(), + stateEntity.getHostName(), deletedTypes); + for (HostComponentConfigMappingEntity deleteEntity : deleteEntities) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting live config to ServiceComponentHost" + + ", clusterId=" + stateEntity.getClusterId() + + ", serviceName=" + stateEntity.getServiceName() + + ", componentName=" + stateEntity.getComponentName() + + ", hostname=" + stateEntity.getHostName() + + ", configType=" + deleteEntity.getConfigType() + + ", configVersionTag=" + deleteEntity.getVersionTag()); + } + stateEntity.getHostComponentConfigMappingEntities().remove( + deleteEntity); + if (persisted) { + hostComponentConfigMappingDAO.remove(deleteEntity); + } } } + this.configs = configs; + saveIfPersisted(); + } finally { + writeLock.unlock(); } - this.configs = configs; - saveIfPersisted(); } finally { - writeLock.unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public StackId getStackVersion() { + clusterGlobalLock.readLock().lock(); try { readLock.lock(); - return gson.fromJson(stateEntity.getCurrentStackVersion(), StackId.class); - } - finally { - readLock.unlock(); + try { + return gson.fromJson(stateEntity.getCurrentStackVersion(), StackId.class); + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Override public void setStackVersion(StackId stackVersion) { + clusterGlobalLock.readLock().lock(); try { writeLock.lock(); - stateEntity.setCurrentStackVersion(gson.toJson(stackVersion)); - saveIfPersisted(); - } - finally { - writeLock.unlock(); + try { + stateEntity.setCurrentStackVersion(gson.toJson(stackVersion)); + saveIfPersisted(); + } finally { + writeLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Override public State getDesiredState() { + clusterGlobalLock.readLock().lock(); try { readLock.lock(); - return desiredStateEntity.getDesiredState(); - } - finally { - readLock.unlock(); + try { + return desiredStateEntity.getDesiredState(); + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Override public void setDesiredState(State state) { + clusterGlobalLock.readLock().lock(); try { writeLock.lock(); - desiredStateEntity.setDesiredState(state); - saveIfPersisted(); - } - finally { - writeLock.unlock(); + try { + desiredStateEntity.setDesiredState(state); + saveIfPersisted(); + } finally { + writeLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Override public Map<String, String> getDesiredConfigVersionsRecursive() { + clusterGlobalLock.readLock().lock(); try { readLock.lock(); - Map<String, String> fullDesiredConfigVersions = - new HashMap<String, String>(); - Map<String, Config> desiredConfs = getDesiredConfigs(); - for (Config c : desiredConfs.values()) { - fullDesiredConfigVersions.put(c.getType(), c.getVersionTag()); + try { + Map<String, String> fullDesiredConfigVersions = + new HashMap<String, String>(); + Map<String, Config> desiredConfs = getDesiredConfigs(); + for (Config c : desiredConfs.values()) { + fullDesiredConfigVersions.put(c.getType(), c.getVersionTag()); + } + return fullDesiredConfigVersions; + } finally { + readLock.unlock(); } - return fullDesiredConfigVersions; - } - finally { - readLock.unlock(); + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Override public Map<String, Config> getDesiredConfigs() { - Map<String, Config> map = new HashMap<String, Config>(); + clusterGlobalLock.readLock().lock(); try { + Map<String, Config> map = new HashMap<String, Config>(); readLock.lock(); - for (Entry<String, String> entry : desiredConfigs.entrySet()) { - Config config = clusters.getClusterById(getClusterId()).getConfig( - entry.getKey(), entry.getValue()); - if (null != config) { - map.put(entry.getKey(), config); + try { + for (Entry<String, String> entry : desiredConfigs.entrySet()) { + Config config = clusters.getClusterById(getClusterId()).getConfig( + entry.getKey(), entry.getValue()); + if (null != config) { + map.put(entry.getKey(), config); + } } + } catch (AmbariException e) { + // TODO do something + return null; + } finally { + readLock.unlock(); } - } - catch (AmbariException e) { - // TODO do something - return null; - } - finally { - readLock.unlock(); - } - // do a union with component level configs - Map<String, Config> compConfigs = serviceComponent.getDesiredConfigs(); - for (Entry<String, Config> entry : compConfigs.entrySet()) { - if (!map.containsKey(entry.getKey())) { - map.put(entry.getKey(), entry.getValue()); + // do a union with component level configs + Map<String, Config> compConfigs = serviceComponent.getDesiredConfigs(); + for (Entry<String, Config> entry : compConfigs.entrySet()) { + if (!map.containsKey(entry.getKey())) { + map.put(entry.getKey(), entry.getValue()); + } } + return Collections.unmodifiableMap(map); + } finally { + clusterGlobalLock.readLock().unlock(); } - return Collections.unmodifiableMap(map); + } @Override @Transactional public void updateDesiredConfigs(Map<String, Config> configs) { + clusterGlobalLock.readLock().lock(); try { writeLock.lock(); + try { - for (Entry<String,Config> entry : configs.entrySet()) { + for (Entry<String, Config> entry : configs.entrySet()) { - boolean contains = false; - for (HostComponentDesiredConfigMappingEntity desiredConfigMappingEntity : desiredStateEntity.getHostComponentDesiredConfigMappingEntities()) { - if (entry.getKey().equals(desiredConfigMappingEntity.getConfigType())) { - contains = true; - desiredConfigMappingEntity.setVersionTag(entry.getValue().getVersionTag()); - desiredConfigMappingEntity.setTimestamp(new Date().getTime()); - hostComponentDesiredConfigMappingDAO.merge(desiredConfigMappingEntity); - break; + boolean contains = false; + for (HostComponentDesiredConfigMappingEntity desiredConfigMappingEntity : desiredStateEntity.getHostComponentDesiredConfigMappingEntities()) { + if (entry.getKey().equals(desiredConfigMappingEntity.getConfigType())) { + contains = true; + desiredConfigMappingEntity.setVersionTag(entry.getValue().getVersionTag()); + desiredConfigMappingEntity.setTimestamp(new Date().getTime()); + hostComponentDesiredConfigMappingDAO.merge(desiredConfigMappingEntity); + break; + } + } + + if (!contains) { + HostComponentDesiredConfigMappingEntity newEntity = new HostComponentDesiredConfigMappingEntity(); + newEntity.setClusterId(desiredStateEntity.getClusterId()); + newEntity.setServiceName(desiredStateEntity.getServiceName()); + newEntity.setComponentName(desiredStateEntity.getComponentName()); + newEntity.setHostName(desiredStateEntity.getHostName()); + newEntity.setConfigType(entry.getKey()); + newEntity.setVersionTag(entry.getValue().getVersionTag()); + newEntity.setTimestamp(new Date().getTime()); + newEntity.setHostComponentDesiredStateEntity(desiredStateEntity); + desiredStateEntity.getHostComponentDesiredConfigMappingEntities().add(newEntity); + hostComponentDesiredConfigMappingDAO.create(newEntity); } - } - if (!contains) { - HostComponentDesiredConfigMappingEntity newEntity = new HostComponentDesiredConfigMappingEntity(); - newEntity.setClusterId(desiredStateEntity.getClusterId()); - newEntity.setServiceName(desiredStateEntity.getServiceName()); - newEntity.setComponentName(desiredStateEntity.getComponentName()); - newEntity.setHostName(desiredStateEntity.getHostName()); - newEntity.setConfigType(entry.getKey()); - newEntity.setVersionTag(entry.getValue().getVersionTag()); - newEntity.setTimestamp(new Date().getTime()); - newEntity.setHostComponentDesiredStateEntity(desiredStateEntity); - desiredStateEntity.getHostComponentDesiredConfigMappingEntities().add(newEntity); - hostComponentDesiredConfigMappingDAO.create(newEntity); + this.desiredConfigs.put(entry.getKey(), entry.getValue().getVersionTag()); } - this.desiredConfigs.put(entry.getKey(), entry.getValue().getVersionTag()); + saveIfPersisted(); + } finally { + writeLock.unlock(); } - - saveIfPersisted(); - } - finally { - writeLock.unlock(); + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Override public StackId getDesiredStackVersion() { + clusterGlobalLock.readLock().lock(); try { readLock.lock(); - return gson.fromJson(desiredStateEntity.getDesiredStackVersion(), StackId.class); - } - finally { - readLock.unlock(); + try { + return gson.fromJson(desiredStateEntity.getDesiredStackVersion(), StackId.class); + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Override public void setDesiredStackVersion(StackId stackVersion) { + clusterGlobalLock.readLock().lock(); try { writeLock.lock(); - desiredStateEntity.setDesiredStackVersion(gson.toJson(stackVersion)); - saveIfPersisted(); - } - finally { - writeLock.unlock(); + try { + desiredStateEntity.setDesiredStackVersion(gson.toJson(stackVersion)); + saveIfPersisted(); + } finally { + writeLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Override public ServiceComponentHostResponse convertToResponse() { + clusterGlobalLock.readLock().lock(); try { readLock.lock(); - ServiceComponentHostResponse r = new ServiceComponentHostResponse( - serviceComponent.getClusterName(), - serviceComponent.getServiceName(), - serviceComponent.getName(), - getHostName(), - configs, - desiredConfigs, - getState().toString(), - getStackVersion().getStackId(), - getDesiredState().toString(), - getDesiredStackVersion().getStackId()); - - r.setHa_status(ha_status); - r.setActualConfigs(actualConfigs); - return r; - } - finally { - readLock.unlock(); + try { + ServiceComponentHostResponse r = new ServiceComponentHostResponse( + serviceComponent.getClusterName(), + serviceComponent.getServiceName(), + serviceComponent.getName(), + getHostName(), + configs, + desiredConfigs, + getState().toString(), + getStackVersion().getStackId(), + getDesiredState().toString(), + getDesiredStackVersion().getStackId()); + + r.setHa_status(ha_status); + r.setActualConfigs(actualConfigs); + return r; + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Override public String getClusterName() { - return serviceComponent.getClusterName(); + clusterGlobalLock.readLock().lock(); + try { + readLock.lock(); + try { + return serviceComponent.getClusterName(); + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); + } + } @Override public void debugDump(StringBuilder sb) { + clusterGlobalLock.readLock().lock(); try { readLock.lock(); - sb.append("ServiceComponentHost={ hostname=" + getHostName() - + ", serviceComponentName=" + serviceComponent.getName() - + ", clusterName=" + serviceComponent.getClusterName() - + ", serviceName=" + serviceComponent.getServiceName() - + ", desiredStackVersion=" + getDesiredStackVersion() - + ", desiredState=" + getDesiredState() - + ", stackVersion=" + getStackVersion() - + ", state=" + getState() - + " }"); - } - finally { - readLock.unlock(); + try { + sb.append("ServiceComponentHost={ hostname=" + getHostName() + + ", serviceComponentName=" + serviceComponent.getName() + + ", clusterName=" + serviceComponent.getClusterName() + + ", serviceName=" + serviceComponent.getServiceName() + + ", desiredStackVersion=" + getDesiredStackVersion() + + ", desiredState=" + getDesiredState() + + ", stackVersion=" + getStackVersion() + + ", state=" + getState() + + " }"); + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Override public boolean isPersisted() { + clusterGlobalLock.readLock().lock(); try { readLock.lock(); - return persisted; + try { + return persisted; + } finally { + readLock.unlock(); + } } finally { - readLock.unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public void persist() { + clusterGlobalLock.readLock().lock(); try { writeLock.lock(); - if (!persisted) { - persistEntities(); - refresh(); - host.refresh(); - serviceComponent.refresh(); - persisted = true; - } else { - saveIfPersisted(); + try { + if (!persisted) { + persistEntities(); + refresh(); + host.refresh(); + serviceComponent.refresh(); + persisted = true; + } else { + saveIfPersisted(); + } + } finally { + writeLock.unlock(); } } finally { - writeLock.unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Transactional @@ -1235,23 +1428,34 @@ public class ServiceComponentHostImpl im @Override @Transactional - public synchronized void refresh() { - if (isPersisted()) { - HostComponentStateEntityPK pk = new HostComponentStateEntityPK(); - HostComponentDesiredStateEntityPK dpk = new HostComponentDesiredStateEntityPK(); - pk.setClusterId(getClusterId()); - pk.setComponentName(getServiceComponentName()); - pk.setServiceName(getServiceName()); - pk.setHostName(getHostName()); - dpk.setClusterId(getClusterId()); - dpk.setComponentName(getServiceComponentName()); - dpk.setServiceName(getServiceName()); - dpk.setHostName(getHostName()); - stateEntity = hostComponentStateDAO.findByPK(pk); - desiredStateEntity = hostComponentDesiredStateDAO.findByPK(dpk); - hostComponentStateDAO.refresh(stateEntity); - hostComponentDesiredStateDAO.refresh(desiredStateEntity); + public void refresh() { + clusterGlobalLock.readLock().lock(); + try { + writeLock.lock(); + try { + if (isPersisted()) { + HostComponentStateEntityPK pk = new HostComponentStateEntityPK(); + HostComponentDesiredStateEntityPK dpk = new HostComponentDesiredStateEntityPK(); + pk.setClusterId(getClusterId()); + pk.setComponentName(getServiceComponentName()); + pk.setServiceName(getServiceName()); + pk.setHostName(getHostName()); + dpk.setClusterId(getClusterId()); + dpk.setComponentName(getServiceComponentName()); + dpk.setServiceName(getServiceName()); + dpk.setHostName(getHostName()); + stateEntity = hostComponentStateDAO.findByPK(pk); + desiredStateEntity = hostComponentDesiredStateDAO.findByPK(dpk); + hostComponentStateDAO.refresh(stateEntity); + hostComponentDesiredStateDAO.refresh(desiredStateEntity); + } + } finally { + writeLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Transactional @@ -1263,42 +1467,59 @@ public class ServiceComponentHostImpl im } @Override - public synchronized boolean canBeRemoved() { + public boolean canBeRemoved() { + clusterGlobalLock.readLock().lock(); try { readLock.lock(); + try { - return (getDesiredState().isRemovableState() && - getState().isRemovableState()); + return (getDesiredState().isRemovableState() && + getState().isRemovableState()); + } finally { + readLock.unlock(); + } } finally { - readLock.unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public void deleteDesiredConfigs(Set<String> configTypes) { + clusterGlobalLock.readLock().lock(); try { writeLock.lock(); - hostComponentDesiredConfigMappingDAO.removeByType(configTypes); - for (String configType : configTypes) { - desiredConfigs.remove(configType); + try { + hostComponentDesiredConfigMappingDAO.removeByType(configTypes); + for (String configType : configTypes) { + desiredConfigs.remove(configType); + } + } finally { + writeLock.unlock(); } } finally { - writeLock.unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public void delete() { + clusterGlobalLock.writeLock().lock(); try { writeLock.lock(); - if (persisted) { - removeEntities(); - persisted = false; + try { + if (persisted) { + removeEntities(); + persisted = false; + } + desiredConfigs.clear(); + } finally { + writeLock.unlock(); } - desiredConfigs.clear(); } finally { - writeLock.unlock(); + clusterGlobalLock.writeLock().unlock(); } } @@ -1324,35 +1545,68 @@ public class ServiceComponentHostImpl im @Override public void updateActualConfigs(Map<String, Map<String, String>> configTags) { - actualConfigs = new HashMap<String, DesiredConfig>(); - - String hostName = getHostName(); - - for (Entry<String, Map<String,String>> entry : configTags.entrySet()) { - String type = entry.getKey(); - Map<String, String> values = entry.getValue(); - - String tag = values.get("tag"); - String hostTag = values.get("host_override_tag"); - - DesiredConfig dc = new DesiredConfig(); - dc.setVersion(tag); - actualConfigs.put(type, dc); - if (null != hostTag && null != hostName) { - List<HostOverride> list = new ArrayList<HostOverride>(); - list.add (new HostOverride(hostName, hostTag)); - dc.setHostOverrides(list); + clusterGlobalLock.readLock().lock(); + try { + writeLock.lock(); + try { + actualConfigs = new HashMap<String, DesiredConfig>(); + + String hostName = getHostName(); + + for (Entry<String, Map<String, String>> entry : configTags.entrySet()) { + String type = entry.getKey(); + Map<String, String> values = entry.getValue(); + + String tag = values.get("tag"); + String hostTag = values.get("host_override_tag"); + + DesiredConfig dc = new DesiredConfig(); + dc.setVersion(tag); + actualConfigs.put(type, dc); + if (null != hostTag && null != hostName) { + List<HostOverride> list = new ArrayList<HostOverride>(); + list.add(new HostOverride(hostName, hostTag)); + dc.setHostOverrides(list); + } + } + } finally { + writeLock.unlock(); } + } finally { + clusterGlobalLock.readLock().unlock(); } + } @Override public Map<String, DesiredConfig> getActualConfigs() { - return actualConfigs; + clusterGlobalLock.readLock().lock(); + try { + readLock.lock(); + try { + return actualConfigs; + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); + } + } @Override public HostState getHostState() { - return host.getState(); + clusterGlobalLock.readLock().lock(); + try { + readLock.lock(); + try { + return host.getState(); + } finally { + readLock.unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); + } + } }
