Repository: ambari
Updated Branches:
  refs/heads/trunk d05c9c287 -> 66a4bfb26
AMBARI-11038. Fix timing issue regarding setting of topology resolved configuration for clusters provisioned via blueprints Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/66a4bfb2 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/66a4bfb2 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/66a4bfb2 Branch: refs/heads/trunk Commit: 66a4bfb26ad5ffffa59d925e56f465bec3346696 Parents: d05c9c2 Author: John Speidel <[email protected]> Authored: Mon May 11 15:38:25 2015 -0400 Committer: John Speidel <[email protected]> Committed: Mon May 11 17:37:03 2015 -0400 ---------------------------------------------------------------------- .../topology/ClusterConfigurationRequest.java | 14 ++-- .../ambari/server/topology/LogicalRequest.java | 14 ---- .../ambari/server/topology/TopologyManager.java | 67 ++++++++++---------- .../server/topology/TopologyManagerTest.java | 27 +++----- 4 files changed, 50 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/66a4bfb2/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java index a8c2ff3..eb583fd 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java @@ -84,7 +84,7 @@ public class ClusterConfigurationRequest { */ public void setConfigurationsOnCluster(ClusterTopology clusterTopology, String tag) { //todo: also handle setting of host group scoped configuration which is updated by config processor - 
List<BlueprintServiceConfigRequest> listofConfigRequests = new LinkedList<BlueprintServiceConfigRequest>(); + List<BlueprintServiceConfigRequest> configurationRequests = new LinkedList<BlueprintServiceConfigRequest>(); Blueprint blueprint = clusterTopology.getBlueprint(); Configuration clusterConfiguration = clusterTopology.getConfiguration(); @@ -108,7 +108,7 @@ public class ClusterConfigurationRequest { } } - listofConfigRequests.add(blueprintConfigRequest); + configurationRequests.add(blueprintConfigRequest); } // since the stack returns "cluster-env" with each service's config ensure that only one @@ -118,9 +118,9 @@ public class ClusterConfigurationRequest { Map<String, Map<String, String>> clusterEnvAttributes = clusterConfiguration.getFullAttributes().get("cluster-env"); globalConfigRequest.addConfigElement("cluster-env", clusterEnvProps,clusterEnvAttributes); - listofConfigRequests.add(globalConfigRequest); + configurationRequests.add(globalConfigRequest); - setConfigurationsOnCluster(listofConfigRequests, tag); + setConfigurationsOnCluster(configurationRequests, tag); } /** @@ -131,12 +131,12 @@ public class ClusterConfigurationRequest { * * This method will also send these requests to the management controller. * - * @param listOfBlueprintConfigRequests a list of requests to send to the AmbariManagementController. + * @param configurationRequests a list of requests to send to the AmbariManagementController. 
*/ - private void setConfigurationsOnCluster(List<BlueprintServiceConfigRequest> listOfBlueprintConfigRequests, + private void setConfigurationsOnCluster(List<BlueprintServiceConfigRequest> configurationRequests, String tag) { // iterate over services to deploy - for (BlueprintServiceConfigRequest blueprintConfigRequest : listOfBlueprintConfigRequests) { + for (BlueprintServiceConfigRequest blueprintConfigRequest : configurationRequests) { ClusterRequest clusterRequest = null; // iterate over the config types associated with this service List<ConfigurationRequest> requestsPerService = new LinkedList<ConfigurationRequest>(); http://git-wip-us.apache.org/repos/asf/ambari/blob/66a4bfb2/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java index 087ad4c..88c791b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java @@ -158,20 +158,6 @@ public class LogicalRequest extends Request { return new ArrayList<HostRequest>(allHostRequests); } - //todo: account for blueprint name? 
- //todo: this should probably be done implicitly at a lower level - public boolean areGroupsResolved(Collection<String> hostGroupNames) { - synchronized (outstandingHostRequests) { - // iterate over outstanding host requests - for (HostRequest request : outstandingHostRequests) { - if (hostGroupNames.contains(request.getHostgroupName()) && request.getHostName() == null) { - return false; - } - } - } - return true; - } - public Map<String, Collection<String>> getProjectedTopology() { Map<String, Collection<String>> hostComponentMap = new HashMap<String, Collection<String>>(); http://git-wip-us.apache.org/repos/asf/ambari/blob/66a4bfb2/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java index 864655d..e6c2f1e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java @@ -53,7 +53,6 @@ public class TopologyManager { public static final String TOPOLOGY_RESOLVED_TAG = "TOPOLOGY_RESOLVED"; private PersistedState persistedState; - //private ExecutorService executor = getExecutorService(); private ExecutorService executor = Executors.newSingleThreadExecutor(); private Collection<String> hostsToIgnore = new HashSet<String>(); private final List<HostImpl> availableHosts = new LinkedList<HostImpl>(); @@ -83,8 +82,8 @@ public class TopologyManager { private void ensureInitialized() { synchronized(initializationLock) { if (! 
isInitialized) { - isInitialized = true; replayRequests(persistedState.getAllRequests()); + isInitialized = true; } } } @@ -99,7 +98,7 @@ public class TopologyManager { String clusterName = topology.getClusterName(); clusterTopologyMap.put(clusterName, topology); - addClusterConfigRequest(new ClusterConfigurationRequest(ambariContext, topology, true)); + addClusterConfigRequest(topology, new ClusterConfigurationRequest(ambariContext, topology, true)); LogicalRequest logicalRequest = processRequest(persistedRequest, topology); //todo: this should be invoked as part of a generic lifecycle event which could possibly @@ -394,7 +393,7 @@ public class TopologyManager { if (! configChecked) { configChecked = true; if (! ambariContext.doesConfigurationWithTagExist(topology.getClusterName(), TOPOLOGY_RESOLVED_TAG)) { - addClusterConfigRequest(new ClusterConfigurationRequest(ambariContext, topology, false)); + addClusterConfigRequest(topology, new ClusterConfigurationRequest(ambariContext, topology, false)); } } } @@ -423,28 +422,26 @@ public class TopologyManager { } /** - * Making it a synchronous call as resolution of initial configurations need to happen before - * any tasks are created - * TODO - needs further review + * Register the configuration task which is responsible for configuration topology resolution + * and setting the updated configuration on the cluster. This task needs to be submitted to the + * executor before any host requests to ensure that no install or start tasks are executed prior + * to configuration being set on the cluster. 
+ * + * @param topology cluster topology + * @param configurationRequest configuration request to be executed */ - private void addClusterConfigRequest(ClusterConfigurationRequest configurationRequest) { - ConfigureClusterTask cct = new ConfigureClusterTask(configurationRequest); - cct.run(); - //executor.execute(new ConfigureClusterTask(configurationRequest)); - //try { - // executor.awaitTermination(10, TimeUnit.SECONDS); - // LOG.info("Resolved cluster topology configuration."); - //}catch(InterruptedException ex) { - // LOG.warn("Failed to resolve topology configuration"); - //} + private void addClusterConfigRequest(ClusterTopology topology, ClusterConfigurationRequest configurationRequest) { + executor.execute(new ConfigureClusterTask(topology, configurationRequest)); } private class ConfigureClusterTask implements Runnable { private ClusterConfigurationRequest configRequest; + private ClusterTopology topology; - public ConfigureClusterTask(ClusterConfigurationRequest configRequest) { + public ConfigureClusterTask(ClusterTopology topology, ClusterConfigurationRequest configRequest) { this.configRequest = configRequest; + this.topology = topology; } @Override @@ -457,14 +454,13 @@ public class TopologyManager { Collection<String> requiredHostGroups = getTopologyRequiredHostGroups(); while (! completed && ! interrupted) { try { - Thread.sleep(200); + Thread.sleep(100); } catch (InterruptedException e) { interrupted = true; // reset interrupted flag on thread Thread.interrupted(); } - - completed = areConfigsResolved(requiredHostGroups); + completed = areRequiredHostGroupsResolved(requiredHostGroups); } if (! 
interrupted) { @@ -478,12 +474,15 @@ public class TopologyManager { "An exception occurred while attempting to process cluster configs and set on cluster: " + e); e.printStackTrace(); } - - //executePendingTasks(); } LOG.info("TopologyManager.ConfigureClusterTask: Exiting"); } + /** + * Return the set of host group names which are required for configuration topology resolution. + * + * @return set of required host group names + */ private Collection<String> getTopologyRequiredHostGroups() { Collection<String> requiredHostGroups; try { @@ -497,16 +496,20 @@ public class TopologyManager { return requiredHostGroups; } - // get set of required host groups from config processor and confirm that all requests - // have fully resolved the host names for the required host groups - private boolean areConfigsResolved(Collection<String> requiredHostGroups) { + /** + * Determine if all hosts for the given set of required host groups are known. + * + * @param requiredHostGroups set of required host groups + * @return true if all required host groups are resolved + */ + private boolean areRequiredHostGroupsResolved(Collection<String> requiredHostGroups) { boolean configTopologyResolved = true; - synchronized (outstandingRequests) { - for (LogicalRequest outstandingRequest : outstandingRequests) { - if (! 
outstandingRequest.areGroupsResolved(requiredHostGroups)) { - configTopologyResolved = false; - break; - } + Map<String, HostGroupInfo> hostGroupInfo = topology.getHostGroupInfo(); + for (String hostGroup : requiredHostGroups) { + HostGroupInfo groupInfo = hostGroupInfo.get(hostGroup); + if (groupInfo == null || groupInfo.getHostNames().size() < groupInfo.getRequestedHostCount()) { + configTopologyResolved = false; + break; } } return configTopologyResolved; http://git-wip-us.apache.org/repos/asf/ambari/blob/66a4bfb2/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java index 53abd1c..df87aec 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java @@ -72,9 +72,7 @@ public class TopologyManagerTest { private final ConfigurationRequest configurationRequest = createNiceMock(ConfigurationRequest.class); private final ConfigurationRequest configurationRequest2 = createNiceMock(ConfigurationRequest.class); private final ConfigurationRequest configurationRequest3 = createNiceMock(ConfigurationRequest.class); - private final ConfigurationRequest configurationRequest4 = createNiceMock(ConfigurationRequest.class); - private final ConfigurationRequest configurationRequest5 = createNiceMock(ConfigurationRequest.class); - private final ConfigurationRequest configurationRequest6 = createNiceMock(ConfigurationRequest.class); + private final RequestStatusResponse requestStatusResponse = createNiceMock(RequestStatusResponse.class); private final ExecutorService executor = createStrictMock(ExecutorService.class); private final PersistedState persistedState = 
createStrictMock(PersistedState.class); @@ -118,9 +116,6 @@ public class TopologyManagerTest { private Capture<Map<String, Object>> configRequestPropertiesCapture; private Capture<Map<String, Object>> configRequestPropertiesCapture2; private Capture<Map<String, Object>> configRequestPropertiesCapture3; - private Capture<Map<String, Object>> configRequestPropertiesCapture4; - private Capture<Map<String, Object>> configRequestPropertiesCapture5; - private Capture<Map<String, Object>> configRequestPropertiesCapture6; private Capture<ClusterRequest> updateClusterConfigRequestCapture; private Capture<Runnable> updateConfigTaskCapture; @@ -131,9 +126,6 @@ public class TopologyManagerTest { configRequestPropertiesCapture = new Capture<Map<String, Object>>(); configRequestPropertiesCapture2 = new Capture<Map<String, Object>>(); configRequestPropertiesCapture3 = new Capture<Map<String, Object>>(); - configRequestPropertiesCapture4 = new Capture<Map<String, Object>>(); - configRequestPropertiesCapture5 = new Capture<Map<String, Object>>(); - configRequestPropertiesCapture6 = new Capture<Map<String, Object>>(); updateClusterConfigRequestCapture = new Capture<ClusterRequest>(); updateConfigTaskCapture = new Capture<Runnable>(); @@ -248,25 +240,23 @@ public class TopologyManagerTest { //expectLastCall().once(); //ambariContext.registerHostWithConfigGroup(eq("host1"), isA(ClusterTopologyImpl.class), eq("group1")); //expectLastCall().once(); + + // cluster configuration task run() isn't executed by mock executor + // so only INITIAL config expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture))). andReturn(Collections.singletonList(configurationRequest)); expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture2))). andReturn(Collections.singletonList(configurationRequest2)).once(); expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture3))). 
andReturn(Collections.singletonList(configurationRequest3)).once(); - expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture4))). - andReturn(Collections.singletonList(configurationRequest4)).once(); - expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture5))). - andReturn(Collections.singletonList(configurationRequest5)).once(); - expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture6))). - andReturn(Collections.singletonList(configurationRequest6)).once(); + ambariContext.setConfigurationOnCluster(capture(updateClusterConfigRequestCapture)); - expectLastCall().times(6); + expectLastCall().times(3); ambariContext.persistInstallStateForUI(CLUSTER_NAME, STACK_NAME, STACK_VERSION); expectLastCall().once(); - //executor.execute(capture(updateConfigTaskCapture)); - //expectLastCall().times(0); + executor.execute(capture(updateConfigTaskCapture)); + expectLastCall().times(1); expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology, List<LogicalRequest>>emptyMap()).once(); @@ -306,7 +296,6 @@ public class TopologyManagerTest { @Test public void testProvisionCluster() throws Exception { topologyManager.provisionCluster(request); - //todo: assertions }
