AMBARI-21427. Assigning hosts concurrently to the same config group may fail with "org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException: Config group already exist". (stoader)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/639f4523 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/639f4523 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/639f4523 Branch: refs/heads/branch-feature-logsearch-ui Commit: 639f4523fdf49c8e0dddf79074cdb7eb4e43940c Parents: 70cf77e Author: Vitaly Brodetskyi <vbrodets...@hortonworks.com> Authored: Tue Jul 11 00:55:59 2017 +0300 Committer: Vitaly Brodetskyi <vbrodets...@hortonworks.com> Committed: Tue Jul 11 00:55:59 2017 +0300 ---------------------------------------------------------------------- .../ambari/server/topology/AmbariContext.java | 81 +++++++++++++++----- 1 file changed, 62 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/639f4523/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java index 106d7c8..dee0e6c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -30,6 +30,7 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; import javax.annotation.Nullable; import javax.inject.Inject; @@ -69,9 +70,11 @@ import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Config; import org.apache.ambari.server.state.ConfigFactory; +import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.DesiredConfig; import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.SecurityType; +import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.state.configgroup.ConfigGroup; import org.apache.ambari.server.utils.RetryHelper; import org.slf4j.Logger; @@ -79,6 +82,8 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Striped; +import com.google.inject.Provider; /** @@ -99,6 +104,12 @@ public class AmbariContext { @Inject ConfigFactory configFactory; + /** + * Used for getting configuration property values from stack and services. + */ + @Inject + private Provider<ConfigHelper> configHelper; + private static AmbariManagementController controller; private static ClusterController clusterController; //todo: task id's. Use existing mechanism for getting next task id sequence @@ -112,6 +123,16 @@ public class AmbariContext { private final static Logger LOG = LoggerFactory.getLogger(AmbariContext.class); + + /** + * When config groups are created using Blueprints these are created when + * hosts join a hostgroup and are added to the corresponding config group. + * Since hosts join in parallel there might be a race condition in creating + * the config group a host is to be added to. 
Thus we need to synchronize + * the creation of config groups with the same name. + */ + private Striped<Lock> configGroupCreateLock = Striped.lazyWeakLock(1); + public boolean isClusterKerberosEnabled(long clusterId) { Cluster cluster; try { @@ -167,9 +188,10 @@ public class AmbariContext { public void createAmbariResources(ClusterTopology topology, String clusterName, SecurityType securityType, String repoVersion) { Stack stack = topology.getBlueprint().getStack(); + StackId stackId = new StackId(stack.getName(), stack.getVersion()); createAmbariClusterResource(clusterName, stack.getName(), stack.getVersion(), securityType, repoVersion); - createAmbariServiceAndComponentResources(topology, clusterName); + createAmbariServiceAndComponentResources(topology, clusterName, stackId, repoVersion); } public void createAmbariClusterResource(String clusterName, String stackName, String stackVersion, SecurityType securityType, String repoVersion) { @@ -196,7 +218,8 @@ public class AmbariContext { } } - public void createAmbariServiceAndComponentResources(ClusterTopology topology, String clusterName) { + public void createAmbariServiceAndComponentResources(ClusterTopology topology, String clusterName, + StackId stackId, String repositoryVersion) { Collection<String> services = topology.getBlueprint().getServices(); try { @@ -205,11 +228,13 @@ public class AmbariContext { } catch (AmbariException e) { throw new RuntimeException("Failed to persist service and component resources: " + e, e); } - Set<ServiceRequest> serviceRequests = new HashSet<ServiceRequest>(); - Set<ServiceComponentRequest> componentRequests = new HashSet<ServiceComponentRequest>(); + Set<ServiceRequest> serviceRequests = new HashSet<>(); + Set<ServiceComponentRequest> componentRequests = new HashSet<>(); for (String service : services) { String credentialStoreEnabled = topology.getBlueprint().getCredentialStoreEnabled(service); - serviceRequests.add(new ServiceRequest(clusterName, service, null, 
credentialStoreEnabled)); + serviceRequests.add(new ServiceRequest(clusterName, service, stackId.getStackId(), + repositoryVersion, null, credentialStoreEnabled)); + for (String component : topology.getBlueprint().getComponents(service)) { String recoveryEnabled = topology.getBlueprint().getRecoveryEnabled(service, component); componentRequests.add(new ServiceComponentRequest(clusterName, service, component, null, recoveryEnabled)); @@ -223,14 +248,14 @@ public class AmbariContext { } // set all services state to INSTALLED->STARTED // this is required so the user can start failed services at the service level - Map<String, Object> installProps = new HashMap<String, Object>(); + Map<String, Object> installProps = new HashMap<>(); installProps.put(ServiceResourceProvider.SERVICE_SERVICE_STATE_PROPERTY_ID, "INSTALLED"); installProps.put(ServiceResourceProvider.SERVICE_CLUSTER_NAME_PROPERTY_ID, clusterName); - Map<String, Object> startProps = new HashMap<String, Object>(); + Map<String, Object> startProps = new HashMap<>(); startProps.put(ServiceResourceProvider.SERVICE_SERVICE_STATE_PROPERTY_ID, "STARTED"); startProps.put(ServiceResourceProvider.SERVICE_CLUSTER_NAME_PROPERTY_ID, clusterName); - Predicate predicate = new EqualsPredicate<String>( - ServiceResourceProvider.SERVICE_CLUSTER_NAME_PROPERTY_ID, clusterName); + Predicate predicate = new EqualsPredicate<>( + ServiceResourceProvider.SERVICE_CLUSTER_NAME_PROPERTY_ID, clusterName); try { getServiceResourceProvider().updateResources( new RequestImpl(null, Collections.singleton(installProps), null, null), predicate); @@ -262,9 +287,9 @@ public class AmbariContext { } String clusterName = cluster.getClusterName(); - Map<String, Object> properties = new HashMap<String, Object>(); + Map<String, Object> properties = new HashMap<>(); properties.put(HostResourceProvider.HOST_CLUSTER_NAME_PROPERTY_ID, clusterName); - properties.put(HostResourceProvider.HOST_NAME_PROPERTY_ID, hostName); + 
properties.put(HostResourceProvider.HOST_HOST_NAME_PROPERTY_ID, hostName); properties.put(HostResourceProvider.HOST_RACK_INFO_PROPERTY_ID, host.getRackInfo()); try { @@ -275,7 +300,7 @@ public class AmbariContext { hostName, e.toString()), e); } - final Set<ServiceComponentHostRequest> requests = new HashSet<ServiceComponentHostRequest>(); + final Set<ServiceComponentHostRequest> requests = new HashSet<>(); for (Map.Entry<String, Collection<String>> entry : components.entrySet()) { String service = entry.getKey(); @@ -328,11 +353,17 @@ public class AmbariContext { } public void registerHostWithConfigGroup(final String hostName, final ClusterTopology topology, final String groupName) { + String qualifiedGroupName = getConfigurationGroupName(topology.getBlueprint().getName(), groupName); + + Lock configGroupLock = configGroupCreateLock.get(qualifiedGroupName); + try { + configGroupLock.lock(); + boolean hostAdded = RetryHelper.executeWithRetry(new Callable<Boolean>() { @Override public Boolean call() throws Exception { - return addHostToExistingConfigGroups(hostName, topology, groupName); + return addHostToExistingConfigGroups(hostName, topology, qualifiedGroupName); } }); if (!hostAdded) { @@ -342,6 +373,9 @@ public class AmbariContext { LOG.error("Unable to register config group for host: ", e); throw new RuntimeException("Unable to register config group for host: " + hostName); } + finally { + configGroupLock.unlock(); + } } public RequestStatusResponse installHost(String hostName, String clusterName, Collection<String> skipInstallForComponents, Collection<String> dontSkipInstallForComponents, boolean skipFailure) { @@ -549,7 +583,7 @@ public class AmbariContext { /** * Add the new host to an existing config group. 
*/ - private boolean addHostToExistingConfigGroups(String hostName, ClusterTopology topology, String groupName) { + private boolean addHostToExistingConfigGroups(String hostName, ClusterTopology topology, String configGroupName) { boolean addedHost = false; Clusters clusters; Cluster cluster; @@ -563,9 +597,8 @@ public class AmbariContext { // I don't know of a method to get config group by name //todo: add a method to get config group by name Map<Long, ConfigGroup> configGroups = cluster.getConfigGroups(); - String qualifiedGroupName = getConfigurationGroupName(topology.getBlueprint().getName(), groupName); for (ConfigGroup group : configGroups.values()) { - if (group.getName().equals(qualifiedGroupName)) { + if (group.getName().equals(configGroupName)) { try { Host host = clusters.getHost(hostName); addedHost = true; @@ -589,7 +622,7 @@ public class AmbariContext { * and the hosts associated with the host group are assigned to the config group. */ private void createConfigGroupsAndRegisterHost(ClusterTopology topology, String groupName) throws AmbariException { - Map<String, Map<String, Config>> groupConfigs = new HashMap<String, Map<String, Config>>(); + Map<String, Map<String, Config>> groupConfigs = new HashMap<>(); Stack stack = topology.getBlueprint().getStack(); // get the host-group config with cluster creation template overrides @@ -608,7 +641,7 @@ public class AmbariContext { //todo: attributes Map<String, Config> serviceConfigs = groupConfigs.get(service); if (serviceConfigs == null) { - serviceConfigs = new HashMap<String, Config>(); + serviceConfigs = new HashMap<>(); groupConfigs.put(service, serviceConfigs); } serviceConfigs.put(type, config); @@ -669,6 +702,16 @@ public class AmbariContext { return String.format("%s:%s", bpName, hostGroupName); } + /** + * Gets an instance of {@link ConfigHelper} for classes which are not + * dependency injected. + * + * @return a {@link ConfigHelper} instance. 
+ */ + public ConfigHelper getConfigHelper() { + return configHelper.get(); + } + private synchronized HostResourceProvider getHostResourceProvider() { if (hostResourceProvider == null) { hostResourceProvider = (HostResourceProvider)