AMBARI-20178. Add authentication for Topology tasks (magyari_sandor)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5ab46f0c Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5ab46f0c Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5ab46f0c Branch: refs/heads/branch-feature-AMBARI-12556 Commit: 5ab46f0cca5cb2dc6250f44ebe2a3603435706fe Parents: d481b78 Author: Sandor Magyari <[email protected]> Authored: Thu Mar 2 15:15:31 2017 +0100 Committer: Sandor Magyari <[email protected]> Committed: Thu Mar 2 18:58:01 2017 +0100 ---------------------------------------------------------------------- .../server/controller/ControllerModule.java | 14 +- .../internal/ClusterResourceProvider.java | 2 + .../InternalAuthenticationInterceptor.java | 51 ++++++ .../RunWithInternalSecurityContext.java | 36 ++++ .../ambari/server/topology/AmbariContext.java | 7 +- .../server/topology/HostOfferResponse.java | 2 +- .../ambari/server/topology/HostRequest.java | 178 ++----------------- .../server/topology/PersistedStateImpl.java | 1 + .../ambari/server/topology/TopologyManager.java | 101 ++--------- .../ambari/server/topology/TopologyTask.java | 47 ----- .../topology/tasks/ConfigureClusterTask.java | 122 +++++++++++++ .../tasks/ConfigureClusterTaskFactory.java | 30 ++++ .../server/topology/tasks/InstallHostTask.java | 70 ++++++++ .../tasks/PersistHostResourcesTask.java | 59 ++++++ .../tasks/RegisterWithConfigGroupTask.java | 50 ++++++ .../server/topology/tasks/StartHostTask.java | 67 +++++++ .../server/topology/tasks/TopologyHostTask.java | 59 ++++++ .../server/topology/tasks/TopologyTask.java | 42 +++++ .../ambari/server/agent/AgentResourceTest.java | 3 + .../server/state/cluster/ClustersTest.java | 2 +- .../ClusterDeployWithStartOnlyTest.java | 3 + ...InstallWithoutStartOnComponentLevelTest.java | 3 + .../ClusterInstallWithoutStartTest.java | 3 + .../topology/ConfigureClusterTaskTest.java | 7 +- .../server/topology/TopologyManagerTest.java | 3 + 
.../ambari/server/utils/StageUtilsTest.java | 2 + 26 files changed, 657 insertions(+), 307 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java index 482d602..4fa2362 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java @@ -18,6 +18,8 @@ package org.apache.ambari.server.controller; +import static com.google.inject.matcher.Matchers.annotatedWith; +import static com.google.inject.matcher.Matchers.any; import static org.eclipse.persistence.config.PersistenceUnitProperties.CREATE_JDBC_DDL_FILE; import static org.eclipse.persistence.config.PersistenceUnitProperties.CREATE_ONLY; import static org.eclipse.persistence.config.PersistenceUnitProperties.CREATE_OR_EXTEND; @@ -103,6 +105,8 @@ import org.apache.ambari.server.scheduler.ExecutionSchedulerImpl; import org.apache.ambari.server.security.SecurityHelper; import org.apache.ambari.server.security.SecurityHelperImpl; import org.apache.ambari.server.security.authorization.AuthorizationHelper; +import org.apache.ambari.server.security.authorization.internal.InternalAuthenticationInterceptor; +import org.apache.ambari.server.security.authorization.internal.RunWithInternalSecurityContext; import org.apache.ambari.server.security.encryption.CredentialStoreService; import org.apache.ambari.server.security.encryption.CredentialStoreServiceImpl; import org.apache.ambari.server.serveraction.kerberos.KerberosOperationHandlerFactory; @@ -145,6 +149,7 @@ import 
org.apache.ambari.server.topology.BlueprintFactory; import org.apache.ambari.server.topology.PersistedState; import org.apache.ambari.server.topology.PersistedStateImpl; import org.apache.ambari.server.topology.SecurityConfigurationFactory; +import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory; import org.apache.ambari.server.view.ViewInstanceHandlerList; import org.eclipse.jetty.server.SessionIdManager; import org.eclipse.jetty.server.SessionManager; @@ -396,6 +401,10 @@ public class ControllerModule extends AbstractModule { bindNotificationDispatchers(null); registerUpgradeChecks(null); bind(HookService.class).to(UserHookService.class); + + InternalAuthenticationInterceptor ambariAuthenticationInterceptor = new InternalAuthenticationInterceptor(); + requestInjection(ambariAuthenticationInterceptor); + bindInterceptor(any(), annotatedWith(RunWithInternalSecurityContext.class), ambariAuthenticationInterceptor); } // ----- helper methods ---------------------------------------------------- @@ -461,8 +470,8 @@ public class ControllerModule extends AbstractModule { .build(ResourceProviderFactory.class)); install(new FactoryModuleBuilder().implement( - ServiceComponent.class, ServiceComponentImpl.class).build( - ServiceComponentFactory.class)); + ServiceComponent.class, ServiceComponentImpl.class).build( + ServiceComponentFactory.class)); install(new FactoryModuleBuilder().implement( ServiceComponentHost.class, ServiceComponentHostImpl.class).build( ServiceComponentHostFactory.class)); @@ -492,6 +501,7 @@ public class ControllerModule extends AbstractModule { install(new FactoryModuleBuilder().implement(HookContext.class, PostUserCreationHookContext.class).build(HookContextFactory.class)); install(new FactoryModuleBuilder().implement(CollectionPersisterService.class, CsvFilePersisterService.class).build(CollectionPersisterServiceFactory.class)); + install(new FactoryModuleBuilder().build(ConfigureClusterTaskFactory.class)); } /** 
http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterResourceProvider.java index 613ab3f..577659d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterResourceProvider.java @@ -545,6 +545,8 @@ public class ClusterResourceProvider extends AbstractControllerResourceProvider throw new IllegalArgumentException("Topology validation failed: " + e, e); } catch (AmbariException e) { throw new SystemException("Unknown exception when asking TopologyManager to provision cluster", e); + } catch (RuntimeException e) { + throw new SystemException("An exception occurred during cluster provisioning: " + e.getMessage(), e); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/InternalAuthenticationInterceptor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/InternalAuthenticationInterceptor.java b/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/InternalAuthenticationInterceptor.java new file mode 100644 index 0000000..e879f07 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/InternalAuthenticationInterceptor.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.security.authorization.internal; + + +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContextHolder; + + +/** + * Allows running given code within a security context authenticated with InternalAuthenticationToken. + * This only works for instances created by Guice. If there is already an Authentication in the current security context, + * it is restored after the annotated method has been called. 
+ */ +public class InternalAuthenticationInterceptor implements MethodInterceptor { + + @Override + public Object invoke(MethodInvocation invocation) throws Throwable { + + Authentication savedAuthContext = SecurityContextHolder.getContext().getAuthentication(); + try { + RunWithInternalSecurityContext securityAuthContextAnnotation = invocation.getMethod().getAnnotation(RunWithInternalSecurityContext + .class); + InternalAuthenticationToken authenticationToken = new InternalAuthenticationToken(securityAuthContextAnnotation + .token()); + authenticationToken.setAuthenticated(true); + SecurityContextHolder.getContext().setAuthentication(authenticationToken); + return invocation.proceed(); + } finally { + SecurityContextHolder.getContext().setAuthentication(savedAuthContext); + } + + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/RunWithInternalSecurityContext.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/RunWithInternalSecurityContext.java b/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/RunWithInternalSecurityContext.java new file mode 100644 index 0000000..3287d47 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/security/authorization/internal/RunWithInternalSecurityContext.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.security.authorization.internal; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Methods annotated with this annotation will run within a security context authenticated with an InternalAuthenticationToken. + */ +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({ ElementType.METHOD }) +public @interface RunWithInternalSecurityContext { + + /* internal authentication token */ + String token(); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java index adca3a3..ce36208 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java @@ -36,6 +36,7 @@ import javax.inject.Inject; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.ClusterNotFoundException; +import org.apache.ambari.server.DuplicateResourceException; import org.apache.ambari.server.Role; import org.apache.ambari.server.RoleCommand; import org.apache.ambari.server.actionmanager.HostRoleCommand; @@ -187,7 +188,11 @@ 
public class AmbariContext { } catch (AmbariException e) { LOG.error("Failed to create Cluster resource: ", e); - throw new RuntimeException("Failed to create Cluster resource: " + e, e); + if (e.getCause() instanceof DuplicateResourceException) { + throw new IllegalArgumentException(e); + } else { + throw new RuntimeException("Failed to create Cluster resource: " + e, e); + } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java index e040e42..e220c50 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java @@ -22,6 +22,7 @@ package org.apache.ambari.server.topology; import java.util.List; import java.util.concurrent.Executor; +import org.apache.ambari.server.topology.tasks.TopologyTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,7 +80,6 @@ final class HostOfferResponse { public void run() { for (TopologyTask task : tasks) { LOG.info("Running task for accepted host offer for hostname = {}, task = {}", hostName, task.getType()); - task.init(topology, ambariContext); task.run(); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java index a6f677a..9152fd2 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java 
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java @@ -25,17 +25,13 @@ import static org.apache.ambari.server.controller.internal.ProvisionAction.START import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import org.apache.ambari.server.actionmanager.HostRoleCommand; import org.apache.ambari.server.api.predicate.InvalidQueryException; import org.apache.ambari.server.api.predicate.PredicateCompiler; -import org.apache.ambari.server.controller.RequestStatusResponse; -import org.apache.ambari.server.controller.ShortTaskStatus; import org.apache.ambari.server.controller.internal.HostResourceProvider; -import org.apache.ambari.server.controller.internal.ProvisionAction; import org.apache.ambari.server.controller.internal.ResourceImpl; import org.apache.ambari.server.controller.internal.Stack; import org.apache.ambari.server.controller.spi.Predicate; @@ -45,9 +41,16 @@ import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity; import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity; import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity; import org.apache.ambari.server.state.host.HostImpl; +import org.apache.ambari.server.topology.tasks.InstallHostTask; +import org.apache.ambari.server.topology.tasks.PersistHostResourcesTask; +import org.apache.ambari.server.topology.tasks.RegisterWithConfigGroupTask; +import org.apache.ambari.server.topology.tasks.StartHostTask; +import org.apache.ambari.server.topology.tasks.TopologyTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + + /** * Represents a set of requests to a single host such as install, start, etc. */ @@ -183,10 +186,10 @@ public class HostRequest implements Comparable<HostRequest> { private void createTasks(boolean skipFailure) { // high level topology tasks such as INSTALL, START, ... 
- topologyTasks.add(new PersistHostResourcesTask()); - topologyTasks.add(new RegisterWithConfigGroupTask()); + topologyTasks.add(new PersistHostResourcesTask(topology, this)); + topologyTasks.add(new RegisterWithConfigGroupTask(topology, this)); - InstallHostTask installTask = new InstallHostTask(skipFailure); + InstallHostTask installTask = new InstallHostTask(topology, this, skipFailure); topologyTasks.add(installTask); logicalTaskMap.put(installTask, new HashMap<String, Long>()); @@ -195,7 +198,7 @@ public class HostRequest implements Comparable<HostRequest> { StartHostTask startTask = null; if (!skipStartTaskCreate) { - startTask = new StartHostTask(skipFailure); + startTask = new StartHostTask(topology, this, skipFailure); topologyTasks.add(startTask); logicalTaskMap.put(startTask, new HashMap<String, Long>()); } else { @@ -249,16 +252,16 @@ public class HostRequest implements Comparable<HostRequest> { } private void createTasksForReplay(TopologyHostRequestEntity entity) { - topologyTasks.add(new PersistHostResourcesTask()); - topologyTasks.add(new RegisterWithConfigGroupTask()); - InstallHostTask installTask = new InstallHostTask(skipFailure); + topologyTasks.add(new PersistHostResourcesTask(topology, this)); + topologyTasks.add(new RegisterWithConfigGroupTask(topology, this)); + InstallHostTask installTask = new InstallHostTask(topology, this, skipFailure); topologyTasks.add(installTask); logicalTaskMap.put(installTask, new HashMap<String, Long>()); boolean skipStartTaskCreate = topology.getProvisionAction().equals(INSTALL_ONLY); if (!skipStartTaskCreate) { - StartHostTask startTask = new StartHostTask(skipFailure); + StartHostTask startTask = new StartHostTask(topology, this, skipFailure); topologyTasks.add(startTask); logicalTaskMap.put(startTask, new HashMap<String, Long>()); } @@ -433,9 +436,8 @@ public class HostRequest implements Comparable<HostRequest> { //todo: once we have logical tasks, move tracking of physical tasks there public void 
registerPhysicalTaskId(long logicalTaskId, long physicalTaskId) { physicalTasks.put(logicalTaskId, physicalTaskId); - - topology.getAmbariContext().getPersistedTopologyState(). - registerPhysicalTask(logicalTaskId, physicalTaskId); + topology.getAmbariContext().getPersistedTopologyState().registerPhysicalTask(logicalTaskId, physicalTaskId); + getLogicalTask(logicalTaskId).incrementAttemptCount(); } private Predicate toPredicate(String predicate) { @@ -451,152 +453,6 @@ public class HostRequest implements Comparable<HostRequest> { return compiledPredicate; } - private class PersistHostResourcesTask implements TopologyTask { - private AmbariContext ambariContext; - - @Override - public Type getType() { - return Type.RESOURCE_CREATION; - } - - @Override - public void init(ClusterTopology topology, AmbariContext ambariContext) { - this.ambariContext = ambariContext; - } - - @Override - public void run() { - LOG.info("HostRequest: Executing RESOURCE_CREATION task for host: {}", hostname); - HostGroup group = topology.getBlueprint().getHostGroup(getHostgroupName()); - Map<String, Collection<String>> serviceComponents = new HashMap<String, Collection<String>>(); - for (String service : group.getServices()) { - serviceComponents.put(service, new HashSet<String> (group.getComponents(service))); - } - ambariContext.createAmbariHostResources(getClusterId(), getHostName(), serviceComponents); - } - } - - private class RegisterWithConfigGroupTask implements TopologyTask { - private ClusterTopology clusterTopology; - private AmbariContext ambariContext; - - @Override - public Type getType() { - return Type.CONFIGURE; - } - - @Override - public void init(ClusterTopology topology, AmbariContext ambariContext) { - clusterTopology = topology; - this.ambariContext = ambariContext; - } - - @Override - public void run() { - LOG.info("HostRequest: Executing CONFIGURE task for host: {}", hostname); - ambariContext.registerHostWithConfigGroup(getHostName(), clusterTopology, 
getHostgroupName()); - } - } - - //todo: extract - private class InstallHostTask implements TopologyTask { - private ClusterTopology clusterTopology; - private final boolean skipFailure; - - public InstallHostTask(boolean skipFailure) { - this.skipFailure = skipFailure; - } - - @Override - public Type getType() { - return Type.INSTALL; - } - - @Override - public void init(ClusterTopology topology, AmbariContext ambariContext) { - clusterTopology = topology; - } - - @Override - public void run() { - LOG.info("HostRequest: Executing INSTALL task for host: {}", hostname); - boolean skipInstallTaskCreate = topology.getProvisionAction().equals(ProvisionAction.START_ONLY); - RequestStatusResponse response = clusterTopology.installHost(hostname, skipInstallTaskCreate, skipFailure); - // map logical install tasks to physical install tasks - List<ShortTaskStatus> underlyingTasks = response.getTasks(); - for (ShortTaskStatus task : underlyingTasks) { - Long logicalInstallTaskId = logicalTaskMap.get(this).get(task.getRole()); - if(logicalInstallTaskId == null) { - LOG.info("Skipping physical install task registering, because component {} cannot be found", task.getRole()); - continue; - } - //todo: for now only one physical task per component - long taskId = task.getTaskId(); - registerPhysicalTaskId(logicalInstallTaskId, taskId); - - //todo: move this to provision - //todo: shouldn't have to iterate over all tasks to find install task - //todo: we are doing the same thing in the above registerPhysicalTaskId() call - // set attempt count on task - for (HostRoleCommand logicalTask : logicalTasks.values()) { - if (logicalTask.getTaskId() == logicalInstallTaskId) { - logicalTask.incrementAttemptCount(); - } - } - } - - LOG.info("HostRequest: Exiting INSTALL task for host: {}", hostname); - } - } - - //todo: extract - private class StartHostTask implements TopologyTask { - private ClusterTopology clusterTopology; - private final boolean skipFailure; - - public 
StartHostTask(boolean skipFailure) { - this.skipFailure = skipFailure; - } - - @Override - public Type getType() { - return Type.START; - } - - @Override - public void init(ClusterTopology topology, AmbariContext ambariContext) { - clusterTopology = topology; - } - - @Override - public void run() { - LOG.info("HostRequest: Executing START task for host: {}", hostname); - RequestStatusResponse response = clusterTopology.startHost(hostname, skipFailure); - // map logical install tasks to physical install tasks - List<ShortTaskStatus> underlyingTasks = response.getTasks(); - for (ShortTaskStatus task : underlyingTasks) { - String component = task.getRole(); - Long logicalStartTaskId = logicalTaskMap.get(this).get(component); - if(logicalStartTaskId == null) { - LOG.info("Skipping physical start task registering, because component {} cannot be found", task.getRole()); - continue; - } - // for now just set on outer map - registerPhysicalTaskId(logicalStartTaskId, task.getTaskId()); - - //todo: move this to provision - // set attempt count on task - for (HostRoleCommand logicalTask : logicalTasks.values()) { - if (logicalTask.getTaskId() == logicalStartTaskId) { - logicalTask.incrementAttemptCount(); - } - } - } - - LOG.info("HostRequest: Exiting START task for host: {}", hostname); - } - } - private class HostResourceAdapter implements Resource { Resource hostResource; http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java index be4ab7a..912b2ff 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java +++ 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java @@ -48,6 +48,7 @@ import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity; import org.apache.ambari.server.orm.entities.TopologyRequestEntity; import org.apache.ambari.server.stack.NoSuchStackException; import org.apache.ambari.server.state.Host; +import org.apache.ambari.server.topology.tasks.TopologyTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java index f53f04a..8e991d6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java @@ -72,6 +72,8 @@ import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.SecurityType; import org.apache.ambari.server.state.host.HostImpl; import org.apache.ambari.server.state.quicklinksprofile.QuickLinksProfile; +import org.apache.ambari.server.topology.tasks.ConfigureClusterTask; +import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory; import org.apache.ambari.server.utils.RetryHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +89,9 @@ import com.google.inject.persist.Transactional; @Singleton public class TopologyManager { + /** internal token for topology related async tasks */ + public static final String INTERNAL_AUTH_TOKEN = "internal_topology_token"; + public static final String INITIAL_CONFIG_TAG = "INITIAL"; public static final String TOPOLOGY_RESOLVED_TAG = "TOPOLOGY_RESOLVED"; public static final String KDC_ADMIN_CREDENTIAL = 
"kdc.admin.credential"; @@ -122,6 +127,9 @@ public class TopologyManager { private SecurityConfigurationFactory securityConfigurationFactory; @Inject + private ConfigureClusterTaskFactory configureClusterTaskFactory; + + @Inject private AmbariEventPublisher ambariEventPublisher; @Inject @@ -302,7 +310,6 @@ public class TopologyManager { } ); - clusterTopologyMap.put(clusterId, topology); addClusterConfigRequest(topology, new ClusterConfigurationRequest( @@ -1030,101 +1037,15 @@ public class TopologyManager { LOG.debug("No timeout constraints found in configuration. Wired defaults will be applied."); } - ConfigureClusterTask configureClusterTask = new ConfigureClusterTask(topology, configurationRequest); + ConfigureClusterTask configureClusterTask = configureClusterTaskFactory.createConfigureClusterTask(topology, + configurationRequest); + AsyncCallableService<Boolean> asyncCallableService = new AsyncCallableService<>(configureClusterTask, timeout, delay, Executors.newScheduledThreadPool(1)); executor.submit(asyncCallableService); } - // package protected for testing purposes - static class ConfigureClusterTask implements Callable<Boolean> { - - private ClusterConfigurationRequest configRequest; - private ClusterTopology topology; - - public ConfigureClusterTask(ClusterTopology topology, ClusterConfigurationRequest configRequest) { - this.configRequest = configRequest; - this.topology = topology; - } - - @Override - public Boolean call() throws Exception { - LOG.info("TopologyManager.ConfigureClusterTask: Entering"); - - Collection<String> requiredHostGroups = getTopologyRequiredHostGroups(); - - if (!areRequiredHostGroupsResolved(requiredHostGroups)) { - LOG.debug("TopologyManager.ConfigureClusterTask - prerequisites for config request processing not yet " + - "satisfied"); - throw new IllegalArgumentException("TopologyManager.ConfigureClusterTask - prerequisites for config " + - "request processing not yet satisfied"); - } - - try { - 
LOG.info("TopologyManager.ConfigureClusterTask: All Required host groups are completed, Cluster " + - "Configuration can now begin"); - configRequest.process(); - } catch (Exception e) { - LOG.error("TopologyManager.ConfigureClusterTask: " + - "An exception occurred while attempting to process cluster configs and set on cluster: ", e); - - // this will signal an unsuccessful run, retry will be triggered if required - throw new Exception(e); - } - - LOG.info("TopologyManager.ConfigureClusterTask: Exiting"); - return true; - } - - /** - * Return the set of host group names which are required for configuration topology resolution. - * - * @return set of required host group names - */ - private Collection<String> getTopologyRequiredHostGroups() { - Collection<String> requiredHostGroups; - try { - requiredHostGroups = configRequest.getRequiredHostGroups(); - } catch (RuntimeException e) { - // just log error and allow config topology update - LOG.error("TopologyManager.ConfigureClusterTask: An exception occurred while attempting to determine required" + - " host groups for config update ", e); - requiredHostGroups = Collections.emptyList(); - } - return requiredHostGroups; - } - - /** - * Determine if all hosts for the given set of required host groups are known. 
- * - * @param requiredHostGroups set of required host groups - * @return true if all required host groups are resolved - */ - private boolean areRequiredHostGroupsResolved(Collection<String> requiredHostGroups) { - boolean configTopologyResolved = true; - Map<String, HostGroupInfo> hostGroupInfo = topology.getHostGroupInfo(); - for (String hostGroup : requiredHostGroups) { - HostGroupInfo groupInfo = hostGroupInfo.get(hostGroup); - if (groupInfo == null || groupInfo.getHostNames().size() < groupInfo.getRequestedHostCount()) { - configTopologyResolved = false; - if (groupInfo != null) { - LOG.info("TopologyManager.ConfigureClusterTask areHostGroupsResolved: host group name = {} requires {} hosts to be mapped, but only {} are available.", - groupInfo.getHostGroupName(), groupInfo.getRequestedHostCount(), groupInfo.getHostNames().size()); - } else { - LOG.error("TopologyManager.ConfigureClusterTask areHostGroupsResolved: host group name = {} is required group and does not map to any hosts. Use add host API to add host to this host group.", - hostGroup); - } - break; - } else { - LOG.info("TopologyManager.ConfigureClusterTask areHostGroupsResolved: host group name = {} has been fully resolved, as all {} required hosts are mapped to {} physical hosts.", - groupInfo.getHostGroupName(), groupInfo.getRequestedHostCount(), groupInfo.getHostNames().size()); - } - } - return configTopologyResolved; - } - } - /** * * Removes a host from the available hosts when the host gets deleted. 
http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyTask.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyTask.java deleted file mode 100644 index ef39896..0000000 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyTask.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distribut - * ed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.server.topology; - -/** - * Task which is executed by the TopologyManager. - */ -public interface TopologyTask extends Runnable { - /** - * Task type. - */ - public enum Type { - RESOURCE_CREATION, - CONFIGURE, - INSTALL, - START - } - - /** - * injection of topology and ambari context - */ - public void init(ClusterTopology topology, AmbariContext ambariContext); - - /** - * Get the task type. 
- * - * @return the type of task - */ - public Type getType(); -} http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java new file mode 100644 index 0000000..19d99ad --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.server.topology.tasks; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Callable; + +import org.apache.ambari.server.security.authorization.internal.RunWithInternalSecurityContext; +import org.apache.ambari.server.topology.ClusterConfigurationRequest; +import org.apache.ambari.server.topology.ClusterTopology; +import org.apache.ambari.server.topology.HostGroupInfo; +import org.apache.ambari.server.topology.TopologyManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.AssistedInject; + +public class ConfigureClusterTask implements Callable<Boolean> { + + private static Logger LOG = LoggerFactory.getLogger(ConfigureClusterTask.class); + + private ClusterConfigurationRequest configRequest; + private ClusterTopology topology; + + @AssistedInject + public ConfigureClusterTask(@Assisted ClusterTopology topology, @Assisted ClusterConfigurationRequest configRequest) { + this.configRequest = configRequest; + this.topology = topology; + } + + @Override + @RunWithInternalSecurityContext(token = TopologyManager.INTERNAL_AUTH_TOKEN) + public Boolean call() throws Exception { + LOG.info("TopologyManager.ConfigureClusterTask: Entering"); + + Collection<String> requiredHostGroups = getTopologyRequiredHostGroups(); + + if (!areRequiredHostGroupsResolved(requiredHostGroups)) { + LOG.debug("TopologyManager.ConfigureClusterTask - prerequisites for config request processing not yet " + + "satisfied"); + throw new IllegalArgumentException("TopologyManager.ConfigureClusterTask - prerequisites for config " + + "request processing not yet satisfied"); + } + + try { + LOG.info("TopologyManager.ConfigureClusterTask: All Required host groups are completed, Cluster " + + "Configuration can now begin"); + configRequest.process(); + } catch (Exception e) { + 
LOG.error("TopologyManager.ConfigureClusterTask: " + + "An exception occurred while attempting to process cluster configs and set on cluster: ", e); + + // this will signal an unsuccessful run, retry will be triggered if required + throw new Exception(e); + } + + LOG.info("TopologyManager.ConfigureClusterTask: Exiting"); + return true; + } + + /** + * Return the set of host group names which are required for configuration topology resolution. + * + * @return set of required host group names + */ + private Collection<String> getTopologyRequiredHostGroups() { + Collection<String> requiredHostGroups; + try { + requiredHostGroups = configRequest.getRequiredHostGroups(); + } catch (RuntimeException e) { + // just log error and allow config topology update + LOG.error("TopologyManager.ConfigureClusterTask: An exception occurred while attempting to determine required" + + " host groups for config update ", e); + requiredHostGroups = Collections.emptyList(); + } + return requiredHostGroups; + } + + /** + * Determine if all hosts for the given set of required host groups are known. 
+ * + * @param requiredHostGroups set of required host groups + * @return true if all required host groups are resolved + */ + private boolean areRequiredHostGroupsResolved(Collection<String> requiredHostGroups) { + boolean configTopologyResolved = true; + Map<String, HostGroupInfo> hostGroupInfo = topology.getHostGroupInfo(); + for (String hostGroup : requiredHostGroups) { + HostGroupInfo groupInfo = hostGroupInfo.get(hostGroup); + if (groupInfo == null || groupInfo.getHostNames().size() < groupInfo.getRequestedHostCount()) { + configTopologyResolved = false; + if (groupInfo != null) { + LOG.info("TopologyManager.ConfigureClusterTask areHostGroupsResolved: host group name = {} requires {} hosts to be mapped, but only {} are available.", + groupInfo.getHostGroupName(), groupInfo.getRequestedHostCount(), groupInfo.getHostNames().size()); + } + break; + } else { + LOG.info("TopologyManager.ConfigureClusterTask areHostGroupsResolved: host group name = {} has been fully resolved, as all {} required hosts are mapped to {} physical hosts.", + groupInfo.getHostGroupName(), groupInfo.getRequestedHostCount(), groupInfo.getHostNames().size()); + } + } + return configTopologyResolved; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTaskFactory.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTaskFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTaskFactory.java new file mode 100644 index 0000000..0287103 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTaskFactory.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology.tasks; + +import org.apache.ambari.server.topology.ClusterConfigurationRequest; +import org.apache.ambari.server.topology.ClusterTopology; + + +public interface ConfigureClusterTaskFactory { + + ConfigureClusterTask createConfigureClusterTask(ClusterTopology topology, ClusterConfigurationRequest + configRequest); + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/InstallHostTask.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/InstallHostTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/InstallHostTask.java new file mode 100644 index 0000000..f38022a --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/InstallHostTask.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.topology.tasks; + +import java.util.List; + +import org.apache.ambari.server.controller.RequestStatusResponse; +import org.apache.ambari.server.controller.ShortTaskStatus; +import org.apache.ambari.server.controller.internal.ProvisionAction; +import org.apache.ambari.server.topology.ClusterTopology; +import org.apache.ambari.server.topology.HostRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.AssistedInject; + +public class InstallHostTask extends TopologyHostTask { + + private final static Logger LOG = LoggerFactory.getLogger(InstallHostTask.class); + + @AssistedInject + public InstallHostTask(@Assisted ClusterTopology topology, @Assisted HostRequest hostRequest, @Assisted boolean skipFailure) { + super(topology, hostRequest); + this.skipFailure = skipFailure; + } + + @Override + public Type getType() { + return Type.INSTALL; + } + + @Override + public void runTask() { + LOG.info("HostRequest: Executing INSTALL task for host: {}", hostRequest.getHostName()); + boolean skipInstallTaskCreate = clusterTopology.getProvisionAction().equals(ProvisionAction.START_ONLY); + RequestStatusResponse response = clusterTopology.installHost(hostRequest.getHostName(), skipInstallTaskCreate, skipFailure); + // map logical install tasks to physical install 
tasks + List<ShortTaskStatus> underlyingTasks = response.getTasks(); + for (ShortTaskStatus task : underlyingTasks) { + + String component = task.getRole(); + Long logicalInstallTaskId = hostRequest.getLogicalTasksForTopologyTask(this).get(component); + if(logicalInstallTaskId == null) { + LOG.info("Skipping physical install task registering, because component {} cannot be found", task.getRole()); + continue; + } + //todo: for now only one physical task per component + long taskId = task.getTaskId(); + hostRequest.registerPhysicalTaskId(logicalInstallTaskId, taskId); + } + + LOG.info("HostRequest: Exiting INSTALL task for host: {}", hostRequest.getHostName()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/PersistHostResourcesTask.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/PersistHostResourcesTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/PersistHostResourcesTask.java new file mode 100644 index 0000000..0730fe8 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/PersistHostResourcesTask.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.topology.tasks; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import org.apache.ambari.server.topology.ClusterTopology; +import org.apache.ambari.server.topology.HostGroup; +import org.apache.ambari.server.topology.HostRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.AssistedInject; + +public class PersistHostResourcesTask extends TopologyHostTask { + + private final static Logger LOG = LoggerFactory.getLogger(PersistHostResourcesTask.class); + + @AssistedInject + public PersistHostResourcesTask(@Assisted ClusterTopology topology, @Assisted HostRequest hostRequest) { + super(topology, hostRequest); + } + + @Override + public Type getType() { + return Type.RESOURCE_CREATION; + } + + @Override + public void runTask() { + LOG.info("HostRequest: Executing RESOURCE_CREATION task for host: {}", hostRequest.getHostName()); + HostGroup group = hostRequest.getHostGroup(); + Map<String, Collection<String>> serviceComponents = new HashMap<String, Collection<String>>(); + for (String service : group.getServices()) { + serviceComponents.put(service, new HashSet<String>(group.getComponents(service))); + } + clusterTopology.getAmbariContext().createAmbariHostResources(hostRequest.getClusterId(), + hostRequest.getHostName(), serviceComponents); + } +} 
http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/RegisterWithConfigGroupTask.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/RegisterWithConfigGroupTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/RegisterWithConfigGroupTask.java new file mode 100644 index 0000000..029f2a4 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/RegisterWithConfigGroupTask.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.server.topology.tasks; + +import org.apache.ambari.server.topology.ClusterTopology; +import org.apache.ambari.server.topology.HostRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.AssistedInject; + + +public class RegisterWithConfigGroupTask extends TopologyHostTask { + + private final static Logger LOG = LoggerFactory.getLogger(RegisterWithConfigGroupTask.class); + + @AssistedInject + public RegisterWithConfigGroupTask(@Assisted ClusterTopology topology, @Assisted HostRequest hostRequest) { + super(topology, hostRequest); + } + + @Override + public Type getType() { + return Type.CONFIGURE; + } + + @Override + public void runTask() { + LOG.info("HostRequest: Executing CONFIGURE task for host: {}", hostRequest.getHostName()); + clusterTopology.getAmbariContext().registerHostWithConfigGroup(hostRequest.getHostName(), clusterTopology, + hostRequest.getHostgroupName()); + } +} + http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/StartHostTask.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/StartHostTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/StartHostTask.java new file mode 100644 index 0000000..054ed1e --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/StartHostTask.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.topology.tasks; + +import java.util.List; + +import org.apache.ambari.server.controller.RequestStatusResponse; +import org.apache.ambari.server.controller.ShortTaskStatus; +import org.apache.ambari.server.topology.ClusterTopology; +import org.apache.ambari.server.topology.HostRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.AssistedInject; + +public class StartHostTask extends TopologyHostTask { + + private final static Logger LOG = LoggerFactory.getLogger(StartHostTask.class); + + @AssistedInject + public StartHostTask(@Assisted ClusterTopology topology, @Assisted HostRequest hostRequest, @Assisted boolean skipFailure) { + super(topology, hostRequest); + this.skipFailure = skipFailure; + } + + @Override + public Type getType() { + return Type.START; + } + + @Override + public void runTask() { + LOG.info("HostRequest: Executing START task for host: {}", hostRequest.getHostName()); + RequestStatusResponse response = clusterTopology.startHost(hostRequest.getHostName(), skipFailure); + // map logical install tasks to physical install tasks + List<ShortTaskStatus> underlyingTasks = response.getTasks(); + for (ShortTaskStatus task : underlyingTasks) { + + String component = task.getRole(); + Long logicalStartTaskId = hostRequest.getLogicalTasksForTopologyTask(this).get(component); + if(logicalStartTaskId == null) { + LOG.info("Skipping physical start task registering, because component {} cannot be found", 
task.getRole()); + continue; + } + // for now just set on outer map + hostRequest.registerPhysicalTaskId(logicalStartTaskId, task.getTaskId()); + } + + LOG.info("HostRequest: Exiting START task for host: {}", hostRequest.getHostName()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java new file mode 100644 index 0000000..82a2f6e --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.server.topology.tasks; + +import org.apache.ambari.server.security.authorization.internal.InternalAuthenticationToken; +import org.apache.ambari.server.topology.ClusterTopology; +import org.apache.ambari.server.topology.HostRequest; +import org.apache.ambari.server.topology.TopologyManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContextHolder; + +public abstract class TopologyHostTask implements TopologyTask { + + private static Logger LOG = LoggerFactory.getLogger(TopologyHostTask.class); + + ClusterTopology clusterTopology; + HostRequest hostRequest; + boolean skipFailure; + + public TopologyHostTask(ClusterTopology topology, HostRequest hostRequest) { + this.clusterTopology = topology; + this.hostRequest = hostRequest; + } + + /** + * Run with an InternalAuthenticationToken as when running these tasks we might not have any active security context. 
+ */ + public void run() { + Authentication savedAuthContext = SecurityContextHolder.getContext().getAuthentication(); + try { + InternalAuthenticationToken authenticationToken = new InternalAuthenticationToken(TopologyManager.INTERNAL_AUTH_TOKEN); + authenticationToken.setAuthenticated(true); + SecurityContextHolder.getContext().setAuthentication(authenticationToken); + runTask(); + } finally { + SecurityContextHolder.getContext().setAuthentication(savedAuthContext); + } + } + + public abstract void runTask(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java new file mode 100644 index 0000000..0753c3d --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distribut + * ed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.server.topology.tasks; + +/** + * Task which is executed by the TopologyManager. + */ +public interface TopologyTask extends Runnable { + /** + * Task type. + */ + public enum Type { + RESOURCE_CREATION, + CONFIGURE, + INSTALL, + START + } + + /** + * Get the task type. + * + * @return the type of task + */ + public Type getType(); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java index 8ae192b..3f62366 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java @@ -73,6 +73,7 @@ import org.apache.ambari.server.state.scheduler.RequestExecutionImpl; import org.apache.ambari.server.state.stack.OsFamily; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostImpl; import org.apache.ambari.server.topology.PersistedState; +import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.codehaus.jettison.json.JSONException; @@ -346,6 +347,8 @@ public class AgentResourceTest extends RandomPortJerseyTest { RequestExecutionImpl.class).build(RequestExecutionFactory.class)); install(new FactoryModuleBuilder().build(StageFactory.class)); install(new FactoryModuleBuilder().build(ExecutionCommandWrapperFactory.class)); + install(new FactoryModuleBuilder().build(ConfigureClusterTaskFactory.class)); + bind(HostRoleCommandFactory.class).to(HostRoleCommandFactoryImpl.class); bind(SecurityHelper.class).toInstance(SecurityHelperImpl.getInstance()); 
http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java index 97d560f..c6cef26 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java @@ -78,7 +78,7 @@ import org.apache.ambari.server.topology.HostRequest; import org.apache.ambari.server.topology.LogicalRequest; import org.apache.ambari.server.topology.PersistedState; import org.apache.ambari.server.topology.TopologyRequest; -import org.apache.ambari.server.topology.TopologyTask; +import org.apache.ambari.server.topology.tasks.TopologyTask; import org.apache.ambari.server.utils.EventBusSynchronizer; import org.junit.After; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java index 748b4e9..af3fc08 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java @@ -66,6 +66,7 @@ import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ComponentInfo; import org.apache.ambari.server.state.SecurityType; +import 
org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory; import org.easymock.Capture; import org.easymock.EasyMockRule; import org.easymock.EasyMockSupport; @@ -154,6 +155,8 @@ public class ClusterDeployWithStartOnlyTest { private ComponentInfo serviceComponentInfo; @Mock(type = MockType.NICE) private ComponentInfo clientComponentInfo; + @Mock(type = MockType.NICE) + private ConfigureClusterTaskFactory configureClusterTaskFactory; @Mock(type = MockType.STRICT) private Future mockFuture; http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java index a1f3d25..09a6aa2 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java @@ -67,6 +67,7 @@ import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ComponentInfo; import org.apache.ambari.server.state.SecurityType; +import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory; import org.easymock.Capture; import org.easymock.EasyMockRule; import org.easymock.EasyMockSupport; @@ -144,6 +145,8 @@ public class ClusterInstallWithoutStartOnComponentLevelTest { private Cluster cluster; @Mock(type = MockType.NICE) private HostRoleCommand hostRoleCommand; + @Mock(type = MockType.NICE) + private ConfigureClusterTaskFactory configureClusterTaskFactory; @Mock(type = MockType.NICE) 
http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java index 33f318a..44cc9f7 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java @@ -67,6 +67,7 @@ import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ComponentInfo; import org.apache.ambari.server.state.SecurityType; +import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory; import org.easymock.Capture; import org.easymock.EasyMockRule; import org.easymock.EasyMockSupport; @@ -144,6 +145,8 @@ public class ClusterInstallWithoutStartTest { private Cluster cluster; @Mock(type = MockType.NICE) private HostRoleCommand hostRoleCommand; + @Mock(type = MockType.NICE) + private ConfigureClusterTaskFactory configureClusterTaskFactory; @Mock(type = MockType.NICE) http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java index aa7ba0e..e9198fb 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java +++ 
b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executors; +import org.apache.ambari.server.topology.tasks.ConfigureClusterTask; import org.easymock.EasyMockRule; import org.easymock.Mock; import org.easymock.MockType; @@ -59,14 +60,12 @@ public class ConfigureClusterTaskTest { @Mock(type = MockType.STRICT) private ClusterTopology clusterTopology; - private TopologyManager.ConfigureClusterTask testSubject; - + private ConfigureClusterTask testSubject; @Before public void before() { reset(clusterConfigurationRequest, clusterTopology); - testSubject = new TopologyManager.ConfigureClusterTask(clusterTopology, clusterConfigurationRequest); - + testSubject = new ConfigureClusterTask(clusterTopology, clusterConfigurationRequest); } @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java index 4f087f0..469617c 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java @@ -65,6 +65,7 @@ import org.apache.ambari.server.security.encryption.CredentialStoreService; import org.apache.ambari.server.stack.NoSuchStackException; import org.apache.ambari.server.state.SecurityType; import org.apache.ambari.server.state.quicklinksprofile.QuickLinksProfile; +import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockRule; @@ -151,6 +152,8 @@ public class 
TopologyManagerTest { private SettingDAO settingDAO; @Mock(type = MockType.NICE) private ClusterTopology clusterTopologyMock; + @Mock(type = MockType.NICE) + private ConfigureClusterTaskFactory configureClusterTaskFactory; @Mock(type = MockType.STRICT) http://git-wip-us.apache.org/repos/asf/ambari/blob/5ab46f0c/ambari-server/src/test/java/org/apache/ambari/server/utils/StageUtilsTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/StageUtilsTest.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/StageUtilsTest.java index 5c77831..cf7ff7f 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/utils/StageUtilsTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/utils/StageUtilsTest.java @@ -80,6 +80,7 @@ import org.apache.ambari.server.state.host.HostFactory; import org.apache.ambari.server.state.stack.OsFamily; import org.apache.ambari.server.topology.PersistedState; import org.apache.ambari.server.topology.TopologyManager; +import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.JsonMappingException; import org.easymock.EasyMockSupport; @@ -130,6 +131,7 @@ public class StageUtilsTest extends EasyMockSupport { install(new FactoryModuleBuilder().build(ExecutionCommandWrapperFactory.class)); install(new FactoryModuleBuilder().implement(Config.class, ConfigImpl.class).build(ConfigFactory.class)); + install(new FactoryModuleBuilder().build(ConfigureClusterTaskFactory.class)); } });
