AMBARI-22604 Fix HostConfigMappingEntity cluster_id mapping, concurrency in
ClusterImpl, and serviceConfigTypes initialization (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/490808cb
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/490808cb
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/490808cb

Branch: refs/heads/branch-feature-AMBARI-14714-blueprintv2
Commit: 490808cb909a1d25eab347aebbd70a054b67197e
Parents: 10a19e5
Author: Dmytro Sen <d...@apache.org>
Authored: Fri Dec 8 14:41:19 2017 +0200
Committer: Dmytro Sen <d...@apache.org>
Committed: Fri Dec 8 14:41:19 2017 +0200

----------------------------------------------------------------------
 .../server/orm/entities/ConfigGroupEntity.java  |   8 +-
 .../orm/entities/HostConfigMappingEntity.java   |  12 +-
 .../orm/entities/ServiceConfigEntity.java       |   8 +-
 .../server/state/cluster/ClusterImpl.java       | 182 +++++++++++--------
 4 files changed, 116 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/490808cb/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ConfigGroupEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ConfigGroupEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ConfigGroupEntity.java
index 225b2cc..e8b2702 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ConfigGroupEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ConfigGroupEntity.java
@@ -74,18 +74,18 @@ public class ConfigGroupEntity {
   @Column(name = "create_timestamp", nullable=false, insertable=true, 
updatable=false)
   private long timestamp;
 
-  @Column(name = "service_id", nullable = false, insertable = false, updatable 
= false)
+  @Column(name = "service_id")
   private Long serviceId;
 
-  @Column(name = "service_group_id", nullable = false, insertable = false, 
updatable = false)
+  @Column(name = "service_group_id")
   private Long serviceGroupId;
 
   @ManyToOne
   @JoinColumns(
       {
           @JoinColumn(name = "cluster_id", referencedColumnName = 
"cluster_id", nullable = false, insertable = false, updatable = false),
-          @JoinColumn(name = "service_group_id", referencedColumnName = 
"service_group_id", nullable = false),
-          @JoinColumn(name = "service_id", referencedColumnName = "id", 
nullable = false)
+          @JoinColumn(name = "service_group_id", referencedColumnName = 
"service_group_id", insertable = false, updatable = false, nullable = false),
+          @JoinColumn(name = "service_id", referencedColumnName = "id", 
insertable = false, updatable = false, nullable = false)
       })
   private ClusterServiceEntity clusterServiceEntity;
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/490808cb/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostConfigMappingEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostConfigMappingEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostConfigMappingEntity.java
index 76708f9..75ed332 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostConfigMappingEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostConfigMappingEntity.java
@@ -43,7 +43,7 @@ import javax.persistence.Table;
 public class HostConfigMappingEntity {
 
   @Id
-  @Column(name = "cluster_id", insertable = false, updatable = false, nullable 
= false)
+  @Column(name = "cluster_id", nullable = false)
   private Long clusterId;
 
   @Id
@@ -61,18 +61,18 @@ public class HostConfigMappingEntity {
   @Column(name = "version_tag", insertable = true, updatable = false, nullable 
= false)
   private String versionTag;
 
-  @Column(name = "service_id", insertable = false, updatable = false, nullable 
= false)
+  @Column(name = "service_id")
   private Long serviceId;
 
-  @Column(name = "service_group_id", insertable = false, updatable = false, 
nullable = false)
+  @Column(name = "service_group_id")
   private Long serviceGroupId;
 
   @ManyToOne
   @JoinColumns(
       {
-          @JoinColumn(name = "cluster_id", referencedColumnName = 
"cluster_id", nullable = false),
-          @JoinColumn(name = "service_group_id", referencedColumnName = 
"service_group_id", nullable = false),
-          @JoinColumn(name = "service_id", referencedColumnName = "id", 
nullable = false)
+          @JoinColumn(name = "cluster_id", referencedColumnName = 
"cluster_id", insertable = false, updatable = false,  nullable = false),
+          @JoinColumn(name = "service_group_id", referencedColumnName = 
"service_group_id", insertable = false, updatable = false, nullable = false),
+          @JoinColumn(name = "service_id", referencedColumnName = "id", 
insertable = false, updatable = false, nullable = false)
       })
   private ClusterServiceEntity clusterServiceEntity;
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/490808cb/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceConfigEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceConfigEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceConfigEntity.java
index 7cd7e98..030706e 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceConfigEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceConfigEntity.java
@@ -76,19 +76,19 @@ public class ServiceConfigEntity {
   private Long clusterId;
 
   @Basic
-  @Column(name = "service_id", nullable = false, insertable = false, updatable 
= false)
+  @Column(name = "service_id", nullable = false)
   private Long serviceId;
 
   @Basic
-  @Column(name = "service_group_id", nullable = false, insertable = false, 
updatable = false)
+  @Column(name = "service_group_id", nullable = false)
   private Long serviceGroupId;
 
   @ManyToOne
   @JoinColumns(
       {
           @JoinColumn(name = "cluster_id", referencedColumnName = 
"cluster_id", nullable = false, insertable = false, updatable = false),
-          @JoinColumn(name = "service_group_id", referencedColumnName = 
"service_group_id", nullable = false),
-          @JoinColumn(name = "service_id", referencedColumnName = "id", 
nullable = false)
+          @JoinColumn(name = "service_group_id", referencedColumnName = 
"service_group_id", insertable = false, updatable = false, nullable = false),
+          @JoinColumn(name = "service_id", referencedColumnName = "id", 
insertable = false, updatable = false, nullable = false)
       })
   private ClusterServiceEntity clusterServiceEntity;
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/490808cb/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 0b38b36..3b0ebae 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -33,6 +33,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.stream.Collectors;
@@ -154,12 +155,10 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Functions;
 import com.google.common.base.Predicate;
-import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
@@ -198,7 +197,7 @@ public class ClusterImpl implements Cluster {
   private final ConcurrentMap<String, ConcurrentMap<String, Config>> 
allConfigs = new ConcurrentHashMap<>();
 
   /**
-   * [ Service -> [ allConfigs ] ]
+   * [ Service ID -> [ Config Type -> [ Config Version Tag -> Config ] ] ]
    */
   private final ConcurrentMap<Long, ConcurrentMap<String, 
ConcurrentMap<String, Config>>>  serviceConfigs = new ConcurrentHashMap<>();
 
@@ -213,6 +212,11 @@ public class ClusterImpl implements Cluster {
   private final ConcurrentMap<String, List<ServiceComponentHost>> 
serviceComponentHostsByHost = new ConcurrentHashMap<>();
 
   /**
+   * [ Service ID -> [ set of config types ] ]
+   */
+  private final ConcurrentMap<Long, ConcurrentSkipListSet<String>> 
serviceConfigTypes = new ConcurrentHashMap<>();
+
+  /**
    * Map of existing config groups
    */
   private final Map<Long, ConfigGroup> clusterConfigGroups = new 
ConcurrentHashMap<>();
@@ -309,8 +313,6 @@ public class ClusterImpl implements Cluster {
   @Inject
   private StackDAO stackDAO;
 
-  private volatile Multimap<Long, String> serviceConfigTypes;
-
   /**
    * Used to publish events relating to cluster CRUD operations and to receive
    * information about cluster operations.
@@ -385,18 +387,18 @@ public class ClusterImpl implements Cluster {
   }
 
   private void loadServiceConfigTypes(Service service) throws AmbariException {
-    if (serviceConfigTypes == null) {
-      serviceConfigTypes = HashMultimap.create();
-    } else {
-      serviceConfigTypes.removeAll(service.getServiceId());
-    }
+    clusterGlobalLock.readLock().lock();
     try {
-      serviceConfigTypes.putAll(service.getServiceId(), 
collectServiceConfigTypes(service));
-    } catch (AmbariException e) {
-      LOG.error("Cannot load service info:", e);
-      throw e;
+      try {
+        serviceConfigTypes.put(service.getServiceId(), new 
ConcurrentSkipListSet<>(collectServiceConfigTypes(service)));
+      } catch (AmbariException e) {
+        LOG.error("Cannot load service info:", e);
+        throw e;
+      }
+      LOG.info("Updated service config types: {}", serviceConfigTypes);
+    } finally {
+      clusterGlobalLock.readLock().unlock();
     }
-    LOG.info("Updated service config types: {}", serviceConfigTypes);
   }
 
   /**
@@ -903,18 +905,23 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public void addService(Service service) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Adding a new Service, clusterName={}, clusterId={}, 
serviceName={} serviceType={}",
-                 getClusterName(), getClusterId(), service.getName(), 
service.getServiceType());
-    }
-    //TODO get rid of services map
-    services.put(service.getName(), service);
-
-    servicesById.put(service.getServiceId(), service);
+    clusterGlobalLock.writeLock().lock();
     try {
-      loadServiceConfigTypes(service);
-    } catch (AmbariException e) {
-      e.printStackTrace();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding a new Service, clusterName={}, clusterId={}, 
serviceName={} serviceType={}",
+            getClusterName(), getClusterId(), service.getName(), 
service.getServiceType());
+      }
+      //TODO get rid of services map
+      services.put(service.getName(), service);
+
+      servicesById.put(service.getServiceId(), service);
+      try {
+        loadServiceConfigTypes(service);
+      } catch (AmbariException e) {
+        e.printStackTrace();
+      }
+    } finally {
+      clusterGlobalLock.writeLock().unlock();
     }
   }
 
@@ -1094,26 +1101,31 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public Service getService(String serviceGroupName, String serviceName) 
throws AmbariException {
+    clusterGlobalLock.readLock().lock();
     Service service = null;
-    //TODO use serviceIds instead of service name (remove this if)
-    if (serviceGroupName == null) {
-      service = getService(serviceName);
-      if (null == service) {
-        throw new ServiceNotFoundException(getClusterName(), serviceName);
-      }
-    } else {
-      for (Service serviceCandidate : servicesById.values()) {
-        if (serviceCandidate.getClusterId().equals(this.getClusterId())
-            && StringUtils.equals(serviceCandidate.getServiceGroupName(), 
serviceGroupName)
-            && StringUtils.equals(serviceCandidate.getName(), serviceName)) {
-          if (service == null) {
-            service = serviceCandidate;
-          } else {
-            LOG.error("Two services entities found for same serviceGroup and 
serviceName : service1 {%s}, service2 {%s}", service, serviceCandidate);
-            throw new AmbariException(String.format("Two services entities 
found for same serviceGroup and serviceName : service1 {%s}, service2 {%s}", 
service, serviceCandidate));
+    try {
+      //TODO use serviceIds instead of service name (remove this if)
+      if (serviceGroupName == null) {
+        service = getService(serviceName);
+        if (null == service) {
+          throw new ServiceNotFoundException(getClusterName(), serviceName);
+        }
+      } else {
+        for (Service serviceCandidate : servicesById.values()) {
+          if (serviceCandidate.getClusterId().equals(this.getClusterId())
+              && StringUtils.equals(serviceCandidate.getServiceGroupName(), 
serviceGroupName)
+              && StringUtils.equals(serviceCandidate.getName(), serviceName)) {
+            if (service == null) {
+              service = serviceCandidate;
+            } else {
+              LOG.error("Two services entities found for same serviceGroup and 
serviceName : service1 {%s}, service2 {%s}", service, serviceCandidate);
+              throw new AmbariException(String.format("Two services entities 
found for same serviceGroup and serviceName : service1 {%s}, service2 {%s}", 
service, serviceCandidate));
+            }
           }
         }
       }
+    } finally {
+      clusterGlobalLock.readLock().unlock();
     }
 
     if (null == service) {
@@ -1125,9 +1137,15 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public Service getService(Long serviceId) throws AmbariException {
-    Service service = servicesById.get(serviceId);
-    if (null == service) {
-      throw new ServiceNotFoundException(getClusterName(), serviceId);
+    clusterGlobalLock.readLock().lock();
+    Service service = null;
+    try {
+      service = servicesById.get(serviceId);
+      if (null == service) {
+        throw new ServiceNotFoundException(getClusterName(), serviceId);
+      }
+    } finally {
+      clusterGlobalLock.readLock().unlock();
     }
     return service;
   }
@@ -1794,7 +1812,7 @@ public class ClusterImpl implements Cluster {
     serviceComponentHosts.remove(serviceName);
     services.remove(serviceName);
     servicesById.remove(service.getServiceId());
-    serviceConfigTypes.removeAll(service.getServiceId());
+    serviceConfigTypes.remove(service.getServiceId());
 
     for (List<ServiceComponentHost> serviceComponents : 
serviceComponentHostsByHost.values()) {
       Iterables.removeIf(serviceComponents, new 
Predicate<ServiceComponentHost>() {
@@ -1876,11 +1894,13 @@ public class ClusterImpl implements Cluster {
     clusterDAO.removeByPK(clusterId);
   }
 
+  //TODO this needs to be reworked to support multiple instance of same service
   @Override
   public ServiceConfigVersionResponse addDesiredConfig(String user, 
Set<Config> configs) throws AmbariException {
     return addDesiredConfig(user, configs, null);
   }
 
+  //TODO this needs to be reworked to support multiple instance of same service
   @Override
   public ServiceConfigVersionResponse addDesiredConfig(String user, 
Set<Config> configs, String serviceConfigVersionNote) throws AmbariException {
     if (null == user) {
@@ -1923,12 +1943,14 @@ public class ClusterImpl implements Cluster {
    * Gets all versions of the desired configurations for the cluster.
    * @return a map of type-to-configuration information.
    */
+  //TODO this needs to be reworked to support multiple instance of same service
   @Override
   public Map<String, Set<DesiredConfig>> getAllDesiredConfigVersions() {
     return getDesiredConfigs(true);
   }
 
 
+  //TODO this needs to be reworked to support multiple instance of same service
   @Override
   public Map<String, DesiredConfig> getDesiredConfigs() {
     Map<String, Set<DesiredConfig>> activeConfigsByType = 
getDesiredConfigs(false);
@@ -1949,6 +1971,7 @@ public class ClusterImpl implements Cluster {
    *                    desired configuration per config type.
    * @return a map of type-to-configuration information.
    */
+  //TODO this needs to be reworked to support multiple instance of same service
   private Map<String, Set<DesiredConfig>> getDesiredConfigs(boolean 
allVersions) {
     clusterGlobalLock.readLock().lock();
     try {
@@ -2091,44 +2114,40 @@ public class ClusterImpl implements Cluster {
     return response;
   }
 
+  //TODO this needs to be reworked to support multiple instance of same service
   @Override
   public Long getServiceForConfigTypes(Collection<String> configTypes) {
     //debug
     LOG.info("Looking for service for config types {}", configTypes);
-    Long serviceId = null;
-    for (String configType : configTypes) {
-      for (Entry<Long, String> entry : serviceConfigTypes.entries()) {
-        if (StringUtils.equals(entry.getValue(), configType)) {
-          if (serviceId != null) {
-            if (entry.getKey() != null && !serviceId.equals(entry.getKey())) {
-              throw new IllegalArgumentException(String.format("Config type %s 
belongs to %s service, " +
-                "but also qualified for %s", configType, 
getServiceOrNull(serviceId), getServiceOrNull(entry.getKey())));
-            }
-          } else {
-            serviceId = entry.getKey();
-          }
-        }
+    List<Long> resultingServiceIds = new ArrayList<>();
+
+    for (Long serviceId : serviceConfigTypes.keySet()) {
+      Set<String> configTypesForServiceId = serviceConfigTypes.get(serviceId);
+      if (configTypesForServiceId.containsAll(configTypes) && 
configTypesForServiceId.size() == configTypes.size()) {
+          resultingServiceIds.add(serviceId);
       }
     }
-    if (serviceId == null) {
-      LOG.warn("Can't find serviceId for {}, there is a problem if there's no 
cluster-env", configTypes);
+    if (resultingServiceIds.isEmpty()) {
+      LOG.warn("Can't find serviceIds for {}, there is a problem if there's no 
cluster-env", configTypes);
     } else {
-      LOG.info("Service {} returning", getServiceOrNull(serviceId));
+      LOG.info("Service {} returning", 
getServiceOrNull(resultingServiceIds.get(0)));
     }
 
-    return serviceId;
+    //TODO this needs to be reworked to support multiple instance of same 
service
+    return resultingServiceIds.get(0);
   }
 
+  //TODO this needs to be reworked to support multiple instance of same service
   @Override
   public Service getServiceByConfigType(String configType) {
-    for (Entry<Long, String> entry : serviceConfigTypes.entries()) {
-      Long serviceId = entry.getKey();
-      String type = entry.getValue();
-      if (StringUtils.equals(type, configType)) {
-        return getServiceOrNull(serviceId);
+    List<Long> resultingServiceIds = new ArrayList<>();
+    for (Long serviceId : serviceConfigTypes.keySet()) {
+      if (serviceConfigTypes.get(serviceId).contains(configType)) {
+        resultingServiceIds.add(serviceId);
       }
     }
-    return null;
+    //TODO this needs to be reworked to support multiple instance of same 
service
+    return getServiceOrNull(resultingServiceIds.get(0));
   }
 
   @Override
@@ -2399,22 +2418,22 @@ public class ClusterImpl implements Cluster {
     return convertToServiceConfigVersionResponse(serviceConfigEntityClone);
   }
 
+  //TODO this needs to be reworked to support multiple instance of same service
   @Transactional
   ServiceConfigVersionResponse applyConfigs(Set<Config> configs, String user, 
String serviceConfigVersionNote) throws AmbariException {
-
-    Long serviceId = null;
+    Long resultingServiceId = null;
     for (Config config : configs) {
-      for (Entry<Long, String> entry : serviceConfigTypes.entries()) {
-        if (StringUtils.equals(entry.getValue(), config.getType())) {
-          if (serviceId == null) {
-            serviceId = entry.getKey();
+      for (Long serviceId : serviceConfigTypes.keySet()) {
+        if (serviceConfigTypes.get(serviceId).contains(config.getType())) {
+          if (resultingServiceId == null) {
+            resultingServiceId = serviceId;
             break;
-          } else if (!serviceId.equals(entry.getKey())) {
+          } else if (!resultingServiceId.equals(serviceId)) {
             String error = String.format("Updating configs for multiple 
services by a " +
                 "single API request isn't supported. Conflicting services %s 
and %s for %s",
-                getServiceOrNull(serviceId).getName(), 
getServiceOrNull(entry.getKey()).getName(), config.getType());
+                getService(serviceId).getName(), 
getService(resultingServiceId).getName(), config.getType());
             IllegalArgumentException exception = new 
IllegalArgumentException(error);
-            LOG.error(error + ", config version not created for {}", 
getServiceOrNull(serviceId).getName());
+            LOG.error(error + ", config version not created for {}", 
getService(resultingServiceId).getName());
             throw exception;
           } else {
             break;
@@ -2442,7 +2461,7 @@ public class ClusterImpl implements Cluster {
 
     clusterEntity = clusterDAO.merge(clusterEntity);
 
-    if (serviceId == null) {
+    if (resultingServiceId == null) {
       ArrayList<String> configTypes = new ArrayList<>();
       for (Config config : configs) {
         configTypes.add(config.getType());
@@ -2450,7 +2469,7 @@ public class ClusterImpl implements Cluster {
       LOG.error("No service found for config types '{}', service config 
version not created", configTypes);
       return null;
     } else {
-      return createServiceConfigVersion(serviceId, user, 
serviceConfigVersionNote);
+      return createServiceConfigVersion(resultingServiceId, user, 
serviceConfigVersionNote);
     }
 
   }
@@ -2466,6 +2485,7 @@ public class ClusterImpl implements Cluster {
     return clusterDAO.getEnabledConfigsByTypes(getClusterId(), new 
ArrayList<>(configTypes));
   }
 
+  //TODO this needs to be reworked to support multiple instance of same service
   @Override
   public Config getDesiredConfigByType(String configType) {
     ClusterConfigEntity config = 
clusterDAO.findEnabledConfigByType(getClusterId(), configType);
@@ -2482,6 +2502,7 @@ public class ClusterImpl implements Cluster {
     return null != config;
   }
 
+  //TODO this needs to be reworked to support multiple instance of same service
   @Override
   public Map<Long, Map<String, DesiredConfig>> 
getHostsDesiredConfigs(Collection<Long> hostIds) {
 
@@ -2510,6 +2531,7 @@ public class ClusterImpl implements Cluster {
     return desiredConfigsByHost;
   }
 
+  //TODO this needs to be reworked to support multiple instance of same service
   @Override
   public Map<Long, Map<String, DesiredConfig>> getAllHostsDesiredConfigs() {
 

Reply via email to