Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java?rev=1489550&r1=1489549&r2=1489550&view=diff ============================================================================== --- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java (original) +++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java Tue Jun 4 18:33:30 2013 @@ -76,7 +76,7 @@ public class ClusterImpl implements Clus private StackId desiredStackVersion; - private Map<String, Service> services = null; + private volatile Map<String, Service> services = null; /** * [ Config Type -> [ Config Version Tag -> Config ] ] @@ -99,6 +99,8 @@ public class ClusterImpl implements Clus private Lock readLock = readWriteLock.readLock(); private Lock writeLock = readWriteLock.writeLock(); + private final ReadWriteLock clusterGlobalLock = new ReentrantReadWriteLock(); + private ClusterEntity clusterEntity; @Inject @@ -143,6 +145,11 @@ public class ClusterImpl implements Clus } } + @Override + public ReadWriteLock getClusterGlobalLock() { + return clusterGlobalLock; + } + /** * Make sure we load all the service host components. @@ -151,419 +158,521 @@ public class ClusterImpl implements Clus public void loadServiceHostComponents() { loadServices(); if (svcHostsLoaded) return; - writeLock.lock(); + clusterGlobalLock.writeLock().lock(); try { - LOG.info("Loading Service Host Components"); - if (svcHostsLoaded) return; - if (services != null) { - for (Entry<String, Service> serviceKV: services.entrySet()) { + writeLock.lock(); + try { + LOG.info("Loading Service Host Components"); + if (svcHostsLoaded) return; + if (services != null) { + for (Entry<String, Service> serviceKV : services.entrySet()) { /* get all the service component hosts **/ - Service service = serviceKV.getValue(); - if (!serviceComponentHosts.containsKey(service.getName())) { - serviceComponentHosts.put(service.getName(), new HashMap<String, - Map<String, ServiceComponentHost>>()); - } - for (Entry<String, ServiceComponent> svcComponent: - service.getServiceComponents().entrySet()) { - ServiceComponent comp = svcComponent.getValue(); - String componentName = svcComponent.getKey(); - if (!serviceComponentHosts.get(service.getName()).containsKey(componentName)) { - serviceComponentHosts.get(service.getName()).put(componentName, - new HashMap<String, ServiceComponentHost>()); + Service service = serviceKV.getValue(); + if (!serviceComponentHosts.containsKey(service.getName())) { + serviceComponentHosts.put(service.getName(), new HashMap<String, + Map<String, ServiceComponentHost>>()); } - /** Get Service Host Components **/ - for (Entry<String, ServiceComponentHost> svchost: - comp.getServiceComponentHosts().entrySet()) { + for (Entry<String, ServiceComponent> svcComponent : + service.getServiceComponents().entrySet()) { + ServiceComponent comp = svcComponent.getValue(); + String componentName = svcComponent.getKey(); + if (!serviceComponentHosts.get(service.getName()).containsKey(componentName)) { + serviceComponentHosts.get(service.getName()).put(componentName, + new HashMap<String, ServiceComponentHost>()); + } + /** Get Service Host Components **/ + for (Entry<String, ServiceComponentHost> svchost : + comp.getServiceComponentHosts().entrySet()) { String hostname = svchost.getKey(); ServiceComponentHost svcHostComponent = svchost.getValue(); if (!serviceComponentHostsByHost.containsKey(hostname)) { serviceComponentHostsByHost.put(hostname, new ArrayList<ServiceComponentHost>()); } - List<ServiceComponentHost> compList = serviceComponentHostsByHost.get(hostname); + List<ServiceComponentHost> compList = serviceComponentHostsByHost.get(hostname); compList.add(svcHostComponent); if (!serviceComponentHosts.get(service.getName()).get(componentName) .containsKey(hostname)) { serviceComponentHosts.get(service.getName()).get(componentName) - .put(hostname, svcHostComponent); + .put(hostname, svcHostComponent); } + } } } } + svcHostsLoaded = true; + } finally { + writeLock.unlock(); } - svcHostsLoaded = true; } finally { - writeLock.unlock(); + clusterGlobalLock.writeLock().unlock(); } + } private void loadServices() { //logging here takes too much time // LOG.info("clusterEntity " + clusterEntity.getClusterServiceEntities() ); if (services == null) { - writeLock.lock(); + clusterGlobalLock.writeLock().lock(); try { - if (services == null) { - services = new TreeMap<String, Service>(); - if (!clusterEntity.getClusterServiceEntities().isEmpty()) { - for (ClusterServiceEntity serviceEntity : clusterEntity.getClusterServiceEntities()) { - services.put(serviceEntity.getServiceName(), serviceFactory.createExisting(this, serviceEntity)); + writeLock.lock(); + try { + if (services == null) { + services = new TreeMap<String, Service>(); + if (!clusterEntity.getClusterServiceEntities().isEmpty()) { + for (ClusterServiceEntity serviceEntity : clusterEntity.getClusterServiceEntities()) { + services.put(serviceEntity.getServiceName(), serviceFactory.createExisting(this, serviceEntity)); + } } } + } finally { + writeLock.unlock(); } } finally { - writeLock.unlock(); + clusterGlobalLock.writeLock().unlock(); } + } } public ServiceComponentHost getServiceComponentHost(String serviceName, String serviceComponentName, String hostname) throws AmbariException { loadServiceHostComponents(); - readLock.lock(); + clusterGlobalLock.readLock().lock(); try { - if (!serviceComponentHosts.containsKey(serviceName) - || !serviceComponentHosts.get(serviceName) - .containsKey(serviceComponentName) - || !serviceComponentHosts.get(serviceName).get(serviceComponentName) - .containsKey(hostname)) { - throw new ServiceComponentHostNotFoundException(getClusterName(), serviceName, - serviceComponentName, hostname); + readLock.lock(); + try { + if (!serviceComponentHosts.containsKey(serviceName) + || !serviceComponentHosts.get(serviceName) + .containsKey(serviceComponentName) + || !serviceComponentHosts.get(serviceName).get(serviceComponentName) + .containsKey(hostname)) { + throw new ServiceComponentHostNotFoundException(getClusterName(), serviceName, + serviceComponentName, hostname); + } + return serviceComponentHosts.get(serviceName).get(serviceComponentName) + .get(hostname); + } finally { + readLock.unlock(); } - return serviceComponentHosts.get(serviceName).get(serviceComponentName) - .get(hostname); } finally { - readLock.unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public String getClusterName() { - readLock.lock(); + clusterGlobalLock.readLock().lock(); try { - return clusterEntity.getClusterName(); + readLock.lock(); + try { + return clusterEntity.getClusterName(); + } finally { + readLock.unlock(); + } } finally { - readLock.unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public void setClusterName(String clusterName) { - writeLock.lock(); + clusterGlobalLock.readLock().lock(); try { - String oldName = clusterEntity.getClusterName(); - clusterEntity.setClusterName(clusterName); - clusterDAO.merge(clusterEntity); //RollbackException possibility if UNIQUE constraint violated - clusters.updateClusterName(oldName, clusterName); + writeLock.lock(); + try { + String oldName = clusterEntity.getClusterName(); + clusterEntity.setClusterName(clusterName); + clusterDAO.merge(clusterEntity); //RollbackException possibility if UNIQUE constraint violated + clusters.updateClusterName(oldName, clusterName); + } finally { + writeLock.unlock(); + } } finally { - writeLock.unlock(); + clusterGlobalLock.readLock().unlock(); } + } public void addServiceComponentHost( ServiceComponentHost svcCompHost) throws AmbariException { loadServiceHostComponents(); - writeLock.lock(); + clusterGlobalLock.writeLock().lock(); try { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to add ServiceComponentHost to ClusterHostMap cache" - + ", serviceName=" + svcCompHost.getServiceName() - + ", componentName=" + svcCompHost.getServiceComponentName() - + ", hostname=" + svcCompHost.getHostName()); - } - - final String hostname = svcCompHost.getHostName(); - final String serviceName = svcCompHost.getServiceName(); - final String componentName = svcCompHost.getServiceComponentName(); - Set<Cluster> cs = clusters.getClustersForHost(hostname); - boolean clusterFound = false; - Iterator<Cluster> iter = cs.iterator(); - while (iter.hasNext()) { - Cluster c = iter.next(); - if (c.getClusterId() == this.getClusterId()) { - clusterFound = true; - break; - } - } - if (!clusterFound) { - throw new AmbariException("Host does not belong this cluster" - + ", hostname=" + hostname - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId()); - } + writeLock.lock(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to add ServiceComponentHost to ClusterHostMap cache" + + ", serviceName=" + svcCompHost.getServiceName() + + ", componentName=" + svcCompHost.getServiceComponentName() + + ", hostname=" + svcCompHost.getHostName()); + } + + final String hostname = svcCompHost.getHostName(); + final String serviceName = svcCompHost.getServiceName(); + final String componentName = svcCompHost.getServiceComponentName(); + Set<Cluster> cs = clusters.getClustersForHost(hostname); + boolean clusterFound = false; + Iterator<Cluster> iter = cs.iterator(); + while (iter.hasNext()) { + Cluster c = iter.next(); + if (c.getClusterId() == this.getClusterId()) { + clusterFound = true; + break; + } + } + if (!clusterFound) { + throw new AmbariException("Host does not belong this cluster" + + ", hostname=" + hostname + + ", clusterName=" + getClusterName() + + ", clusterId=" + getClusterId()); + } - if (!serviceComponentHosts.containsKey(serviceName)) { - serviceComponentHosts.put(serviceName, - new HashMap<String, Map<String, ServiceComponentHost>>()); - } - if (!serviceComponentHosts.get(serviceName).containsKey(componentName)) { - serviceComponentHosts.get(serviceName).put(componentName, - new HashMap<String, ServiceComponentHost>()); - } + if (!serviceComponentHosts.containsKey(serviceName)) { + serviceComponentHosts.put(serviceName, + new HashMap<String, Map<String, ServiceComponentHost>>()); + } + if (!serviceComponentHosts.get(serviceName).containsKey(componentName)) { + serviceComponentHosts.get(serviceName).put(componentName, + new HashMap<String, ServiceComponentHost>()); + } - if (serviceComponentHosts.get(serviceName).get(componentName). - containsKey(hostname)) { - throw new AmbariException("Duplicate entry for ServiceComponentHost" - + ", serviceName=" + serviceName - + ", serviceComponentName" + componentName - + ", hostname= " + hostname); - } + if (serviceComponentHosts.get(serviceName).get(componentName). + containsKey(hostname)) { + throw new AmbariException("Duplicate entry for ServiceComponentHost" + + ", serviceName=" + serviceName + + ", serviceComponentName" + componentName + + ", hostname= " + hostname); + } - if (!serviceComponentHostsByHost.containsKey(hostname)) { - serviceComponentHostsByHost.put(hostname, - new ArrayList<ServiceComponentHost>()); - } + if (!serviceComponentHostsByHost.containsKey(hostname)) { + serviceComponentHostsByHost.put(hostname, + new ArrayList<ServiceComponentHost>()); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Adding a new ServiceComponentHost" - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() - + ", serviceName=" + serviceName - + ", serviceComponentName" + componentName - + ", hostname= " + hostname); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Adding a new ServiceComponentHost" + + ", clusterName=" + getClusterName() + + ", clusterId=" + getClusterId() + + ", serviceName=" + serviceName + + ", serviceComponentName" + componentName + + ", hostname= " + hostname); + } - serviceComponentHosts.get(serviceName).get(componentName).put(hostname, - svcCompHost); - serviceComponentHostsByHost.get(hostname).add(svcCompHost); + serviceComponentHosts.get(serviceName).get(componentName).put(hostname, + svcCompHost); + serviceComponentHostsByHost.get(hostname).add(svcCompHost); + } finally { + writeLock.unlock(); + } } finally { - writeLock.unlock(); + clusterGlobalLock.writeLock().unlock(); } + } public void removeServiceComponentHost(ServiceComponentHost svcCompHost) throws AmbariException { loadServiceHostComponents(); - writeLock.lock(); + clusterGlobalLock.writeLock().lock(); try { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to remove ServiceComponentHost to ClusterHostMap cache" - + ", serviceName=" + svcCompHost.getServiceName() - + ", componentName=" + svcCompHost.getServiceComponentName() - + ", hostname=" + svcCompHost.getHostName()); - } - - final String hostname = svcCompHost.getHostName(); - final String serviceName = svcCompHost.getServiceName(); - final String componentName = svcCompHost.getServiceComponentName(); - Set<Cluster> cs = clusters.getClustersForHost(hostname); - boolean clusterFound = false; - Iterator<Cluster> iter = cs.iterator(); - while (iter.hasNext()) { - Cluster c = iter.next(); - if (c.getClusterId() == this.getClusterId()) { - clusterFound = true; - break; - } - } - if (!clusterFound) { - throw new AmbariException("Host does not belong this cluster" - + ", hostname=" + hostname - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId()); - } - - if (!serviceComponentHosts.containsKey(serviceName) - || !serviceComponentHosts.get(serviceName).containsKey(componentName) - || !serviceComponentHosts.get(serviceName).get(componentName). - containsKey(hostname)) { - throw new AmbariException("Invalid entry for ServiceComponentHost" - + ", serviceName=" + serviceName - + ", serviceComponentName" + componentName - + ", hostname= " + hostname); - } - if (!serviceComponentHostsByHost.containsKey(hostname)) { - throw new AmbariException("Invalid host entry for ServiceComponentHost" - + ", serviceName=" + serviceName - + ", serviceComponentName" + componentName - + ", hostname= " + hostname); - } + writeLock.lock(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to remove ServiceComponentHost to ClusterHostMap cache" + + ", serviceName=" + svcCompHost.getServiceName() + + ", componentName=" + svcCompHost.getServiceComponentName() + + ", hostname=" + svcCompHost.getHostName()); + } + + final String hostname = svcCompHost.getHostName(); + final String serviceName = svcCompHost.getServiceName(); + final String componentName = svcCompHost.getServiceComponentName(); + Set<Cluster> cs = clusters.getClustersForHost(hostname); + boolean clusterFound = false; + Iterator<Cluster> iter = cs.iterator(); + while (iter.hasNext()) { + Cluster c = iter.next(); + if (c.getClusterId() == this.getClusterId()) { + clusterFound = true; + break; + } + } + if (!clusterFound) { + throw new AmbariException("Host does not belong this cluster" + + ", hostname=" + hostname + + ", clusterName=" + getClusterName() + + ", clusterId=" + getClusterId()); + } - ServiceComponentHost schToRemove = null; - for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostname)) { - if (sch.getServiceName().equals(serviceName) - && sch.getServiceComponentName().equals(componentName) - && sch.getHostName().equals(hostname)) { - schToRemove = sch; - break; + if (!serviceComponentHosts.containsKey(serviceName) + || !serviceComponentHosts.get(serviceName).containsKey(componentName) + || !serviceComponentHosts.get(serviceName).get(componentName). + containsKey(hostname)) { + throw new AmbariException("Invalid entry for ServiceComponentHost" + + ", serviceName=" + serviceName + + ", serviceComponentName" + componentName + + ", hostname= " + hostname); + } + if (!serviceComponentHostsByHost.containsKey(hostname)) { + throw new AmbariException("Invalid host entry for ServiceComponentHost" + + ", serviceName=" + serviceName + + ", serviceComponentName" + componentName + + ", hostname= " + hostname); + } + + ServiceComponentHost schToRemove = null; + for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostname)) { + if (sch.getServiceName().equals(serviceName) + && sch.getServiceComponentName().equals(componentName) + && sch.getHostName().equals(hostname)) { + schToRemove = sch; + break; + } } - } - if (schToRemove == null) { - LOG.warn("Unavailable in per host cache. ServiceComponentHost" - + ", serviceName=" + serviceName - + ", serviceComponentName" + componentName - + ", hostname= " + hostname); - } + if (schToRemove == null) { + LOG.warn("Unavailable in per host cache. ServiceComponentHost" + + ", serviceName=" + serviceName + + ", serviceComponentName" + componentName + + ", hostname= " + hostname); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Removing a ServiceComponentHost" - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() - + ", serviceName=" + serviceName - + ", serviceComponentName" + componentName - + ", hostname= " + hostname); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Removing a ServiceComponentHost" + + ", clusterName=" + getClusterName() + + ", clusterId=" + getClusterId() + + ", serviceName=" + serviceName + + ", serviceComponentName" + componentName + + ", hostname= " + hostname); + } - serviceComponentHosts.get(serviceName).get(componentName).remove(hostname); - if(schToRemove != null) { - serviceComponentHostsByHost.get(hostname).remove(schToRemove); + serviceComponentHosts.get(serviceName).get(componentName).remove(hostname); + if (schToRemove != null) { + serviceComponentHostsByHost.get(hostname).remove(schToRemove); + } + } finally { + writeLock.unlock(); } } finally { - writeLock.unlock(); + clusterGlobalLock.writeLock().unlock(); } + } @Override public long getClusterId() { - readLock.lock(); + clusterGlobalLock.readLock().lock(); try { - return clusterEntity.getClusterId(); + readLock.lock(); + try { + return clusterEntity.getClusterId(); + } finally { + readLock.unlock(); + } } finally { - readLock.unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public List<ServiceComponentHost> getServiceComponentHosts( String hostname) { loadServiceHostComponents(); - readLock.lock(); + clusterGlobalLock.readLock().lock(); try { - if (serviceComponentHostsByHost.containsKey(hostname)) { - return Collections.unmodifiableList( - serviceComponentHostsByHost.get(hostname)); + readLock.lock(); + try { + if (serviceComponentHostsByHost.containsKey(hostname)) { + return Collections.unmodifiableList( + serviceComponentHostsByHost.get(hostname)); + } + return new ArrayList<ServiceComponentHost>(); + } finally { + readLock.unlock(); } - return new ArrayList<ServiceComponentHost>(); } finally { - readLock.unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public void addService(Service service) throws AmbariException { loadServices(); - writeLock.lock(); + clusterGlobalLock.writeLock().lock(); try { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding a new Service" - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() - + ", serviceName=" + service.getName()); - } - if (services.containsKey(service.getName())) { - throw new AmbariException("Service already exists" - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() - + ", serviceName=" + service.getName()); + writeLock.lock(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding a new Service" + + ", clusterName=" + getClusterName() + + ", clusterId=" + getClusterId() + + ", serviceName=" + service.getName()); + } + if (services.containsKey(service.getName())) { + throw new AmbariException("Service already exists" + + ", clusterName=" + getClusterName() + + ", clusterId=" + getClusterId() + + ", serviceName=" + service.getName()); + } + this.services.put(service.getName(), service); + } finally { + writeLock.unlock(); } - this.services.put(service.getName(), service); } finally { - writeLock.unlock(); + clusterGlobalLock.writeLock().unlock(); } + } @Override public Service addService(String serviceName) throws AmbariException{ loadServices(); - writeLock.lock(); + clusterGlobalLock.writeLock().lock(); try { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding a new Service" - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() - + ", serviceName=" + serviceName); - } - if (services.containsKey(serviceName)) { - throw new AmbariException("Service already exists" - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() - + ", serviceName=" + serviceName); + writeLock.lock(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding a new Service" + + ", clusterName=" + getClusterName() + + ", clusterId=" + getClusterId() + + ", serviceName=" + serviceName); + } + if (services.containsKey(serviceName)) { + throw new AmbariException("Service already exists" + + ", clusterName=" + getClusterName() + + ", clusterId=" + getClusterId() + + ", serviceName=" + serviceName); + } + Service s = serviceFactory.createNew(this, serviceName); + this.services.put(s.getName(), s); + return s; + } finally { + writeLock.unlock(); } - Service s = serviceFactory.createNew(this, serviceName); - this.services.put(s.getName(), s); - return s; } finally { - writeLock.unlock(); + clusterGlobalLock.writeLock().unlock(); } + } @Override public Service getService(String serviceName) throws AmbariException { loadServices(); - readLock.lock(); + clusterGlobalLock.readLock().lock(); try { - if (!services.containsKey(serviceName)) { - throw new ServiceNotFoundException(getClusterName(), serviceName); + readLock.lock(); + try { + if (!services.containsKey(serviceName)) { + throw new ServiceNotFoundException(getClusterName(), serviceName); + } + return services.get(serviceName); + } finally { + readLock.unlock(); } - return services.get(serviceName); } finally { - readLock.unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public Map<String, Service> getServices() { loadServices(); - readLock.lock(); + clusterGlobalLock.readLock().lock(); try { - return Collections.unmodifiableMap(services); + readLock.lock(); + try { + return Collections.unmodifiableMap(services); + } finally { + readLock.unlock(); + } } finally { - readLock.unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public StackId getDesiredStackVersion() { - readLock.lock(); + clusterGlobalLock.readLock().lock(); try { - return desiredStackVersion; + readLock.lock(); + try { + return desiredStackVersion; + } finally { + readLock.unlock(); + } } finally { - readLock.unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public void setDesiredStackVersion(StackId stackVersion) { - readWriteLock.writeLock().lock(); + clusterGlobalLock.readLock().lock(); try { - if (LOG.isDebugEnabled()) { - LOG.debug("Changing DesiredStackVersion of Cluster" - + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() - + ", currentDesiredStackVersion=" + this.desiredStackVersion - + ", newDesiredStackVersion=" + stackVersion); + readWriteLock.writeLock().lock(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Changing DesiredStackVersion of Cluster" + + ", clusterName=" + getClusterName() + + ", clusterId=" + getClusterId() + + ", currentDesiredStackVersion=" + this.desiredStackVersion + + ", newDesiredStackVersion=" + stackVersion); + } + this.desiredStackVersion = stackVersion; + clusterEntity.setDesiredStackVersion(gson.toJson(stackVersion)); + clusterDAO.merge(clusterEntity); + } finally { + readWriteLock.writeLock().unlock(); } - this.desiredStackVersion = stackVersion; - clusterEntity.setDesiredStackVersion(gson.toJson(stackVersion)); - clusterDAO.merge(clusterEntity); } finally { - readWriteLock.writeLock().unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public StackId getCurrentStackVersion() { - ClusterStateEntity clusterStateEntity = clusterEntity.getClusterStateEntity(); - if(clusterStateEntity != null) - { - String stackVersion = clusterStateEntity.getCurrentStackVersion(); - if(stackVersion != null && !stackVersion.isEmpty()) - { - return gson.fromJson(stackVersion, StackId.class); + clusterGlobalLock.readLock().lock(); + try { + readWriteLock.readLock().lock(); + try { + ClusterStateEntity clusterStateEntity = clusterEntity.getClusterStateEntity(); + if (clusterStateEntity != null) { + String stackVersion = clusterStateEntity.getCurrentStackVersion(); + if (stackVersion != null && !stackVersion.isEmpty()) { + return gson.fromJson(stackVersion, StackId.class); + } + } + return null; + } finally { + readWriteLock.readLock().unlock(); } + } finally { + clusterGlobalLock.readLock().unlock(); } - return null; + + } @Override public void setCurrentStackVersion(StackId stackVersion) throws AmbariException { - writeLock.lock(); + clusterGlobalLock.readLock().lock(); try { + writeLock.lock(); + try { ClusterStateEntity clusterStateEntity = clusterStateDAO.findByPK(clusterEntity.getClusterId()); if (clusterStateEntity == null) { clusterStateEntity = new ClusterStateEntity(); @@ -579,214 +688,282 @@ public class ClusterImpl implements Clus clusterStateDAO.merge(clusterStateEntity); clusterEntity = clusterDAO.merge(clusterEntity); } - } catch (RollbackException e) { - LOG.warn("Unable to set version " + stackVersion + " for cluster " + getClusterName()); - throw new AmbariException("Unable to set" - + " version=" + stackVersion - + " for cluster " + getClusterName(), e); + } catch (RollbackException e) { + LOG.warn("Unable to set version " + stackVersion + " for cluster " + getClusterName()); + throw new AmbariException("Unable to set" + + " version=" + stackVersion + + " for cluster " + getClusterName(), e); + } finally { + writeLock.unlock(); + } } finally { - writeLock.unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public Map<String, Config> getConfigsByType(String configType) { - readWriteLock.writeLock().lock(); + clusterGlobalLock.readLock().lock(); try { - if (!allConfigs.containsKey(configType)) - return null; + readWriteLock.writeLock().lock(); + try { + if (!allConfigs.containsKey(configType)) + return null; - return Collections.unmodifiableMap(allConfigs.get(configType)); + return Collections.unmodifiableMap(allConfigs.get(configType)); + } finally { + readWriteLock.writeLock().unlock(); + } } finally { - readWriteLock.writeLock().unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public Config getConfig(String configType, String versionTag) { - readWriteLock.readLock().lock(); + clusterGlobalLock.readLock().lock(); try { - if (!allConfigs.containsKey(configType) - || !allConfigs.get(configType).containsKey(versionTag)) { - return null; + readWriteLock.readLock().lock(); + try { + if (!allConfigs.containsKey(configType) + || !allConfigs.get(configType).containsKey(versionTag)) { + return null; + } + return allConfigs.get(configType).get(versionTag); + } finally { + readWriteLock.readLock().unlock(); } - return allConfigs.get(configType).get(versionTag); } finally { - readWriteLock.readLock().unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public void addConfig(Config config) { - readWriteLock.writeLock().lock(); + clusterGlobalLock.readLock().lock(); try { - if (config.getType() == null - || config.getType().isEmpty() - || config.getVersionTag() == null - || config.getVersionTag().isEmpty()) { - // TODO throw error - } - if (!allConfigs.containsKey(config.getType())) { - allConfigs.put(config.getType(), new HashMap<String, Config>()); - } + readWriteLock.writeLock().lock(); + try { + if (config.getType() == null + || config.getType().isEmpty() + || config.getVersionTag() == null + || config.getVersionTag().isEmpty()) { + // TODO throw error + } + if (!allConfigs.containsKey(config.getType())) { + allConfigs.put(config.getType(), new HashMap<String, Config>()); + } - allConfigs.get(config.getType()).put(config.getVersionTag(), config); + allConfigs.get(config.getType()).put(config.getVersionTag(), config); + } finally { + readWriteLock.writeLock().unlock(); + } } finally { - readWriteLock.writeLock().unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public Collection<Config> getAllConfigs() { - readWriteLock.readLock().lock(); + clusterGlobalLock.readLock().lock(); try { - List<Config> list = new ArrayList<Config>(); - for (Entry<String, Map<String, Config>> entry : allConfigs.entrySet()) { - for (Config config : entry.getValue().values()) { - list.add(config); + readWriteLock.readLock().lock(); + try { + List<Config> list = new ArrayList<Config>(); + for (Entry<String, Map<String, Config>> entry : allConfigs.entrySet()) { + for (Config config : entry.getValue().values()) { + list.add(config); + } } + return Collections.unmodifiableList(list); + } finally { + readWriteLock.readLock().unlock(); } - return Collections.unmodifiableList(list); } finally { - readWriteLock.readLock().unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public ClusterResponse convertToResponse() throws AmbariException { - readWriteLock.readLock().lock(); + clusterGlobalLock.readLock().lock(); try { - ClusterResponse r = new ClusterResponse(getClusterId(), getClusterName(), - clusters.getHostsForCluster(getClusterName()).keySet(), - getDesiredStackVersion().getStackId()); + readWriteLock.readLock().lock(); + try { + ClusterResponse r = new ClusterResponse(getClusterId(), getClusterName(), + clusters.getHostsForCluster(getClusterName()).keySet(), + getDesiredStackVersion().getStackId()); - return r; + return r; + } finally { + readWriteLock.readLock().unlock(); + } } finally { - readWriteLock.readLock().unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override public void debugDump(StringBuilder sb) { loadServices(); - readWriteLock.readLock().lock(); + clusterGlobalLock.readLock().lock(); try { - sb.append("Cluster={ clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() - + ", desiredStackVersion=" + desiredStackVersion.getStackId() - + ", services=[ "); - boolean first = true; - for (Service s : services.values()) { - if (!first) { - sb.append(" , "); - first = false; - } - sb.append("\n "); - s.debugDump(sb); - sb.append(" "); + readWriteLock.readLock().lock(); + try { + sb.append("Cluster={ clusterName=" + getClusterName() + + ", clusterId=" + getClusterId() + + ", desiredStackVersion=" + desiredStackVersion.getStackId() + + ", services=[ "); + boolean first = true; + for (Service s : services.values()) { + if (!first) { + sb.append(" , "); + first = false; + } + sb.append("\n "); + s.debugDump(sb); + sb.append(" "); + } + sb.append(" ] }"); + } finally { + readWriteLock.readLock().unlock(); } - sb.append(" ] }"); } finally { - readWriteLock.readLock().unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override @Transactional public void refresh() { - readWriteLock.writeLock().lock(); + clusterGlobalLock.readLock().lock(); try { - clusterEntity = clusterDAO.findById(clusterEntity.getClusterId()); - clusterDAO.refresh(clusterEntity); + readWriteLock.writeLock().lock(); + try { + clusterEntity = clusterDAO.findById(clusterEntity.getClusterId()); + clusterDAO.refresh(clusterEntity); + } finally { + readWriteLock.writeLock().unlock(); + } } finally { - readWriteLock.writeLock().unlock(); + clusterGlobalLock.readLock().unlock(); } + } @Override @Transactional public void deleteAllServices() throws AmbariException { loadServices(); - readWriteLock.writeLock().lock(); + clusterGlobalLock.writeLock().lock(); try { - LOG.info("Deleting all services for cluster" - + ", clusterName=" + getClusterName()); - for (Service service : services.values()) { - if (!service.canBeRemoved()) { - throw new AmbariException("Found non removable service when trying to" - + " all services from cluster" - + ", clusterName=" + getClusterName() - + ", serviceName=" + service.getName()); + readWriteLock.writeLock().lock(); + try { + LOG.info("Deleting all services for cluster" + + ", clusterName=" + getClusterName()); + for (Service service : services.values()) { + if (!service.canBeRemoved()) { + throw new AmbariException("Found non removable service when trying to" + + " all services from cluster" + + ", clusterName=" + getClusterName() + + ", serviceName=" + service.getName()); + } } - } - for (Service service : services.values()) { - service.delete(); - } + for (Service service : services.values()) { + service.delete(); + } - services.clear(); + services.clear(); + } finally { + readWriteLock.writeLock().unlock(); + } } finally { - readWriteLock.writeLock().unlock(); + clusterGlobalLock.writeLock().unlock(); } + } @Override public void deleteService(String serviceName) throws AmbariException { loadServices(); - readWriteLock.writeLock().lock(); + clusterGlobalLock.writeLock().lock(); try { - Service service = getService(serviceName); - LOG.info("Deleting service for cluster" - + ", clusterName=" + getClusterName() - + ", serviceName=" + service.getName()); - // FIXME check dependencies from meta layer - if (!service.canBeRemoved()) { - throw new AmbariException("Could not delete service from cluster" + readWriteLock.writeLock().lock(); + try { + Service service = getService(serviceName); + LOG.info("Deleting service for cluster" + ", clusterName=" + getClusterName() + ", serviceName=" + service.getName()); + // FIXME check dependencies from meta layer + if (!service.canBeRemoved()) { + throw new AmbariException("Could not delete service from cluster" + + ", clusterName=" + getClusterName() + + ", serviceName=" + service.getName()); + } + service.delete(); + services.remove(serviceName); + } finally { + readWriteLock.writeLock().unlock(); } - service.delete(); - services.remove(serviceName); } finally { - readWriteLock.writeLock().unlock(); + clusterGlobalLock.writeLock().unlock(); } + } @Override public boolean canBeRemoved() { loadServices(); - readWriteLock.readLock().lock(); + clusterGlobalLock.readLock().lock(); try { - boolean safeToRemove = true; - for (Service service : services.values()) { - if (!service.canBeRemoved()) { - safeToRemove = false; - LOG.warn("Found non removable service" - + ", clusterName=" + getClusterName() - + ", serviceName=" + service.getName()); + readWriteLock.readLock().lock(); + try { + boolean safeToRemove = true; + for (Service service : services.values()) { + if (!service.canBeRemoved()) { + safeToRemove = false; + LOG.warn("Found non removable service" + + ", clusterName=" + getClusterName() + + ", serviceName=" + service.getName()); + } } + return safeToRemove; + } finally { + readWriteLock.readLock().unlock(); } - return safeToRemove; } finally { - readWriteLock.readLock().unlock(); + clusterGlobalLock.readLock().unlock(); } - } @Override @Transactional public void delete() throws AmbariException { - readWriteLock.writeLock().lock(); + clusterGlobalLock.writeLock().lock(); try { - refresh(); - deleteAllServices(); - removeEntities(); - allConfigs.clear(); + readWriteLock.writeLock().lock(); + try { + refresh(); + deleteAllServices(); + removeEntities(); + allConfigs.clear(); + } finally { + readWriteLock.writeLock().unlock(); + } } finally { - readWriteLock.writeLock().unlock(); + clusterGlobalLock.writeLock().unlock(); } + } @Transactional @@ -799,79 +976,111 @@ public class ClusterImpl implements Clus public boolean addDesiredConfig(String user, Config config) { if (null == user) throw new NullPointerException("User must be specified."); - - Config currentDesired = getDesiredConfigByType(config.getType()); - // do not set if it is already the current - if (null != currentDesired && currentDesired.getVersionTag().equals(config.getVersionTag())) { - return false; - } + clusterGlobalLock.readLock().lock(); + try { + readWriteLock.writeLock().lock(); + try { + Config currentDesired = getDesiredConfigByType(config.getType()); + + // do not set if it is already the current + if (null != currentDesired && currentDesired.getVersionTag().equals(config.getVersionTag())) { + return false; + } - Collection<ClusterConfigMappingEntity> entities = clusterEntity.getConfigMappingEntities(); + Collection<ClusterConfigMappingEntity> entities = clusterEntity.getConfigMappingEntities(); - for (ClusterConfigMappingEntity e : entities) { - if (e.isSelected() > 0 && e.getType().equals(config.getType())) { - e.setSelected(0); + for (ClusterConfigMappingEntity e : entities) { + if (e.isSelected() > 0 && e.getType().equals(config.getType())) { + e.setSelected(0); + } + } + + ClusterConfigMappingEntity entity = new ClusterConfigMappingEntity(); + entity.setClusterEntity(clusterEntity); + entity.setClusterId(clusterEntity.getClusterId()); + entity.setCreateTimestamp(Long.valueOf(System.currentTimeMillis())); + entity.setSelected(1); + entity.setUser(user); + entity.setType(config.getType()); + entity.setVersion(config.getVersionTag()); + entities.add(entity); + + clusterDAO.merge(clusterEntity); + + return true; + } finally { + readWriteLock.writeLock().unlock(); } + } finally { + clusterGlobalLock.readLock().unlock(); } - ClusterConfigMappingEntity entity = new ClusterConfigMappingEntity(); - entity.setClusterEntity(clusterEntity); - entity.setClusterId(clusterEntity.getClusterId()); - entity.setCreateTimestamp(Long.valueOf(System.currentTimeMillis())); - entity.setSelected(1); - entity.setUser(user); - entity.setType(config.getType()); - entity.setVersion(config.getVersionTag()); - entities.add(entity); - clusterDAO.merge(clusterEntity); - - return true; } @Override public Map<String, DesiredConfig> getDesiredConfigs() { + clusterGlobalLock.readLock().lock(); + try { + readWriteLock.readLock().lock(); + try { + Map<String, DesiredConfig> map = new HashMap<String, DesiredConfig>(); + + for (ClusterConfigMappingEntity e : clusterEntity.getConfigMappingEntities()) { + if (e.isSelected() > 0) { + DesiredConfig c = new DesiredConfig(); + c.setServiceName(null); + c.setVersion(e.getVersion()); + c.setUser(e.getUser()); + + List<HostConfigMappingEntity> hostMappings = + hostConfigMappingDAO.findSelectedHostsByType(clusterEntity.getClusterId(), + e.getType()); + + List<DesiredConfig.HostOverride> hosts = new ArrayList<DesiredConfig.HostOverride>(); + for (HostConfigMappingEntity mappingEntity : hostMappings) { + hosts.add(new DesiredConfig.HostOverride(mappingEntity.getHostName(), + mappingEntity.getVersion())); + } + + c.setHostOverrides(hosts); + + map.put(e.getType(), c); - Map<String, DesiredConfig> map = new HashMap<String, DesiredConfig>(); - - for (ClusterConfigMappingEntity e : clusterEntity.getConfigMappingEntities()) { - if (e.isSelected() > 0) { - DesiredConfig c = new DesiredConfig(); - c.setServiceName(null); - c.setVersion(e.getVersion()); - c.setUser(e.getUser()); - - List<HostConfigMappingEntity> hostMappings = - hostConfigMappingDAO.findSelectedHostsByType(clusterEntity.getClusterId().longValue(), - e.getType()); - - List<DesiredConfig.HostOverride> hosts = new ArrayList<DesiredConfig.HostOverride>(); - for (HostConfigMappingEntity mappingEntity : hostMappings) { - hosts.add (new DesiredConfig.HostOverride(mappingEntity.getHostName(), - mappingEntity.getVersion())); } - - c.setHostOverrides(hosts); - - map.put(e.getType(), c); - } + + return map; + } finally { + readWriteLock.readLock().unlock(); + } + } finally { + clusterGlobalLock.readLock().unlock(); } - return map; + } @Override public Config getDesiredConfigByType(String configType) { + clusterGlobalLock.readLock().lock(); + try { + readWriteLock.readLock().lock(); + try { + for (ClusterConfigMappingEntity e : clusterEntity.getConfigMappingEntities()) { + if (e.isSelected() > 0 && e.getType().equals(configType)) { + return getConfig(e.getType(), e.getVersion()); + } + } - for (ClusterConfigMappingEntity e : clusterEntity.getConfigMappingEntities()) { - if (e.isSelected() > 0 && e.getType().equals(configType)) { - return getConfig(e.getType(), e.getVersion()); + return null; + } finally { + readWriteLock.readLock().unlock(); } + } finally { + clusterGlobalLock.readLock().unlock(); } - - return null; } }
