AMBARI-22604 Fix HostConfigMappingEntity cluster_id mapping, concurrency in ClusterImpl, and serviceConfigTypes initialization (dsen)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/490808cb Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/490808cb Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/490808cb Branch: refs/heads/branch-feature-AMBARI-14714-blueprintv2 Commit: 490808cb909a1d25eab347aebbd70a054b67197e Parents: 10a19e5 Author: Dmytro Sen <d...@apache.org> Authored: Fri Dec 8 14:41:19 2017 +0200 Committer: Dmytro Sen <d...@apache.org> Committed: Fri Dec 8 14:41:19 2017 +0200 ---------------------------------------------------------------------- .../server/orm/entities/ConfigGroupEntity.java | 8 +- .../orm/entities/HostConfigMappingEntity.java | 12 +- .../orm/entities/ServiceConfigEntity.java | 8 +- .../server/state/cluster/ClusterImpl.java | 182 +++++++++++-------- 4 files changed, 116 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/490808cb/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ConfigGroupEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ConfigGroupEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ConfigGroupEntity.java index 225b2cc..e8b2702 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ConfigGroupEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ConfigGroupEntity.java @@ -74,18 +74,18 @@ public class ConfigGroupEntity { @Column(name = "create_timestamp", nullable=false, insertable=true, updatable=false) private long timestamp; - @Column(name = "service_id", nullable = false, insertable = false, updatable = false) + @Column(name = "service_id") private Long serviceId; - @Column(name = "service_group_id", nullable = false, insertable = false, updatable = false) + 
@Column(name = "service_group_id") private Long serviceGroupId; @ManyToOne @JoinColumns( { @JoinColumn(name = "cluster_id", referencedColumnName = "cluster_id", nullable = false, insertable = false, updatable = false), - @JoinColumn(name = "service_group_id", referencedColumnName = "service_group_id", nullable = false), - @JoinColumn(name = "service_id", referencedColumnName = "id", nullable = false) + @JoinColumn(name = "service_group_id", referencedColumnName = "service_group_id", insertable = false, updatable = false, nullable = false), + @JoinColumn(name = "service_id", referencedColumnName = "id", insertable = false, updatable = false, nullable = false) }) private ClusterServiceEntity clusterServiceEntity; http://git-wip-us.apache.org/repos/asf/ambari/blob/490808cb/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostConfigMappingEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostConfigMappingEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostConfigMappingEntity.java index 76708f9..75ed332 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostConfigMappingEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostConfigMappingEntity.java @@ -43,7 +43,7 @@ import javax.persistence.Table; public class HostConfigMappingEntity { @Id - @Column(name = "cluster_id", insertable = false, updatable = false, nullable = false) + @Column(name = "cluster_id", nullable = false) private Long clusterId; @Id @@ -61,18 +61,18 @@ public class HostConfigMappingEntity { @Column(name = "version_tag", insertable = true, updatable = false, nullable = false) private String versionTag; - @Column(name = "service_id", insertable = false, updatable = false, nullable = false) + @Column(name = "service_id") private Long serviceId; - @Column(name = "service_group_id", 
insertable = false, updatable = false, nullable = false) + @Column(name = "service_group_id") private Long serviceGroupId; @ManyToOne @JoinColumns( { - @JoinColumn(name = "cluster_id", referencedColumnName = "cluster_id", nullable = false), - @JoinColumn(name = "service_group_id", referencedColumnName = "service_group_id", nullable = false), - @JoinColumn(name = "service_id", referencedColumnName = "id", nullable = false) + @JoinColumn(name = "cluster_id", referencedColumnName = "cluster_id", insertable = false, updatable = false, nullable = false), + @JoinColumn(name = "service_group_id", referencedColumnName = "service_group_id", insertable = false, updatable = false, nullable = false), + @JoinColumn(name = "service_id", referencedColumnName = "id", insertable = false, updatable = false, nullable = false) }) private ClusterServiceEntity clusterServiceEntity; http://git-wip-us.apache.org/repos/asf/ambari/blob/490808cb/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceConfigEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceConfigEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceConfigEntity.java index 7cd7e98..030706e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceConfigEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceConfigEntity.java @@ -76,19 +76,19 @@ public class ServiceConfigEntity { private Long clusterId; @Basic - @Column(name = "service_id", nullable = false, insertable = false, updatable = false) + @Column(name = "service_id", nullable = false) private Long serviceId; @Basic - @Column(name = "service_group_id", nullable = false, insertable = false, updatable = false) + @Column(name = "service_group_id", nullable = false) private Long serviceGroupId; @ManyToOne @JoinColumns( { @JoinColumn(name = 
"cluster_id", referencedColumnName = "cluster_id", nullable = false, insertable = false, updatable = false), - @JoinColumn(name = "service_group_id", referencedColumnName = "service_group_id", nullable = false), - @JoinColumn(name = "service_id", referencedColumnName = "id", nullable = false) + @JoinColumn(name = "service_group_id", referencedColumnName = "service_group_id", insertable = false, updatable = false, nullable = false), + @JoinColumn(name = "service_id", referencedColumnName = "id", insertable = false, updatable = false, nullable = false) }) private ClusterServiceEntity clusterServiceEntity; http://git-wip-us.apache.org/repos/asf/ambari/blob/490808cb/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index 0b38b36..3b0ebae 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -33,6 +33,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.ReadWriteLock; import java.util.stream.Collectors; @@ -154,12 +155,10 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Functions; import com.google.common.base.Predicate; -import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; import 
com.google.common.eventbus.Subscribe; import com.google.inject.Inject; import com.google.inject.Injector; @@ -198,7 +197,7 @@ public class ClusterImpl implements Cluster { private final ConcurrentMap<String, ConcurrentMap<String, Config>> allConfigs = new ConcurrentHashMap<>(); /** - * [ Service -> [ allConfigs ] ] + * [ Service ID -> [ Config Type -> [ Config Version Tag -> Config ] ] ] */ private final ConcurrentMap<Long, ConcurrentMap<String, ConcurrentMap<String, Config>>> serviceConfigs = new ConcurrentHashMap<>(); @@ -213,6 +212,11 @@ public class ClusterImpl implements Cluster { private final ConcurrentMap<String, List<ServiceComponentHost>> serviceComponentHostsByHost = new ConcurrentHashMap<>(); /** + * [ Service ID -> [ set of config types ] ] + */ + private final ConcurrentMap<Long, ConcurrentSkipListSet<String>> serviceConfigTypes = new ConcurrentHashMap<>(); + + /** * Map of existing config groups */ private final Map<Long, ConfigGroup> clusterConfigGroups = new ConcurrentHashMap<>(); @@ -309,8 +313,6 @@ public class ClusterImpl implements Cluster { @Inject private StackDAO stackDAO; - private volatile Multimap<Long, String> serviceConfigTypes; - /** * Used to publish events relating to cluster CRUD operations and to receive * information about cluster operations. 
@@ -385,18 +387,18 @@ public class ClusterImpl implements Cluster { } private void loadServiceConfigTypes(Service service) throws AmbariException { - if (serviceConfigTypes == null) { - serviceConfigTypes = HashMultimap.create(); - } else { - serviceConfigTypes.removeAll(service.getServiceId()); - } + clusterGlobalLock.readLock().lock(); try { - serviceConfigTypes.putAll(service.getServiceId(), collectServiceConfigTypes(service)); - } catch (AmbariException e) { - LOG.error("Cannot load service info:", e); - throw e; + try { + serviceConfigTypes.put(service.getServiceId(), new ConcurrentSkipListSet<>(collectServiceConfigTypes(service))); + } catch (AmbariException e) { + LOG.error("Cannot load service info:", e); + throw e; + } + LOG.info("Updated service config types: {}", serviceConfigTypes); + } finally { + clusterGlobalLock.readLock().unlock(); } - LOG.info("Updated service config types: {}", serviceConfigTypes); } /** @@ -903,18 +905,23 @@ public class ClusterImpl implements Cluster { @Override public void addService(Service service) { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding a new Service, clusterName={}, clusterId={}, serviceName={} serviceType={}", - getClusterName(), getClusterId(), service.getName(), service.getServiceType()); - } - //TODO get rid of services map - services.put(service.getName(), service); - - servicesById.put(service.getServiceId(), service); + clusterGlobalLock.writeLock().lock(); try { - loadServiceConfigTypes(service); - } catch (AmbariException e) { - e.printStackTrace(); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding a new Service, clusterName={}, clusterId={}, serviceName={} serviceType={}", + getClusterName(), getClusterId(), service.getName(), service.getServiceType()); + } + //TODO get rid of services map + services.put(service.getName(), service); + + servicesById.put(service.getServiceId(), service); + try { + loadServiceConfigTypes(service); + } catch (AmbariException e) { + e.printStackTrace(); + } + } finally { 
+ clusterGlobalLock.writeLock().unlock(); } } @@ -1094,26 +1101,31 @@ public class ClusterImpl implements Cluster { @Override public Service getService(String serviceGroupName, String serviceName) throws AmbariException { + clusterGlobalLock.readLock().lock(); Service service = null; - //TODO use serviceIds instead of service name (remove this if) - if (serviceGroupName == null) { - service = getService(serviceName); - if (null == service) { - throw new ServiceNotFoundException(getClusterName(), serviceName); - } - } else { - for (Service serviceCandidate : servicesById.values()) { - if (serviceCandidate.getClusterId().equals(this.getClusterId()) - && StringUtils.equals(serviceCandidate.getServiceGroupName(), serviceGroupName) - && StringUtils.equals(serviceCandidate.getName(), serviceName)) { - if (service == null) { - service = serviceCandidate; - } else { - LOG.error("Two services entities found for same serviceGroup and serviceName : service1 {%s}, service2 {%s}", service, serviceCandidate); - throw new AmbariException(String.format("Two services entities found for same serviceGroup and serviceName : service1 {%s}, service2 {%s}", service, serviceCandidate)); + try { + //TODO use serviceIds instead of service name (remove this if) + if (serviceGroupName == null) { + service = getService(serviceName); + if (null == service) { + throw new ServiceNotFoundException(getClusterName(), serviceName); + } + } else { + for (Service serviceCandidate : servicesById.values()) { + if (serviceCandidate.getClusterId().equals(this.getClusterId()) + && StringUtils.equals(serviceCandidate.getServiceGroupName(), serviceGroupName) + && StringUtils.equals(serviceCandidate.getName(), serviceName)) { + if (service == null) { + service = serviceCandidate; + } else { + LOG.error("Two services entities found for same serviceGroup and serviceName : service1 {%s}, service2 {%s}", service, serviceCandidate); + throw new AmbariException(String.format("Two services entities found for same 
serviceGroup and serviceName : service1 {%s}, service2 {%s}", service, serviceCandidate)); + } } } } + } finally { + clusterGlobalLock.readLock().unlock(); } if (null == service) { @@ -1125,9 +1137,15 @@ public class ClusterImpl implements Cluster { @Override public Service getService(Long serviceId) throws AmbariException { - Service service = servicesById.get(serviceId); - if (null == service) { - throw new ServiceNotFoundException(getClusterName(), serviceId); + clusterGlobalLock.readLock().lock(); + Service service = null; + try { + service = servicesById.get(serviceId); + if (null == service) { + throw new ServiceNotFoundException(getClusterName(), serviceId); + } + } finally { + clusterGlobalLock.readLock().unlock(); } return service; } @@ -1794,7 +1812,7 @@ public class ClusterImpl implements Cluster { serviceComponentHosts.remove(serviceName); services.remove(serviceName); servicesById.remove(service.getServiceId()); - serviceConfigTypes.removeAll(service.getServiceId()); + serviceConfigTypes.remove(service.getServiceId()); for (List<ServiceComponentHost> serviceComponents : serviceComponentHostsByHost.values()) { Iterables.removeIf(serviceComponents, new Predicate<ServiceComponentHost>() { @@ -1876,11 +1894,13 @@ public class ClusterImpl implements Cluster { clusterDAO.removeByPK(clusterId); } + //TODO this needs to be reworked to support multiple instance of same service @Override public ServiceConfigVersionResponse addDesiredConfig(String user, Set<Config> configs) throws AmbariException { return addDesiredConfig(user, configs, null); } + //TODO this needs to be reworked to support multiple instance of same service @Override public ServiceConfigVersionResponse addDesiredConfig(String user, Set<Config> configs, String serviceConfigVersionNote) throws AmbariException { if (null == user) { @@ -1923,12 +1943,14 @@ public class ClusterImpl implements Cluster { * Gets all versions of the desired configurations for the cluster. 
* @return a map of type-to-configuration information. */ + //TODO this needs to be reworked to support multiple instance of same service @Override public Map<String, Set<DesiredConfig>> getAllDesiredConfigVersions() { return getDesiredConfigs(true); } + //TODO this needs to be reworked to support multiple instance of same service @Override public Map<String, DesiredConfig> getDesiredConfigs() { Map<String, Set<DesiredConfig>> activeConfigsByType = getDesiredConfigs(false); @@ -1949,6 +1971,7 @@ public class ClusterImpl implements Cluster { * desired configuration per config type. * @return a map of type-to-configuration information. */ + //TODO this needs to be reworked to support multiple instance of same service private Map<String, Set<DesiredConfig>> getDesiredConfigs(boolean allVersions) { clusterGlobalLock.readLock().lock(); try { @@ -2091,44 +2114,40 @@ public class ClusterImpl implements Cluster { return response; } + //TODO this needs to be reworked to support multiple instance of same service @Override public Long getServiceForConfigTypes(Collection<String> configTypes) { //debug LOG.info("Looking for service for config types {}", configTypes); - Long serviceId = null; - for (String configType : configTypes) { - for (Entry<Long, String> entry : serviceConfigTypes.entries()) { - if (StringUtils.equals(entry.getValue(), configType)) { - if (serviceId != null) { - if (entry.getKey() != null && !serviceId.equals(entry.getKey())) { - throw new IllegalArgumentException(String.format("Config type %s belongs to %s service, " + - "but also qualified for %s", configType, getServiceOrNull(serviceId), getServiceOrNull(entry.getKey()))); - } - } else { - serviceId = entry.getKey(); - } - } + List<Long> resultingServiceIds = new ArrayList<>(); + + for (Long serviceId : serviceConfigTypes.keySet()) { + Set<String> configTypesForServiceId = serviceConfigTypes.get(serviceId); + if (configTypesForServiceId.containsAll(configTypes) && configTypesForServiceId.size() == 
configTypes.size()) { + resultingServiceIds.add(serviceId); } } - if (serviceId == null) { - LOG.warn("Can't find serviceId for {}, there is a problem if there's no cluster-env", configTypes); + if (resultingServiceIds.isEmpty()) { + LOG.warn("Can't find serviceIds for {}, there is a problem if there's no cluster-env", configTypes); } else { - LOG.info("Service {} returning", getServiceOrNull(serviceId)); + LOG.info("Service {} returning", getServiceOrNull(resultingServiceIds.get(0))); } - return serviceId; + //TODO this needs to be reworked to support multiple instance of same service + return resultingServiceIds.get(0); } + //TODO this needs to be reworked to support multiple instance of same service @Override public Service getServiceByConfigType(String configType) { - for (Entry<Long, String> entry : serviceConfigTypes.entries()) { - Long serviceId = entry.getKey(); - String type = entry.getValue(); - if (StringUtils.equals(type, configType)) { - return getServiceOrNull(serviceId); + List<Long> resultingServiceIds = new ArrayList<>(); + for (Long serviceId : serviceConfigTypes.keySet()) { + if (serviceConfigTypes.get(serviceId).contains(configType)) { + resultingServiceIds.add(serviceId); } } - return null; + //TODO this needs to be reworked to support multiple instance of same service + return getServiceOrNull(resultingServiceIds.get(0)); } @Override @@ -2399,22 +2418,22 @@ public class ClusterImpl implements Cluster { return convertToServiceConfigVersionResponse(serviceConfigEntityClone); } + //TODO this needs to be reworked to support multiple instance of same service @Transactional ServiceConfigVersionResponse applyConfigs(Set<Config> configs, String user, String serviceConfigVersionNote) throws AmbariException { - - Long serviceId = null; + Long resultingServiceId = null; for (Config config : configs) { - for (Entry<Long, String> entry : serviceConfigTypes.entries()) { - if (StringUtils.equals(entry.getValue(), config.getType())) { - if (serviceId == null) 
{ - serviceId = entry.getKey(); + for (Long serviceId : serviceConfigTypes.keySet()) { + if (serviceConfigTypes.get(serviceId).contains(config.getType())) { + if (resultingServiceId == null) { + resultingServiceId = serviceId; break; - } else if (!serviceId.equals(entry.getKey())) { + } else if (!resultingServiceId.equals(serviceId)) { String error = String.format("Updating configs for multiple services by a " + "single API request isn't supported. Conflicting services %s and %s for %s", - getServiceOrNull(serviceId).getName(), getServiceOrNull(entry.getKey()).getName(), config.getType()); + getService(serviceId).getName(), getService(resultingServiceId).getName(), config.getType()); IllegalArgumentException exception = new IllegalArgumentException(error); - LOG.error(error + ", config version not created for {}", getServiceOrNull(serviceId).getName()); + LOG.error(error + ", config version not created for {}", getService(resultingServiceId).getName()); throw exception; } else { break; @@ -2442,7 +2461,7 @@ public class ClusterImpl implements Cluster { clusterEntity = clusterDAO.merge(clusterEntity); - if (serviceId == null) { + if (resultingServiceId == null) { ArrayList<String> configTypes = new ArrayList<>(); for (Config config : configs) { configTypes.add(config.getType()); @@ -2450,7 +2469,7 @@ public class ClusterImpl implements Cluster { LOG.error("No service found for config types '{}', service config version not created", configTypes); return null; } else { - return createServiceConfigVersion(serviceId, user, serviceConfigVersionNote); + return createServiceConfigVersion(resultingServiceId, user, serviceConfigVersionNote); } } @@ -2466,6 +2485,7 @@ public class ClusterImpl implements Cluster { return clusterDAO.getEnabledConfigsByTypes(getClusterId(), new ArrayList<>(configTypes)); } + //TODO this needs to be reworked to support multiple instance of same service @Override public Config getDesiredConfigByType(String configType) { ClusterConfigEntity config 
= clusterDAO.findEnabledConfigByType(getClusterId(), configType); @@ -2482,6 +2502,7 @@ public class ClusterImpl implements Cluster { return null != config; } + //TODO this needs to be reworked to support multiple instance of same service @Override public Map<Long, Map<String, DesiredConfig>> getHostsDesiredConfigs(Collection<Long> hostIds) { @@ -2510,6 +2531,7 @@ public class ClusterImpl implements Cluster { return desiredConfigsByHost; } + //TODO this needs to be reworked to support multiple instance of same service @Override public Map<Long, Map<String, DesiredConfig>> getAllHostsDesiredConfigs() {