http://git-wip-us.apache.org/repos/asf/ambari/blob/807b3c2d/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java index 9f9db5c..9e25dfb 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java @@ -18,144 +18,122 @@ package org.apache.ambari.server.topology; -import org.apache.ambari.server.AmbariException; -import org.apache.ambari.server.Role; -import org.apache.ambari.server.RoleCommand; -import org.apache.ambari.server.actionmanager.ActionManager; import org.apache.ambari.server.actionmanager.HostRoleCommand; -import org.apache.ambari.server.actionmanager.HostRoleCommandFactory; -import org.apache.ambari.server.actionmanager.HostRoleStatus; -import org.apache.ambari.server.api.services.AmbariMetaInfo; -import org.apache.ambari.server.controller.AmbariManagementController; -import org.apache.ambari.server.controller.ConfigGroupRequest; +import org.apache.ambari.server.api.predicate.InvalidQueryException; +import org.apache.ambari.server.api.predicate.PredicateCompiler; import org.apache.ambari.server.controller.RequestStatusResponse; -import org.apache.ambari.server.controller.ServiceComponentHostRequest; import org.apache.ambari.server.controller.ShortTaskStatus; -import org.apache.ambari.server.controller.internal.ConfigGroupResourceProvider; -import org.apache.ambari.server.controller.internal.HostComponentResourceProvider; import org.apache.ambari.server.controller.internal.HostResourceProvider; -import org.apache.ambari.server.controller.internal.RequestImpl; import org.apache.ambari.server.controller.internal.ResourceImpl; import org.apache.ambari.server.controller.internal.Stack; -import org.apache.ambari.server.controller.spi.NoSuchParentResourceException; import org.apache.ambari.server.controller.spi.Predicate; import org.apache.ambari.server.controller.spi.Resource; -import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException; -import org.apache.ambari.server.controller.spi.SystemException; -import org.apache.ambari.server.controller.spi.UnsupportedPropertyException; -import org.apache.ambari.server.controller.utilities.ClusterControllerHelper; import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; -import org.apache.ambari.server.state.Cluster; -import org.apache.ambari.server.state.Clusters; -import org.apache.ambari.server.state.Config; -import org.apache.ambari.server.state.ConfigHelper; -import org.apache.ambari.server.state.ConfigImpl; -import org.apache.ambari.server.state.StackId; -import org.apache.ambari.server.state.configgroup.ConfigGroup; +import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity; +import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity; +import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity; import org.apache.ambari.server.state.host.HostImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; - -import static org.apache.ambari.server.controller.AmbariServer.getController; /** * Represents a set of requests to a single host such as install, start, etc. */ public class HostRequest implements Comparable<HostRequest> { + private final static Logger LOG = LoggerFactory.getLogger(HostRequest.class); + private long requestId; private String blueprint; private HostGroup hostGroup; private String hostgroupName; private Predicate predicate; - private int cardinality = -1; private String hostname = null; private String cluster; private boolean containsMaster; - private long stageId = -1; - //todo: should be able to use the presence of hostName for this - private boolean outstanding = true; + private final long id; + private boolean isOutstanding = true; - //todo: remove - private Map<String, Long> logicalInstallTaskIds = new HashMap<String, Long>(); - //todo: remove - private Map<String, Long> logicalStartTaskIds = new HashMap<String, Long>(); + private Map<TopologyTask, Map<String, Long>> logicalTaskMap = new HashMap<TopologyTask, Map<String, Long>>(); - Collection<HostRoleCommand> logicalTasks = new ArrayList<HostRoleCommand>(); + Map<Long, HostRoleCommand> logicalTasks = new HashMap<Long, HostRoleCommand>(); // logical task id -> physical tasks - private Map<Long, Collection<Long>> physicalTasks = new HashMap<Long, Collection<Long>>(); - - private static HostResourceProvider hostResourceProvider; - - private HostComponentResourceProvider hostComponentResourceProvider; + private Map<Long, Long> physicalTasks = new HashMap<Long, Long>(); - private AmbariManagementController controller = getController(); - private ActionManager actionManager = controller.getActionManager(); - private ConfigHelper configHelper = controller.getConfigHelper(); - private AmbariMetaInfo metaInfoManager = controller.getAmbariMetaInfo(); + private List<TopologyTask> topologyTasks = new ArrayList<TopologyTask>(); - //todo: temporary refactoring step - private TopologyManager.ClusterTopologyContext topologyContext; + private ClusterTopology topology; - private static HostRoleCommandFactory hostRoleCommandFactory; + private static PredicateCompiler predicateCompiler = new PredicateCompiler(); - public static void init(HostRoleCommandFactory factory) { - hostRoleCommandFactory = factory; - } - - public HostRequest(long requestId, long stageId, String cluster, String blueprintName, HostGroup hostGroup, - int cardinality, Predicate predicate, TopologyManager.ClusterTopologyContext topologyContext) { + public HostRequest(long requestId, long id, String cluster, String hostname, String blueprintName, + HostGroup hostGroup, Predicate predicate, ClusterTopology topology) { this.requestId = requestId; - this.stageId = stageId; + this.id = id; this.cluster = cluster; this.blueprint = blueprintName; this.hostGroup = hostGroup; this.hostgroupName = hostGroup.getName(); - this.cardinality = cardinality; this.predicate = predicate; this.containsMaster = hostGroup.containsMasterComponent(); - this.topologyContext = topologyContext; + this.topology = topology; createTasks(); - System.out.println("HostRequest: Created request: Host Association Pending"); + System.out.println("HostRequest: Created request for host: " + + (hostname == null ? "Host Assignment Pending" : hostname)); } - public HostRequest(long requestId, long stageId, String cluster, String blueprintName, HostGroup hostGroup, - String hostname, Predicate predicate, TopologyManager.ClusterTopologyContext topologyContext) { + /** + * Only to be used when replaying persisted requests upon server startup. + * + * @param requestId logical request id + * @param id host request id + * @param predicate host predicate + * @param topology cluster topology + * @param entity host request entity + */ + public HostRequest(long requestId, long id, String predicate, + ClusterTopology topology, TopologyHostRequestEntity entity) { + this.requestId = requestId; - this.stageId = stageId; - this.cluster = cluster; - this.blueprint = blueprintName; - this.hostGroup = hostGroup; - this.hostgroupName = hostGroup.getName(); - this.hostname = hostname; - this.predicate = predicate; + this.id = id; + this.cluster = topology.getClusterName(); + this.blueprint = topology.getBlueprint().getName(); + this.hostgroupName = entity.getTopologyHostGroupEntity().getName(); + this.hostGroup = topology.getBlueprint().getHostGroup(hostgroupName); + this.hostname = entity.getHostName(); + this.predicate = toPredicate(predicate); this.containsMaster = hostGroup.containsMasterComponent(); - this.topologyContext = topologyContext; + this.topology = topology; - createTasks(); - System.out.println("HostRequest: Created request for host: " + hostname); + createTasksForReplay(entity); + + //todo: we may be able to simplify by just checking hostname + isOutstanding = hostname == null || !topology.getAmbariContext(). + isHostRegisteredWithCluster(cluster, hostname); + + System.out.println("HostRequest: Successfully recovered host request for host: " + + (hostname == null ? "Host Assignment Pending" : hostname)); } //todo: synchronization public synchronized HostOfferResponse offer(HostImpl host) { - if (! outstanding) { + if (!isOutstanding) { return new HostOfferResponse(HostOfferResponse.Answer.DECLINED_DONE); } if (matchesHost(host)) { - outstanding = false; + isOutstanding = false; hostname = host.getHostName(); - List<TopologyTask> tasks = provision(host); - - return new HostOfferResponse(HostOfferResponse.Answer.ACCEPTED, hostGroup.getName(), tasks); + setHostOnTasks(host); + return new HostOfferResponse(HostOfferResponse.Answer.ACCEPTED, id, hostGroup.getName(), topologyTasks); } else { return new HostOfferResponse(HostOfferResponse.Answer.DECLINED_PREDICATE); } @@ -184,32 +162,28 @@ public class HostRequest implements Comparable<HostRequest> { return hostgroupName; } - public int getCardinality() { - return cardinality; - } - public Predicate getPredicate() { return predicate; } + public boolean isCompleted() { + return ! isOutstanding; + } - private List<TopologyTask> provision(HostImpl host) { - List<TopologyTask> tasks = new ArrayList<TopologyTask>(); - - tasks.add(new CreateHostResourcesTask(topologyContext.getClusterTopology(), host, getHostgroupName())); - setHostOnTasks(host); - - HostGroup hostGroup = getHostGroup(); - tasks.add(new ConfigureConfigGroup(getConfigurationGroupName(hostGroup.getBlueprintName(), - hostGroup.getName()), getClusterName(), hostname)); + private void createTasks() { + // high level topology tasks such as INSTALL, START, ... + topologyTasks.add(new PersistHostResourcesTask()); + topologyTasks.add(new RegisterWithConfigGroupTask()); - tasks.add(getInstallTask()); - tasks.add(getStartTask()); + InstallHostTask installTask = new InstallHostTask(); + topologyTasks.add(installTask); + StartHostTask startTask = new StartHostTask(); + topologyTasks.add(startTask); - return tasks; - } + logicalTaskMap.put(installTask, new HashMap<String, Long>()); + logicalTaskMap.put(startTask, new HashMap<String, Long>()); - private void createTasks() { + // lower level logical component level tasks which get mapped to physical tasks HostGroup hostGroup = getHostGroup(); for (String component : hostGroup.getComponents()) { if (component == null || component.equals("AMBARI_SERVER")) { @@ -221,79 +195,90 @@ public class HostRequest implements Comparable<HostRequest> { getHostName() : "PENDING HOST ASSIGNMENT : HOSTGROUP=" + getHostgroupName(); - HostRoleCommand installTask = hostRoleCommandFactory.create(hostName, Role.valueOf(component), null, RoleCommand.INSTALL); - installTask.setStatus(HostRoleStatus.PENDING); - installTask.setTaskId(topologyContext.getNextTaskId()); - installTask.setRequestId(getRequestId()); - installTask.setStageId(stageId); + AmbariContext context = topology.getAmbariContext(); + HostRoleCommand logicalInstallTask = context.createAmbariTask( + getRequestId(), id, component, hostName, AmbariContext.TaskType.INSTALL); + logicalTasks.put(logicalInstallTask.getTaskId(), logicalInstallTask); + logicalTaskMap.get(installTask).put(component, logicalInstallTask.getTaskId()); - //todo: had to add requestId to ShortTaskStatus - //todo: revert addition of requestId when we are using LogicalTask - installTask.setRequestId(getRequestId()); + Stack stack = hostGroup.getStack(); + // if component isn't a client, add a start task + if (! stack.getComponentInfo(component).isClient()) { + HostRoleCommand logicalStartTask = context.createAmbariTask( + getRequestId(), id, component, hostName, AmbariContext.TaskType.START); + logicalTasks.put(logicalStartTask.getTaskId(), logicalStartTask); + logicalTaskMap.get(startTask).put(component, logicalStartTask.getTaskId()); + } + } + } - logicalTasks.add(installTask); - registerLogicalInstallTaskId(component, installTask.getTaskId()); + private void createTasksForReplay(TopologyHostRequestEntity entity) { + topologyTasks.add(new PersistHostResourcesTask()); + topologyTasks.add(new RegisterWithConfigGroupTask()); + InstallHostTask installTask = new InstallHostTask(); + topologyTasks.add(installTask); + StartHostTask startTask = new StartHostTask(); + topologyTasks.add(startTask); + + logicalTaskMap.put(installTask, new HashMap<String, Long>()); + logicalTaskMap.put(startTask, new HashMap<String, Long>()); + + AmbariContext ambariContext = topology.getAmbariContext(); + // lower level logical component level tasks which get mapped to physical tasks + for (TopologyHostTaskEntity topologyTaskEntity : entity.getTopologyHostTaskEntities()) { + TopologyTask.Type taskType = TopologyTask.Type.valueOf(topologyTaskEntity.getType()); + for (TopologyLogicalTaskEntity logicalTaskEntity : topologyTaskEntity.getTopologyLogicalTaskEntities()) { + Long logicalTaskId = logicalTaskEntity.getId(); + String component = logicalTaskEntity.getComponentName(); + + AmbariContext.TaskType logicalTaskType = getLogicalTaskType(taskType); + HostRoleCommand task = ambariContext.createAmbariTask(logicalTaskId, getRequestId(), id, + component, entity.getHostName(), logicalTaskType); + + logicalTasks.put(logicalTaskId, task); + Long physicalTaskId = logicalTaskEntity.getPhysicalTaskId(); + if (physicalTaskId != null) { + registerPhysicalTaskId(logicalTaskId, physicalTaskId); + } - Stack stack = hostGroup.getStack(); - try { - // if component isn't a client, add a start task - if (! metaInfoManager.getComponent(stack.getName(), stack.getVersion(), stack.getServiceForComponent(component), component).isClient()) { - HostRoleCommand startTask = hostRoleCommandFactory.create(hostName, Role.valueOf(component), null, RoleCommand.START); - startTask.setStatus(HostRoleStatus.PENDING); - startTask.setRequestId(getRequestId()); - startTask.setTaskId(topologyContext.getNextTaskId()); - startTask.setRequestId(getRequestId()); - startTask.setStageId(stageId); - logicalTasks.add(startTask); - registerLogicalStartTaskId(component, startTask.getTaskId()); + //assumes only one task per type + for (TopologyTask topologyTask : topologyTasks) { + if (taskType == topologyTask.getType()) { + logicalTaskMap.get(topologyTask).put(component, logicalTaskId); + } } - } catch (AmbariException e) { - e.printStackTrace(); - //todo: how to handle - throw new RuntimeException(e); } } } - /** - * Get a config group name based on a bp and host group. - * - * @param bpName blueprint name - * @param hostGroupName host group name - * @return config group name - */ - protected String getConfigurationGroupName(String bpName, String hostGroupName) { - return String.format("%s:%s", bpName, hostGroupName); + private static AmbariContext.TaskType getLogicalTaskType(TopologyTask.Type topologyTaskType) { + return topologyTaskType == + TopologyTask.Type.INSTALL ? + AmbariContext.TaskType.INSTALL : + AmbariContext.TaskType.START; } private void setHostOnTasks(HostImpl host) { - for (HostRoleCommand task : getTasks()) { + for (HostRoleCommand task : getLogicalTasks()) { task.setHostEntity(host.getHostEntity()); } } - //todo: analyze all all configuration needs for dealing with deprecated properties - /** - * Since global configs are deprecated since 1.7.0, but still supported. - * We should automatically map any globals used, to *-env dictionaries. - * - * @param blueprintConfigurations map of blueprint configurations keyed by type - */ - private void handleGlobalsBackwardsCompability(Stack stack, - Map<String, Map<String, String>> blueprintConfigurations) { - - StackId stackId = new StackId(stack.getName(), stack.getVersion()); - configHelper.moveDeprecatedGlobals(stackId, blueprintConfigurations, getClusterName()); + public List<TopologyTask> getTopologyTasks() { + return topologyTasks; } - public Collection<HostRoleCommand> getTasks() { + public Collection<HostRoleCommand> getLogicalTasks() { // sync logical task state with physical tasks - for (HostRoleCommand logicalTask : logicalTasks) { - Collection<Long> physicalTaskIds = physicalTasks.get(logicalTask.getTaskId()); - if (physicalTaskIds != null) { - //todo: for now only one physical task per logical task - long physicalTaskId = physicalTaskIds.iterator().next(); - HostRoleCommand physicalTask = actionManager.getTaskById(physicalTaskId); + for (HostRoleCommand logicalTask : logicalTasks.values()) { + // set host on command detail if it is set to null + String commandDetail = logicalTask.getCommandDetail(); + if (commandDetail != null && commandDetail.contains("null")) { + logicalTask.setCommandDetail(commandDetail.replace("null", hostname)); + } + Long physicalTaskId = physicalTasks.get(logicalTask.getTaskId()); + if (physicalTaskId != null) { + HostRoleCommand physicalTask = topology.getAmbariContext().getPhysicalTask(physicalTaskId); if (physicalTask != null) { logicalTask.setStatus(physicalTask.getStatus()); logicalTask.setCommandDetail(physicalTask.getCommandDetail()); @@ -313,12 +298,20 @@ public class HostRequest implements Comparable<HostRequest> { } } } - return logicalTasks; + return logicalTasks.values(); + } + + public Map<String, Long> getLogicalTasksForTopologyTask(TopologyTask topologyTask) { + return new HashMap<String, Long>(logicalTaskMap.get(topologyTask)); + } + + public HostRoleCommand getLogicalTask(long logicalTaskId) { + return logicalTasks.get(logicalTaskId); } public Collection<HostRoleCommandEntity> getTaskEntities() { Collection<HostRoleCommandEntity> taskEntities = new ArrayList<HostRoleCommandEntity>(); - for (HostRoleCommand task : logicalTasks) { + for (HostRoleCommand task : logicalTasks.values()) { HostRoleCommandEntity entity = task.constructNewPersistenceEntity(); // the above method doesn't set all of the fields for some unknown reason entity.setRequestId(task.getRequestId()); @@ -328,11 +321,9 @@ public class HostRequest implements Comparable<HostRequest> { entity.setErrorLog(task.errorLog); // set state from physical task - Collection<Long> physicalTaskIds = physicalTasks.get(task.getTaskId()); - if (physicalTaskIds != null) { - //todo: for now only one physical task per logical task - long physicalTaskId = physicalTaskIds.iterator().next(); - HostRoleCommand physicalTask = actionManager.getTaskById(physicalTaskId); + Long physicalTaskId = physicalTasks.get(task.getTaskId()); + if (physicalTaskId != null) { + HostRoleCommand physicalTask = topology.getAmbariContext().getPhysicalTask(physicalTaskId); if (physicalTask != null) { entity.setStatus(physicalTask.getStatus()); entity.setCommandDetail(physicalTask.getCommandDetail()); @@ -361,41 +352,26 @@ public class HostRequest implements Comparable<HostRequest> { } public boolean matchesHost(HostImpl host) { - if (hostname != null) { - return host.getHostName().equals(hostname); - } else if (predicate != null) { - return predicate.evaluate(new HostResourceAdapter(host)); - } else { - return true; - } + return (hostname != null) ? + host.getHostName().equals(hostname) : + predicate == null || predicate.evaluate(new HostResourceAdapter(host)); } public String getHostName() { return hostname; } - public long getStageId() { - return stageId; - } - - //todo: remove - private void registerLogicalInstallTaskId(String component, long taskId) { - logicalInstallTaskIds.put(component, taskId); + public long getId() { + return id; } - //todo: remove - private void registerLogicalStartTaskId(String component, long taskId) { - logicalStartTaskIds.put(component, taskId); - } - - //todo: remove - private long getLogicalInstallTaskId(String component) { - return logicalInstallTaskIds.get(component); + public long getStageId() { + // stage id is same as host request id + return getId(); } - //todo: remove - private long getLogicalStartTaskId(String component) { - return logicalStartTaskIds.get(component); + public Long getPhysicalTaskId(long logicalTaskId) { + return physicalTasks.get(logicalTaskId); } //todo: since this is used to determine equality, using hashCode() isn't safe as it can return the same @@ -411,333 +387,144 @@ public class HostRequest implements Comparable<HostRequest> { //todo: once we have logical tasks, move tracking of physical tasks there public void registerPhysicalTaskId(long logicalTaskId, long physicalTaskId) { - Collection<Long> physicalTasksForId = physicalTasks.get(logicalTaskId); - if (physicalTasksForId == null) { - physicalTasksForId = new HashSet<Long>(); - physicalTasks.put(logicalTaskId, physicalTasksForId); - } - physicalTasksForId.add(physicalTaskId); - } - - //todo: temporary step - public TopologyTask getInstallTask() { - return new InstallHostTask(); - } + physicalTasks.put(logicalTaskId, physicalTaskId); - //todo: temporary step - public TopologyTask getStartTask() { - return new StartHostTask(); + topology.getAmbariContext().getPersistedTopologyState(). + registerPhysicalTask(logicalTaskId, physicalTaskId); } - //todo: temporary refactoring step - public HostGroupInfo createHostGroupInfo(HostGroup group) { - HostGroupInfo info = new HostGroupInfo(group.getName()); - info.setConfiguration(group.getConfiguration()); - - return info; - } - - private synchronized HostResourceProvider getHostResourceProvider() { - if (hostResourceProvider == null) { - hostResourceProvider = (HostResourceProvider) - ClusterControllerHelper.getClusterController().ensureResourceProvider(Resource.Type.Host); - - } - return hostResourceProvider; - } - - private synchronized HostComponentResourceProvider getHostComponentResourceProvider() { - if (hostComponentResourceProvider == null) { - hostComponentResourceProvider = (HostComponentResourceProvider) - ClusterControllerHelper.getClusterController().ensureResourceProvider(Resource.Type.HostComponent); + private Predicate toPredicate(String predicate) { + Predicate compiledPredicate = null; + try { + if (predicate != null && ! predicate.isEmpty()) { + compiledPredicate = predicateCompiler.compile(predicate); + } + } catch (InvalidQueryException e) { + // log error and proceed without predicate + LOG.error("Unable to compile predicate for host request: " + e, e); } - return hostComponentResourceProvider; + return compiledPredicate; } - //todo: extract - private class InstallHostTask implements TopologyTask { - //todo: use future to obtain returned Response which contains the request id - //todo: error handling - //todo: monitor status of requests + private class PersistHostResourcesTask implements TopologyTask { + private AmbariContext ambariContext; @Override public Type getType() { - return Type.INSTALL; + return Type.RESOURCE_CREATION; + } + + @Override + public void init(ClusterTopology topology, AmbariContext ambariContext) { + this.ambariContext = ambariContext; } @Override public void run() { - try { - System.out.println("HostRequest.InstallHostTask: Executing INSTALL task for host: " + hostname); - RequestStatusResponse response = getHostResourceProvider().install(getHostName(), cluster); - // map logical install tasks to physical install tasks - List<ShortTaskStatus> underlyingTasks = response.getTasks(); - for (ShortTaskStatus task : underlyingTasks) { - Long logicalInstallTaskId = getLogicalInstallTaskId(task.getRole()); - //todo: for now only one physical task per component - long taskId = task.getTaskId(); - //physicalTasks.put(logicalInstallTaskId, Collections.singleton(taskId)); - registerPhysicalTaskId(logicalInstallTaskId, taskId); - - //todo: move this to provision - //todo: shouldn't have to iterate over all tasks to find install task - //todo: we are doing the same thing in the above registerPhysicalTaskId() call - // set attempt count on task - for (HostRoleCommand logicalTask : logicalTasks) { - if (logicalTask.getTaskId() == logicalInstallTaskId) { - logicalTask.incrementAttemptCount(); - } - } - } - } catch (ResourceAlreadyExistsException e) { - e.printStackTrace(); - } catch (SystemException e) { - e.printStackTrace(); - } catch (NoSuchParentResourceException e) { - e.printStackTrace(); - } catch (UnsupportedPropertyException e) { - e.printStackTrace(); - } catch (Exception e) { - e.printStackTrace(); + HostGroup group = topology.getBlueprint().getHostGroup(getHostgroupName()); + Map<String, Collection<String>> serviceComponents = new HashMap<String, Collection<String>>(); + for (String service : group.getServices()) { + serviceComponents.put(service, new HashSet<String> (group.getComponents(service))); } + ambariContext.createAmbariHostResources(getClusterName(), getHostName(), serviceComponents); } } - //todo: extract - private class StartHostTask implements TopologyTask { - //todo: use future to obtain returned Response which contains the request id - //todo: error handling - //todo: monitor status of requests + private class RegisterWithConfigGroupTask implements TopologyTask { + private ClusterTopology clusterTopology; + private AmbariContext ambariContext; @Override public Type getType() { - return Type.START; + return Type.CONFIGURE; + } + + @Override + public void init(ClusterTopology topology, AmbariContext ambariContext) { + this.clusterTopology = topology; + this.ambariContext = ambariContext; } @Override public void run() { - try { - System.out.println("HostRequest.StartHostTask: Executing START task for host: " + hostname); - RequestStatusResponse response = getHostComponentResourceProvider().start(cluster, hostname); - // map logical install tasks to physical install tasks - List<ShortTaskStatus> underlyingTasks = response.getTasks(); - for (ShortTaskStatus task : underlyingTasks) { - String component = task.getRole(); - Long logicalStartTaskId = getLogicalStartTaskId(component); - // for now just set on outer map - registerPhysicalTaskId(logicalStartTaskId, task.getTaskId()); - - //todo: move this to provision - // set attempt count on task - for (HostRoleCommand logicalTask : logicalTasks) { - if (logicalTask.getTaskId() == logicalStartTaskId) { - logicalTask.incrementAttemptCount(); - } - } - } - } catch (SystemException e) { - e.printStackTrace(); - } catch (UnsupportedPropertyException e) { - e.printStackTrace(); - } catch (NoSuchParentResourceException e) { - e.printStackTrace(); - } catch (Exception e) { - e.printStackTrace(); - } + ambariContext.registerHostWithConfigGroup(getHostName(), clusterTopology, getHostgroupName()); } } - private class CreateHostResourcesTask implements TopologyTask { - private ClusterTopology topology; - private HostImpl host; - private String groupName; - - public CreateHostResourcesTask(ClusterTopology topology, HostImpl host, String groupName) { - this.topology = topology; - this.host = host; - this.groupName = groupName; - } + //todo: extract + private class InstallHostTask implements TopologyTask { + private ClusterTopology clusterTopology; @Override public Type getType() { - return Type.RESOURCE_CREATION; + return Type.INSTALL; } @Override - public void run() { - try { - createHostResources(); - } catch (AmbariException e) { - //todo: report error to caller - e.printStackTrace(); - System.out.println("An error occurred when creating host resources: " + e.toString()); - } + public void init(ClusterTopology topology, AmbariContext ambariContext) { + this.clusterTopology = topology; } - private void createHostResources() throws AmbariException { - Map<String, Object> properties = new HashMap<String, Object>(); - properties.put(HostResourceProvider.HOST_CLUSTER_NAME_PROPERTY_ID, getClusterName()); - properties.put(HostResourceProvider.HOST_NAME_PROPERTY_ID, host.getHostName()); - properties.put(HostResourceProvider.HOST_RACK_INFO_PROPERTY_ID, host.getRackInfo()); - - getHostResourceProvider().createHosts(new RequestImpl(null, Collections.singleton(properties), null, null)); - createHostComponentResources(); - } - - private void createHostComponentResources() throws AmbariException { - Set<ServiceComponentHostRequest> requests = new HashSet<ServiceComponentHostRequest>(); - Stack stack = topology.getBlueprint().getStack(); - for (String component : topology.getBlueprint().getHostGroup(groupName).getComponents()) { - //todo: handle this in a generic manner. These checks are all over the code - if (! component.equals("AMBARI_SERVER")) { - requests.add(new ServiceComponentHostRequest(topology.getClusterName(), - stack.getServiceForComponent(component), component, host.getHostName(), null)); + @Override + public void run() { + System.out.println("HostRequest.InstallHostTask: Executing INSTALL task for host: " + hostname); + RequestStatusResponse response = clusterTopology.installHost(hostname); + // map logical install tasks to physical install tasks + List<ShortTaskStatus> underlyingTasks = response.getTasks(); + for (ShortTaskStatus task : underlyingTasks) { + Long logicalInstallTaskId = logicalTaskMap.get(this).get(task.getRole()); + //todo: for now only one physical task per component + long taskId = task.getTaskId(); + registerPhysicalTaskId(logicalInstallTaskId, taskId); + + //todo: move this to provision + //todo: shouldn't have to iterate over all tasks to find install task + //todo: we are doing the same thing in the above registerPhysicalTaskId() call + // set attempt count on task + for (HostRoleCommand logicalTask : logicalTasks.values()) { + if (logicalTask.getTaskId() == logicalInstallTaskId) { + logicalTask.incrementAttemptCount(); + } } } - - controller.createHostComponents(requests); } } //todo: extract - private class ConfigureConfigGroup implements TopologyTask { - private String groupName; - private String clusterName; - private String hostName; - - public ConfigureConfigGroup(String groupName, String clusterName, String hostName) { - this.groupName = groupName; - this.clusterName = clusterName; - this.hostName = hostName; - } + private class StartHostTask implements TopologyTask { + private ClusterTopology clusterTopology; @Override public Type getType() { - return Type.CONFIGURE; + return Type.START; } @Override - public void run() { - try { - //todo: add task to offer response - if (! addHostToExistingConfigGroups()) { - createConfigGroupsAndRegisterHost(); - } - } catch (Exception e) { - //todo: handle exceptions - e.printStackTrace(); - throw new RuntimeException("Unable to register config group for host: " + hostname); - } + public void init(ClusterTopology topology, AmbariContext ambariContext) { + this.clusterTopology = topology; } - /** - * Add the new host to an existing config group. - * - * @throws SystemException an unknown exception occurred - * @throws UnsupportedPropertyException an unsupported property was specified in the request - * @throws NoSuchParentResourceException a parent resource doesn't exist - */ - private boolean addHostToExistingConfigGroups() - throws SystemException, - UnsupportedPropertyException, - NoSuchParentResourceException { - - boolean addedHost = false; - - Clusters clusters; - Cluster cluster; - try { - clusters = controller.getClusters(); - cluster = clusters.getCluster(clusterName); - } catch (AmbariException e) { - throw new IllegalArgumentException( - String.format("Attempt to add hosts to a non-existent cluster: '%s'", clusterName)); - } - // I don't know of a method to get config group by name - //todo: add a method to get config group by name - Map<Long, ConfigGroup> configGroups = cluster.getConfigGroups(); - for (ConfigGroup group : configGroups.values()) { - if (group.getName().equals(groupName)) { - try { - group.addHost(clusters.getHost(hostName)); - group.persist(); - addedHost = true; - } catch (AmbariException e) { - // shouldn't occur, this host was just added to the cluster - throw new SystemException(String.format( - "Unable to obtain newly created host '%s' from cluster '%s'", hostName, clusterName)); + @Override + public void run() { + System.out.println("HostRequest.StartHostTask: Executing START task for host: " + hostname); + RequestStatusResponse response = clusterTopology.startHost(hostname); + // map logical install tasks to physical install tasks + List<ShortTaskStatus> underlyingTasks = response.getTasks(); + for (ShortTaskStatus task : underlyingTasks) { + String component = task.getRole(); + Long logicalStartTaskId = logicalTaskMap.get(this).get(component); + // for now just set on outer map + registerPhysicalTaskId(logicalStartTaskId, task.getTaskId()); + + //todo: move this to provision + // set attempt count on task + for (HostRoleCommand logicalTask : logicalTasks.values()) { + if (logicalTask.getTaskId() == logicalStartTaskId) { + logicalTask.incrementAttemptCount(); } } } - return addedHost; } - - /** - * Register config groups for host group scoped configuration. - * For each host group with configuration specified in the blueprint, a config group is created - * and the hosts associated with the host group are assigned to the config group. - * - * @throws ResourceAlreadyExistsException attempt to create a config group that already exists - * @throws SystemException an unexpected exception occurs - * @throws UnsupportedPropertyException an invalid property is provided when creating a config group - * @throws NoSuchParentResourceException attempt to create a config group for a non-existing cluster - */ - private void createConfigGroupsAndRegisterHost() throws - ResourceAlreadyExistsException, SystemException, - UnsupportedPropertyException, NoSuchParentResourceException { - - //HostGroupEntity entity = hostGroup.getEntity(); - HostGroup hostGroup = getHostGroup(); - Map<String, Map<String, Config>> groupConfigs = new HashMap<String, Map<String, Config>>(); - - Stack stack = hostGroup.getStack(); - - // get the host-group config with cluster creation template overrides - Configuration topologyHostGroupConfig = topologyContext.getClusterTopology(). - getHostGroupInfo().get(hostGroup.getName()).getConfiguration(); - - //handling backwards compatibility for group configs - //todo: doesn't belong here - handleGlobalsBackwardsCompability(stack, topologyHostGroupConfig.getProperties()); - - // iterate over topo host group configs which were defined in CCT/HG and BP/HG only, no parent configs - for (Map.Entry<String, Map<String, String>> entry: topologyHostGroupConfig.getProperties().entrySet()) { - String type = entry.getKey(); - String service = stack.getServiceForConfigType(type); - Config config = new ConfigImpl(type); - config.setTag(hostGroup.getName()); - config.setProperties(entry.getValue()); - //todo: attributes - Map<String, Config> serviceConfigs = groupConfigs.get(service); - if (serviceConfigs == null) { - serviceConfigs = new HashMap<String, Config>(); - groupConfigs.put(service, serviceConfigs); - } - serviceConfigs.put(type, config); - } - - String bpName = topologyContext.getClusterTopology().getBlueprint().getName(); - for (Map.Entry<String, Map<String, Config>> entry : groupConfigs.entrySet()) { - String service = entry.getKey(); - Map<String, Config> serviceConfigs = entry.getValue(); - String absoluteGroupName = getConfigurationGroupName(bpName, hostGroup.getName()); - Collection<String> groupHosts; - - groupHosts = topologyContext.getClusterTopology().getHostGroupInfo(). - get(hostgroupName).getHostNames(); - - ConfigGroupRequest request = new ConfigGroupRequest( - null, getClusterName(), absoluteGroupName, service, "Host Group Configuration", - new HashSet<String>(groupHosts), serviceConfigs); - - // get the config group provider and create config group resource - ConfigGroupResourceProvider configGroupProvider = (ConfigGroupResourceProvider) - ClusterControllerHelper.getClusterController().ensureResourceProvider(Resource.Type.ConfigGroup); - configGroupProvider.createResources(Collections.singleton(request)); - } - } - - } private class HostResourceAdapter implements Resource {
http://git-wip-us.apache.org/repos/asf/ambari/blob/807b3c2d/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java index 5273ff8..087ad4c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java @@ -22,12 +22,16 @@ import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.HostRoleCommand; import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.actionmanager.Request; -import org.apache.ambari.server.actionmanager.Stage; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.AmbariServer; import org.apache.ambari.server.controller.RequestStatusResponse; import org.apache.ambari.server.controller.ShortTaskStatus; import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO; import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; import org.apache.ambari.server.orm.entities.StageEntity; +import org.apache.ambari.server.orm.entities.TopologyHostGroupEntity; +import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity; +import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity; import org.apache.ambari.server.state.host.HostImpl; import java.util.ArrayList; @@ -38,30 +42,49 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeSet; - -import static org.apache.ambari.server.controller.AmbariServer.getController; +import java.util.concurrent.atomic.AtomicLong; /** * Logical Request implementation. */ public class LogicalRequest extends Request { - private Collection<HostRequest> allHostRequests = new ArrayList<HostRequest>(); + private final Collection<HostRequest> allHostRequests = new ArrayList<HostRequest>(); // sorted set with master host requests given priority - private Collection<HostRequest> outstandingHostRequests = new TreeSet<HostRequest>(); - private Map<String, HostRequest> requestsWithReservedHosts = new HashMap<String, HostRequest>(); + private final Collection<HostRequest> outstandingHostRequests = new TreeSet<HostRequest>(); + private final Map<String, HostRequest> requestsWithReservedHosts = new HashMap<String, HostRequest>(); private final ClusterTopology topology; + private static AmbariManagementController controller; + + private static final AtomicLong hostIdCounter = new AtomicLong(1); + + + public LogicalRequest(Long id, TopologyRequest request, ClusterTopology topology) + throws AmbariException { + + //todo: abstract usage of controller, etc ... + super(id, getController().getClusters().getCluster( + request.getClusterName()).getClusterId(), getController().getClusters()); + + setRequestContext(String.format("Logical Request: %s", request.getCommandDescription())); + + this.topology = topology; + createHostRequests(request, topology); + } + + public LogicalRequest(Long id, TopologyRequest request, ClusterTopology topology, + TopologyLogicalRequestEntity requestEntity) throws AmbariException { - //todo: topologyContext is a temporary refactoring step - public LogicalRequest(TopologyRequest requestRequest, TopologyManager.ClusterTopologyContext topologyContext) throws AmbariException { //todo: abstract usage of controller, etc ... - super(getController().getActionManager().getNextRequestId(), getController().getClusters().getCluster( - requestRequest.getClusterName()).getClusterId(), getController().getClusters()); + super(id, getController().getClusters().getCluster( + request.getClusterName()).getClusterId(), getController().getClusters()); - this.topology = topologyContext.getClusterTopology(); - createHostRequests(requestRequest, topologyContext); + setRequestContext(String.format("Logical Request: %s", request.getCommandDescription())); + + this.topology = topology; + createHostRequests(topology, requestEntity); } public HostOfferResponse offer(HostImpl host) { @@ -71,7 +94,7 @@ public class LogicalRequest extends Request { if (hostRequest != null) { HostOfferResponse response = hostRequest.offer(host); if (response.getAnswer() != HostOfferResponse.Answer.ACCEPTED) { - //todo: error handling. This is really a system exception and shouldn't happen + // host request rejected host that it explicitly requested throw new RuntimeException("LogicalRequest declined host offer of explicitly requested host: " + host.getHostName()); } @@ -100,23 +123,16 @@ public class LogicalRequest extends Request { } // if at least one outstanding host request rejected for predicate or we have an outstanding request // with a reserved host decline due to predicate, otherwise decline due to all hosts being resolved - //todo: could also check if outstandingHostRequests is empty return predicateRejected || ! requestsWithReservedHosts.isEmpty() ? new HostOfferResponse(HostOfferResponse.Answer.DECLINED_PREDICATE) : new HostOfferResponse(HostOfferResponse.Answer.DECLINED_DONE); } - //todo - @Override - public Collection<Stage> getStages() { - return super.getStages(); - } - @Override public List<HostRoleCommand> getCommands() { List<HostRoleCommand> commands = new ArrayList<HostRoleCommand>(); for (HostRequest hostRequest : allHostRequests) { - commands.addAll(new ArrayList<HostRoleCommand>(hostRequest.getTasks())); + commands.addAll(new ArrayList<HostRoleCommand>(hostRequest.getLogicalTasks())); } return commands; } @@ -125,6 +141,23 @@ public class LogicalRequest extends Request { return requestsWithReservedHosts.keySet(); } + public boolean hasCompleted() { + return requestsWithReservedHosts.isEmpty() && outstandingHostRequests.isEmpty(); + } + + public Collection<HostRequest> getCompletedHostRequests() { + Collection<HostRequest> completedHostRequests = new ArrayList<HostRequest>(allHostRequests); + completedHostRequests.removeAll(outstandingHostRequests); + completedHostRequests.removeAll(requestsWithReservedHosts.values()); + + return completedHostRequests; + } + + //todo: this is only here for toEntity() functionality + public Collection<HostRequest> getHostRequests() { + return new ArrayList<HostRequest>(allHostRequests); + } + //todo: account for blueprint name? //todo: this should probably be done implicitly at a lower level public boolean areGroupsResolved(Collection<String> hostGroupNames) { @@ -157,9 +190,7 @@ public class LogicalRequest extends Request { return hostComponentMap; } - //todo: currently we are just returning all stages for all requests - //todo: and relying on the StageResourceProvider to convert each to a resource and do a predicate eval on each - //todo: needed to change the name to avoid a name collision. + // currently we are just returning all stages for all requests public Collection<StageEntity> getStageEntities() { Collection<StageEntity> stages = new ArrayList<StageEntity>(); for (HostRequest hostRequest : allHostRequests) { @@ -182,8 +213,6 @@ public class LogicalRequest extends Request { public RequestStatusResponse getRequestStatus() { RequestStatusResponse requestStatus = new RequestStatusResponse(getRequestId()); requestStatus.setRequestContext(getRequestContext()); - //todo: other request status fields - //todo: ordering of tasks? // convert HostRoleCommands to ShortTaskStatus List<ShortTaskStatus> shortTasks = new ArrayList<ShortTaskStatus>(); @@ -191,7 +220,6 @@ public class LogicalRequest extends Request { shortTasks.add(new ShortTaskStatus(task)); } requestStatus.setTasks(shortTasks); - //todo: null tasks? return requestStatus; } @@ -249,13 +277,10 @@ public class LogicalRequest extends Request { timedout += 1; break; default: - //todo: proper log msg System.out.println("Unexpected status when creating stage summaries: " + taskStatus); } } - //todo: skippable. I only see a skippable field on the stage, not the tasks - //todo: time related fields HostRoleCommandStatusSummaryDTO stageSummary = new HostRoleCommandStatusSummaryDTO(stage.isSkippable() ? 1 : 0, 0, 0, stage.getStageId(), aborted, completed, failed, holding, holdingFailed, holdingTimedout, inProgress, pending, queued, timedout); summaryMap.put(stage.getStageId(), stageSummary); @@ -263,45 +288,69 @@ public class LogicalRequest extends Request { return summaryMap; } - //todo: context is a temporary refactoring step - private void createHostRequests(TopologyRequest requestRequest, TopologyManager.ClusterTopologyContext topologyContext) { - //todo: consistent stage ordering - //todo: confirm that stages don't need to be unique across requests - long stageIdCounter = 0; - Map<String, HostGroupInfo> hostGroupInfoMap = requestRequest.getHostGroupInfo(); + private void createHostRequests(TopologyRequest request, ClusterTopology topology) { + Map<String, HostGroupInfo> hostGroupInfoMap = request.getHostGroupInfo(); + Blueprint blueprint = topology.getBlueprint(); for (HostGroupInfo hostGroupInfo : hostGroupInfoMap.values()) { String groupName = hostGroupInfo.getHostGroupName(); - Blueprint blueprint = topology.getBlueprint(); - int hostCardinality; - List<String> hostnames; - - hostCardinality = hostGroupInfo.getRequestedHostCount(); - hostnames = new ArrayList<String>(hostGroupInfo.getHostNames()); - + int hostCardinality = hostGroupInfo.getRequestedHostCount(); + List<String> hostnames = new ArrayList<String>(hostGroupInfo.getHostNames()); for (int i = 0; i < hostCardinality; ++i) { if (! hostnames.isEmpty()) { // host names are specified String hostname = hostnames.get(i); - //todo: pass in HostGroupInfo - HostRequest hostRequest = new HostRequest(getRequestId(), stageIdCounter++, getClusterName(), - blueprint.getName(), blueprint.getHostGroup(groupName), hostname, hostGroupInfo.getPredicate(), - topologyContext); + HostRequest hostRequest = new HostRequest(getRequestId(), hostIdCounter.getAndIncrement(), getClusterName(), + hostname, blueprint.getName(), blueprint.getHostGroup(groupName), null, topology); synchronized (requestsWithReservedHosts) { requestsWithReservedHosts.put(hostname, hostRequest); } } else { // host count is specified - //todo: pass in HostGroupInfo - HostRequest hostRequest = new HostRequest(getRequestId(), stageIdCounter++, getClusterName(), - blueprint.getName(), blueprint.getHostGroup(groupName), hostCardinality, hostGroupInfo.getPredicate(), - topologyContext); + HostRequest hostRequest = new HostRequest(getRequestId(), hostIdCounter.getAndIncrement(), getClusterName(), + null, blueprint.getName(), blueprint.getHostGroup(groupName), hostGroupInfo.getPredicate(), topology); outstandingHostRequests.add(hostRequest); } } } - allHostRequests.addAll(outstandingHostRequests); allHostRequests.addAll(requestsWithReservedHosts.values()); } + + private void createHostRequests(ClusterTopology topology, + TopologyLogicalRequestEntity requestEntity) { + + for (TopologyHostRequestEntity hostRequestEntity : requestEntity.getTopologyHostRequestEntities()) { + Long hostRequestId = hostRequestEntity.getId(); + synchronized (hostIdCounter) { + if (hostIdCounter.get() <= hostRequestId) { + hostIdCounter.set(hostRequestId + 1); + } + } + TopologyHostGroupEntity hostGroupEntity = hostRequestEntity.getTopologyHostGroupEntity(); + + String reservedHostName = hostGroupEntity. + getTopologyHostInfoEntities().iterator().next().getFqdn(); + + //todo: move predicate processing to host request + HostRequest hostRequest = new HostRequest(getRequestId(), hostRequestId, + reservedHostName, topology, hostRequestEntity); + + allHostRequests.add(hostRequest); + if (! hostRequest.isCompleted()) { + if (reservedHostName != null) { + requestsWithReservedHosts.put(reservedHostName, hostRequest); + } else { + outstandingHostRequests.add(hostRequest); + } + } + } + } + + private synchronized static AmbariManagementController getController() { + if (controller == null) { + controller = AmbariServer.getController(); + } + return controller; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/807b3c2d/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequestFactory.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequestFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequestFactory.java new file mode 100644 index 0000000..a8a76b9 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequestFactory.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity; + +/** + * Factory for creating logical requests + */ +//todo: throw more meaningful exception +public class LogicalRequestFactory { + public LogicalRequest createRequest(Long id, TopologyRequest topologyRequest, ClusterTopology topology) + throws AmbariException { + + return new LogicalRequest(id, topologyRequest, topology); + } + + public LogicalRequest createRequest(Long id, TopologyRequest topologyRequest, ClusterTopology topology, + TopologyLogicalRequestEntity requestEntity) throws AmbariException { + + return new LogicalRequest(id, topologyRequest, topology, requestEntity); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/807b3c2d/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java new file mode 100644 index 0000000..dbf6735 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology; + +import java.util.List; +import java.util.Map; + +/** + * Persistence abstraction. + */ +public interface PersistedState { + /** + * Persist a topology request. + * + * @param topologyRequest topologyh request to persist + * + * @return a persisted topology request which is a wrapper around a TopologyRequest which + * adds an id that can be used to refer to the persisted entity + */ + PersistedTopologyRequest persistTopologyRequest(TopologyRequest topologyRequest); + + /** + * Persist a logical request. + * + * @param logicalRequest logical request to persist + * @param topologyRequestId the id of the associated topology request + */ + void persistLogicalRequest(LogicalRequest logicalRequest, long topologyRequestId); + + /** + * Register a physical task with a logical task. + * + * @param logicalTaskId logical task id + * @param physicalTaskId physical task id + */ + void registerPhysicalTask(long logicalTaskId, long physicalTaskId); + + /** + * Registeer a host with a host request. + * + * @param hostRequestId host request id + * @param hostName name of host being registered + */ + void registerHostName(long hostRequestId, String hostName); + + /** + * Get all persisted requests. This is used to replay all + * requests upon ambari startup. + * + * @return map of cluster topology to list of logical requests + */ + Map<ClusterTopology, List<LogicalRequest>> getAllRequests(); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/807b3c2d/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java new file mode 100644 index 0000000..4101d67 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.StaticallyInject; +import org.apache.ambari.server.actionmanager.HostRoleCommand; +import org.apache.ambari.server.api.predicate.InvalidQueryException; +import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; +import org.apache.ambari.server.orm.dao.TopologyHostGroupDAO; +import org.apache.ambari.server.orm.dao.TopologyHostRequestDAO; +import org.apache.ambari.server.orm.dao.TopologyLogicalTaskDAO; +import org.apache.ambari.server.orm.dao.TopologyRequestDAO; +import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; +import org.apache.ambari.server.orm.entities.TopologyHostGroupEntity; +import org.apache.ambari.server.orm.entities.TopologyHostInfoEntity; +import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity; +import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity; +import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity; +import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity; +import org.apache.ambari.server.orm.entities.TopologyRequestEntity; +import org.apache.ambari.server.stack.NoSuchStackException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation which uses Ambari Database DAO and Entity objects for persistence + * of topology related information. + */ +@StaticallyInject +public class PersistedStateImpl implements PersistedState { + + protected final static Logger LOG = LoggerFactory.getLogger(PersistedState.class); + + @Inject + private static TopologyRequestDAO topologyRequestDAO; + + @Inject + private static TopologyHostGroupDAO hostGroupDAO; + + @Inject + private static TopologyHostRequestDAO hostRequestDAO; + + @Inject + private static TopologyLogicalTaskDAO topologyLogicalTaskDAO; + + @Inject + private static HostRoleCommandDAO hostRoleCommandDAO; + + @Inject + private static HostRoleCommandDAO physicalTaskDAO; + + @Inject + private static BlueprintFactory blueprintFactory; + + @Inject + private static LogicalRequestFactory logicalRequestFactory; + + @Inject + private static AmbariContext ambariContext; + + private static Gson jsonSerializer = new Gson(); + + + @Override + public PersistedTopologyRequest persistTopologyRequest(TopologyRequest request) { + TopologyRequestEntity requestEntity = toEntity(request); + topologyRequestDAO.create(requestEntity); + return new PersistedTopologyRequest(requestEntity.getId(), request); + } + + @Override + public void persistLogicalRequest(LogicalRequest logicalRequest, long topologyRequestId) { + TopologyRequestEntity topologyRequestEntity = topologyRequestDAO.findById(topologyRequestId); + TopologyLogicalRequestEntity entity = toEntity(logicalRequest, topologyRequestEntity); + topologyRequestEntity.setTopologyLogicalRequestEntity(entity); + //todo: how to handle missing topology request entity? + + //logicalRequestDAO.create(entity); + + topologyRequestDAO.merge(topologyRequestEntity); + } + + @Override + public void registerPhysicalTask(long logicalTaskId, long physicalTaskId) { + TopologyLogicalTaskEntity entity = topologyLogicalTaskDAO.findById(logicalTaskId); + HostRoleCommandEntity physicalEntity = hostRoleCommandDAO.findByPK(physicalTaskId); + entity.setHostRoleCommandEntity(physicalEntity); + + topologyLogicalTaskDAO.merge(entity); + } + + @Override + public void registerHostName(long hostRequestId, String hostName) { + TopologyHostRequestEntity entity = hostRequestDAO.findById(hostRequestId); + if (entity.getHostName() == null) { + entity.setHostName(hostName); + hostRequestDAO.merge(entity); + } + } + + @Override + public Map<ClusterTopology, List<LogicalRequest>> getAllRequests() { + //todo: we only currently support a single request per ambari instance so there should only + //todo: be a single cluster topology + Map<ClusterTopology, List<LogicalRequest>> allRequests = new HashMap<ClusterTopology, List<LogicalRequest>>(); + Collection<TopologyRequestEntity> entities = topologyRequestDAO.findAll(); + + Map<String, ClusterTopology> topologyRequests = new HashMap<String, ClusterTopology>(); + for (TopologyRequestEntity entity : entities) { + TopologyRequest replayedRequest = new ReplayedTopologyRequest(entity); + ClusterTopology clusterTopology = topologyRequests.get(replayedRequest.getClusterName()); + if (clusterTopology == null) { + try { + clusterTopology = new ClusterTopologyImpl(ambariContext, replayedRequest); + topologyRequests.put(replayedRequest.getClusterName(), clusterTopology); + allRequests.put(clusterTopology, new ArrayList<LogicalRequest>()); + } catch (InvalidTopologyException e) { + throw new RuntimeException("Failed to construct cluster topology while replaying request: " + e, e); + } + } else { + // ensure all host groups are provided in the combined cluster topology + for (Map.Entry<String, HostGroupInfo> groupInfoEntry : replayedRequest.getHostGroupInfo().entrySet()) { + String name = groupInfoEntry.getKey(); + if (! clusterTopology.getHostGroupInfo().containsKey(name)) { + clusterTopology.getHostGroupInfo().put(name, groupInfoEntry.getValue()); + } + } + } + + TopologyLogicalRequestEntity logicalRequestEntity = entity.getTopologyLogicalRequestEntity(); + Long logicalId = logicalRequestEntity.getId(); + + try { + //todo: fix initialization of ActionManager.requestCounter to account for logical requests + //todo: until this is fixed, increment the counter for every recovered logical request + //todo: this will cause gaps in the request id's after recovery + ambariContext.getNextRequestId(); + allRequests.get(clusterTopology).add(logicalRequestFactory.createRequest( + logicalId, replayedRequest, clusterTopology, logicalRequestEntity)); + } catch (AmbariException e) { + throw new RuntimeException("Failed to construct logical request during replay: " + e, e); + } + } + + return allRequests; + } + + private TopologyRequestEntity toEntity(TopologyRequest request) { + TopologyRequestEntity entity = new TopologyRequestEntity(); + + //todo: this isn't set for a scaling operation because we had intended to allow multiple + //todo: bp's to be used to scale a cluster although this isn't currently supported by + //todo: new topology infrastructure + entity.setAction(request.getType().name()); + if (request.getBlueprint() != null) { + entity.setBlueprintName(request.getBlueprint().getName()); + } + + entity.setClusterAttributes(attributesAsString(request.getConfiguration().getAttributes())); + entity.setClusterName(request.getClusterName()); + entity.setClusterProperties(propertiesAsString(request.getConfiguration().getProperties())); + entity.setDescription(request.getCommandDescription()); + + // host groups + Collection<TopologyHostGroupEntity> hostGroupEntities = new ArrayList<TopologyHostGroupEntity>(); + for (HostGroupInfo groupInfo : request.getHostGroupInfo().values()) { + hostGroupEntities.add(toEntity(groupInfo, entity)); + } + entity.setTopologyHostGroupEntities(hostGroupEntities); + + return entity; + } + + private TopologyLogicalRequestEntity toEntity(LogicalRequest request, TopologyRequestEntity topologyRequestEntity) { + TopologyLogicalRequestEntity entity = new TopologyLogicalRequestEntity(); + + entity.setDescription(request.getRequestContext()); + entity.setId(request.getRequestId()); + entity.setTopologyRequestEntity(topologyRequestEntity); + entity.setTopologyRequestId(topologyRequestEntity.getId()); + + // host requests + Collection<TopologyHostRequestEntity> hostRequests = new ArrayList<TopologyHostRequestEntity>(); + entity.setTopologyHostRequestEntities(hostRequests); + for (HostRequest hostRequest : request.getHostRequests()) { + hostRequests.add(toEntity(hostRequest, entity)); + } + return entity; + } + + private TopologyHostRequestEntity toEntity(HostRequest request, TopologyLogicalRequestEntity logicalRequestEntity) { + TopologyHostRequestEntity entity = new TopologyHostRequestEntity(); + entity.setHostName(request.getHostName()); + entity.setId(request.getId()); + entity.setStageId(request.getStageId()); + + entity.setTopologyLogicalRequestEntity(logicalRequestEntity); + entity.setTopologyHostGroupEntity(hostGroupDAO.findByRequestIdAndName( + logicalRequestEntity.getTopologyRequestId(), request.getHostgroupName())); + + // logical tasks + Collection<TopologyHostTaskEntity> hostRequestTaskEntities = new ArrayList<TopologyHostTaskEntity>(); + entity.setTopologyHostTaskEntities(hostRequestTaskEntities); + // for now only worry about install and start tasks + for (TopologyTask task : request.getTopologyTasks()) { + if (task.getType() == TopologyTask.Type.INSTALL || task.getType() == TopologyTask.Type.START) { + TopologyHostTaskEntity topologyTaskEntity = new TopologyHostTaskEntity(); + hostRequestTaskEntities.add(topologyTaskEntity); + topologyTaskEntity.setType(task.getType().name()); + topologyTaskEntity.setTopologyHostRequestEntity(entity); + Collection<TopologyLogicalTaskEntity> logicalTaskEntities = new ArrayList<TopologyLogicalTaskEntity>(); + topologyTaskEntity.setTopologyLogicalTaskEntities(logicalTaskEntities); + for (Long logicalTaskId : request.getLogicalTasksForTopologyTask(task).values()) { + TopologyLogicalTaskEntity logicalTaskEntity = new TopologyLogicalTaskEntity(); + logicalTaskEntities.add(logicalTaskEntity); + HostRoleCommand logicalTask = request.getLogicalTask(logicalTaskId); + logicalTaskEntity.setId(logicalTaskId); + logicalTaskEntity.setComponentName(logicalTask.getRole().name()); + logicalTaskEntity.setTopologyHostTaskEntity(topologyTaskEntity); + Long physicalId = request.getPhysicalTaskId(logicalTaskId); + if (physicalId != null) { + logicalTaskEntity.setHostRoleCommandEntity(physicalTaskDAO.findByPK(physicalId)); + } + logicalTaskEntity.setTopologyHostTaskEntity(topologyTaskEntity); + } + } + } + return entity; + } + + private TopologyHostGroupEntity toEntity(HostGroupInfo groupInfo, TopologyRequestEntity topologyRequestEntity) { + TopologyHostGroupEntity entity = new TopologyHostGroupEntity(); + entity.setGroupAttributes(attributesAsString(groupInfo.getConfiguration().getAttributes())); + entity.setGroupProperties(propertiesAsString(groupInfo.getConfiguration().getProperties())); + entity.setName(groupInfo.getHostGroupName()); + entity.setTopologyRequestEntity(topologyRequestEntity); + + // host info + Collection<TopologyHostInfoEntity> hostInfoEntities = new ArrayList<TopologyHostInfoEntity>(); + entity.setTopologyHostInfoEntities(hostInfoEntities); + + Collection<String> hosts = groupInfo.getHostNames(); + if (hosts.isEmpty()) { + TopologyHostInfoEntity hostInfoEntity = new TopologyHostInfoEntity(); + hostInfoEntity.setTopologyHostGroupEntity(entity); + hostInfoEntity.setHostCount(groupInfo.getRequestedHostCount()); + if (groupInfo.getPredicate() != null) { + hostInfoEntity.setPredicate(groupInfo.getPredicateString()); + } + hostInfoEntities.add(hostInfoEntity); + } else { + for (String hostName : hosts) { + TopologyHostInfoEntity hostInfoEntity = new TopologyHostInfoEntity(); + hostInfoEntity.setTopologyHostGroupEntity(entity); + if (groupInfo.getPredicate() != null) { + hostInfoEntity.setPredicate(groupInfo.getPredicateString()); + } + hostInfoEntity.setFqdn(hostName); + hostInfoEntity.setHostCount(0); + hostInfoEntities.add(hostInfoEntity); + } + } + return entity; + } + + + private static String propertiesAsString(Map<String, Map<String, String>> configurationProperties) { + return jsonSerializer.toJson(configurationProperties); + } + + private static String attributesAsString(Map<String, Map<String, Map<String, String>>> configurationAttributes) { + return jsonSerializer.toJson(configurationAttributes); + } + + private static class ReplayedTopologyRequest implements TopologyRequest { + private final String clusterName; + private final Type type; + private final String description; + private final Blueprint blueprint; + private final Configuration configuration; + private final Map<String, HostGroupInfo> hostGroupInfoMap = new HashMap<String, HostGroupInfo>(); + + public ReplayedTopologyRequest(TopologyRequestEntity entity) { + clusterName = entity.getClusterName(); + type = Type.valueOf(entity.getAction()); + description = entity.getDescription(); + + try { + blueprint = blueprintFactory.getBlueprint(entity.getBlueprintName()); + } catch (NoSuchStackException e) { + throw new RuntimeException("Unable to load blueprint while replaying topology request: " + e, e); + } + configuration = createConfiguration(entity.getClusterProperties(), entity.getClusterAttributes()); + configuration.setParentConfiguration(blueprint.getConfiguration()); + + parseHostGroupInfo(entity); + } + + @Override + public String getClusterName() { + return clusterName; + } + + @Override + public Type getType() { + return type; + } + + @Override + public Blueprint getBlueprint() { + return blueprint; + } + + @Override + public Configuration getConfiguration() { + return configuration; + } + + @Override + public Map<String, HostGroupInfo> getHostGroupInfo() { + return hostGroupInfoMap; + } + + @Override + public List<TopologyValidator> getTopologyValidators() { + return Collections.emptyList(); + } + + @Override + public String getCommandDescription() { + return description; + } + + private Configuration createConfiguration(String propString, String attributeString) { + Map<String, Map<String, String>> properties = jsonSerializer. + <Map<String, Map<String, String>>>fromJson(propString, Map.class); + + Map<String, Map<String, Map<String, String>>> attributes = jsonSerializer. + <Map<String, Map<String, Map<String, String>>>>fromJson(attributeString, Map.class); + + //todo: config parent + return new Configuration(properties, attributes); + } + + private void parseHostGroupInfo(TopologyRequestEntity entity) { + for (TopologyHostGroupEntity hostGroupEntity : entity.getTopologyHostGroupEntities()) { + for (TopologyHostInfoEntity hostInfoEntity : hostGroupEntity.getTopologyHostInfoEntities()) { + String groupName = hostGroupEntity.getName(); + HostGroupInfo groupInfo = hostGroupInfoMap.get(groupName); + if (groupInfo == null) { + groupInfo = new HostGroupInfo(groupName); + hostGroupInfoMap.put(groupName, groupInfo); + } + + // if host names are specified, there will be one group info entity per name + // otherwise there is a single entity with requested count and predicate + String hostname = hostInfoEntity.getFqdn(); + if (hostname != null && ! hostname.isEmpty()) { + groupInfo.addHost(hostname); + } else { + // should not be more than one group info if host count is specified + groupInfo.setRequestedCount(hostInfoEntity.getHostCount()); + String hostPredicate = hostInfoEntity.getPredicate(); + if (hostPredicate != null) { + try { + groupInfo.setPredicate(hostPredicate); + } catch (InvalidQueryException e) { + // log error but proceed with now predicate set + LOG.error(String.format( + "Failed to compile predicate '%s' during request replay: %s", hostPredicate, e), e); + } + } + } + + String groupConfigProperties = hostGroupEntity.getGroupProperties(); + String groupConfigAttributes = hostGroupEntity.getGroupAttributes(); + groupInfo.setConfiguration(createConfiguration(groupConfigProperties, groupConfigAttributes)); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/807b3c2d/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedTopologyRequest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedTopologyRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedTopologyRequest.java new file mode 100644 index 0000000..184d9d2 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedTopologyRequest.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology; + +/** + * Wrapper around a TopologyRequest which adds an id that can be used + * to refer to the persisted entity. + */ +public class PersistedTopologyRequest { + private final long id; + private final TopologyRequest request; + + public PersistedTopologyRequest(long id, TopologyRequest request) { + this.id = id; + this.request = request; + } + + public long getId() { + return id; + } + + public TopologyRequest getRequest() { + return request; + } +}
