Repository: ambari Updated Branches: refs/heads/branch-2.2 3b11cf2ff -> aa59546bb
AMBARI-15637. If RU/EU is paused, services are restarted on the older version. EU is more complex since stopping services should use the original version. (alejandro) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/aa59546b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/aa59546b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/aa59546b Branch: refs/heads/branch-2.2 Commit: aa59546bbfdf2c4c92aaa724be4e64ab4b80ea72 Parents: 3b11cf2 Author: Alejandro Fernandez <[email protected]> Authored: Tue Mar 29 14:42:39 2016 -0700 Committer: Alejandro Fernandez <[email protected]> Committed: Thu Mar 31 17:24:48 2016 -0700 ---------------------------------------------------------------------- .../AmbariCustomCommandExecutionHelper.java | 26 +++-- .../AmbariManagementControllerImpl.java | 7 +- .../ClusterStackVersionResourceProvider.java | 2 + .../internal/UpgradeResourceProvider.java | 27 +++++ .../server/orm/dao/HostRoleCommandDAO.java | 10 ++ .../ambari/server/orm/dao/UpgradeDAO.java | 18 ++- .../orm/entities/HostRoleCommandEntity.java | 1 + .../server/orm/entities/UpgradeEntity.java | 4 +- .../org/apache/ambari/server/state/Cluster.java | 9 +- .../server/state/cluster/ClusterImpl.java | 113 ++++++++++++++++++- .../internal/UpgradeResourceProviderTest.java | 14 ++- .../ambari/server/orm/dao/UpgradeDAOTest.java | 35 ++++-- .../upgrades/upgrade_test_skip_failures.xml | 1 + 13 files changed, 238 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/aa59546b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java index 1767b02..13cf438 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java @@ -20,7 +20,6 @@ package org.apache.ambari.server.controller; import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.AGENT_STACK_RETRY_COUNT; import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.AGENT_STACK_RETRY_ON_UNAVAILABILITY; -import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.AGENT_STACK_RETRY_COUNT; import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.CLIENTS_TO_UPDATE_CONFIGS; import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.COMMAND_TIMEOUT; import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.COMPONENT_CATEGORY; @@ -54,7 +53,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Iterator; import java.util.Set; import java.util.TreeMap; @@ -248,6 +246,15 @@ public class AmbariCustomCommandExecutionHelper { return sb.toString(); } + /** + * Called during the start/stop/restart of services, plus custom commands during Stack Upgrade. 
+ * @param actionExecutionContext Execution Context + * @param resourceFilter Resource Filter + * @param stage Command stage + * @param additionalCommandParams Additional command params to add to the stage + * @param commandDetail String for the command detail + * @throws AmbariException + */ private void addCustomCommandAction(final ActionExecutionContext actionExecutionContext, final RequestResourceFilter resourceFilter, Stage stage, Map<String, String> additionalCommandParams, String commandDetail) throws AmbariException { @@ -416,15 +423,12 @@ public class AmbariCustomCommandExecutionHelper { } commandParams.put(COMMAND_TIMEOUT, commandTimeout); - - commandParams.put(SERVICE_PACKAGE_FOLDER, - serviceInfo.getServicePackageFolder()); - + commandParams.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder()); commandParams.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder()); - ClusterVersionEntity currentClusterVersion = cluster.getCurrentClusterVersion(); - if (currentClusterVersion != null) { - commandParams.put(KeyNames.VERSION, currentClusterVersion.getRepositoryVersion().getVersion()); + ClusterVersionEntity effectiveClusterVersion = cluster.getEffectiveClusterVersion(); + if (effectiveClusterVersion != null) { + commandParams.put(KeyNames.VERSION, effectiveClusterVersion.getRepositoryVersion().getVersion()); } execCmd.setCommandParams(commandParams); @@ -638,9 +642,7 @@ public class AmbariCustomCommandExecutionHelper { } commandParams.put(COMMAND_TIMEOUT, commandTimeout); - - commandParams.put(SERVICE_PACKAGE_FOLDER, - serviceInfo.getServicePackageFolder()); + commandParams.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder()); commandParams.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder()); execCmd.setCommandParams(commandParams); http://git-wip-us.apache.org/repos/asf/ambari/blob/aa59546b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java 
---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java index dd66dcc..40c9a12 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java @@ -2008,9 +2008,10 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle } commandParams.put(MAX_DURATION_OF_RETRIES, Integer.toString(retryMaxTime)); commandParams.put(COMMAND_RETRY_ENABLED, Boolean.toString(retryEnabled)); - ClusterVersionEntity currentClusterVersion = cluster.getCurrentClusterVersion(); - if (currentClusterVersion != null) { - commandParams.put(VERSION, currentClusterVersion.getRepositoryVersion().getVersion()); + + ClusterVersionEntity effectiveClusterVersion = cluster.getEffectiveClusterVersion(); + if (effectiveClusterVersion != null) { + commandParams.put(VERSION, effectiveClusterVersion.getRepositoryVersion().getVersion()); } if (script.getTimeout() > 0) { scriptCommandTimeout = String.valueOf(script.getTimeout()); http://git-wip-us.apache.org/repos/asf/ambari/blob/aa59546b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java index ec49364..52357de 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java +++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java @@ -650,6 +650,8 @@ public class ClusterStackVersionResourceProvider extends AbstractControllerResou } } else { // !!! revisit for PU + // If forcing to become CURRENT, get the Cluster Version whose state is CURRENT and make sure that + // the Host Version records for the same Repo Version are also marked as CURRENT. ClusterVersionEntity current = cluster.getCurrentClusterVersion(); if (!current.getRepositoryVersion().equals(rve)) { http://git-wip-us.apache.org/repos/asf/ambari/blob/aa59546b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java index ef8a8d4..fccd19d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java @@ -78,6 +78,7 @@ import org.apache.ambari.server.orm.entities.StackEntity; import org.apache.ambari.server.orm.entities.UpgradeEntity; import org.apache.ambari.server.orm.entities.UpgradeGroupEntity; import org.apache.ambari.server.orm.entities.UpgradeItemEntity; +import org.apache.ambari.server.serveraction.upgrades.UpdateDesiredStackAction; import org.apache.ambari.server.stack.JmxQuery; import org.apache.ambari.server.stack.MasterHostResolver; import org.apache.ambari.server.state.Cluster; @@ -136,6 +137,15 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider protected static final String UPGRADE_FAIL_ON_CHECK_WARNINGS = "Upgrade/fail_on_check_warnings"; /** + * Names that appear in the Upgrade Packs that 
are used by + * {@link org.apache.ambari.server.state.cluster.ClusterImpl#isNonRollingUpgradePastUpgradingStack} + * to determine if an upgrade has already changed the version to use. + * For this reason, DO NOT CHANGE the name of these since they represent historic values. + */ + public static final String CONST_UPGRADE_GROUP_NAME = "UPDATE_DESIRED_STACK_ID"; + public static final String CONST_CUSTOM_COMMAND_NAME = UpdateDesiredStackAction.class.getName(); + + /** * Skip slave/client component failures if the tasks are skippable. */ protected static final String UPGRADE_SKIP_FAILURES = "Upgrade/skip_failures"; @@ -779,6 +789,23 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider throw new AmbariException("There are no groupings available"); } + // Non Rolling Upgrades require a group with name "UPDATE_DESIRED_STACK_ID". + // This is needed as a marker to indicate which version to use when an upgrade is paused. + if (pack.getType() == UpgradeType.NON_ROLLING) { + boolean foundGroupWithNameUPDATE_DESIRED_STACK_ID = false; + for (UpgradeGroupHolder group : groups) { + if (group.name.equalsIgnoreCase(this.CONST_UPGRADE_GROUP_NAME)) { + foundGroupWithNameUPDATE_DESIRED_STACK_ID = true; + break; + } + } + + if (foundGroupWithNameUPDATE_DESIRED_STACK_ID == false) { + throw new AmbariException(String.format("NonRolling Upgrade Pack %s requires a Group with name %s", + pack.getName(), this.CONST_UPGRADE_GROUP_NAME)); + } + } + List<UpgradeGroupEntity> groupEntities = new ArrayList<UpgradeGroupEntity>(); RequestStageContainer req = createRequest(direction, version); http://git-wip-us.apache.org/repos/asf/ambari/blob/aa59546b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java index 5c8b7f3..d2c5c58 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java @@ -312,6 +312,16 @@ public class HostRoleCommandDAO { } @RequiresSession + public List<HostRoleCommandEntity> findByRequestIdAndStatuses(Long requestId, Collection<HostRoleStatus> statuses) { + TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createNamedQuery( + "HostRoleCommandEntity.findByRequestIdAndStatuses", HostRoleCommandEntity.class); + query.setParameter("requestId", requestId); + query.setParameter("statuses", statuses); + List results = query.getResultList(); + return results; + } + + @RequiresSession public List<Long> findTaskIdsByRequestIds(Collection<Long> requestIds) { TypedQuery<Long> query = entityManagerProvider.get().createQuery( "SELECT task.taskId FROM HostRoleCommandEntity task " + http://git-wip-us.apache.org/repos/asf/ambari/blob/aa59546b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java index 06f6ac1..e6e2b23 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java @@ -175,7 +175,7 @@ public class UpgradeDAO { @RequiresSession public UpgradeEntity findLastUpgradeForCluster(long clusterId) { TypedQuery<UpgradeEntity> query = entityManagerProvider.get().createNamedQuery( - "UpgradeEntity.findLatestForCluster", UpgradeEntity.class); + "UpgradeEntity.findLatestForClusterInDirection", UpgradeEntity.class); query.setMaxResults(1); 
query.setParameter("clusterId", clusterId); query.setParameter("direction", Direction.UPGRADE); @@ -185,6 +185,22 @@ public class UpgradeDAO { return daoUtils.selectSingle(query); } + /** + * @param clusterId the cluster id + * @return the upgrade entity, or {@code null} if not found + */ + @RequiresSession + public UpgradeEntity findLastUpgradeOrDowngradeForCluster(long clusterId) { + TypedQuery<UpgradeEntity> query = entityManagerProvider.get().createNamedQuery( + "UpgradeEntity.findLatestForCluster", UpgradeEntity.class); + query.setMaxResults(1); + query.setParameter("clusterId", clusterId); + + query.setHint(QueryHints.REFRESH, HintValues.TRUE); + + return daoUtils.selectSingle(query); + } + @Transactional public UpgradeEntity merge(UpgradeEntity upgradeEntity) { return entityManagerProvider.get().merge(upgradeEntity); http://git-wip-us.apache.org/repos/asf/ambari/blob/aa59546b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java index a12b204..55a7160 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java @@ -61,6 +61,7 @@ import org.apache.commons.lang.ArrayUtils; ) @NamedQueries({ @NamedQuery(name = "HostRoleCommandEntity.findCountByCommandStatuses", query = "SELECT COUNT(command.taskId) FROM HostRoleCommandEntity command WHERE command.status IN :statuses"), + @NamedQuery(name = "HostRoleCommandEntity.findByRequestIdAndStatuses", query="SELECT task FROM HostRoleCommandEntity task WHERE task.requestId=:requestId AND task.status IN :statuses ORDER BY task.taskId ASC"), @NamedQuery(name = 
"HostRoleCommandEntity.findByCommandStatuses", query = "SELECT command FROM HostRoleCommandEntity command WHERE command.status IN :statuses ORDER BY command.requestId, command.stageId"), @NamedQuery(name = "HostRoleCommandEntity.findByHostId", query = "SELECT command FROM HostRoleCommandEntity command WHERE command.hostId=:hostId"), @NamedQuery(name = "HostRoleCommandEntity.findByStatusBetweenStages", query = "SELECT command FROM HostRoleCommandEntity command WHERE command.requestId = :requestId AND command.stageId >= :minStageId AND command.stageId <= :maxStageId AND command.status = :status"), http://git-wip-us.apache.org/repos/asf/ambari/blob/aa59546b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java index fd866a1..db27ea5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java @@ -52,8 +52,10 @@ import org.apache.ambari.server.state.stack.upgrade.UpgradeType; query = "SELECT u FROM UpgradeEntity u WHERE u.clusterId = :clusterId"), @NamedQuery(name = "UpgradeEntity.findUpgrade", query = "SELECT u FROM UpgradeEntity u WHERE u.upgradeId = :upgradeId"), + @NamedQuery(name = "UpgradeEntity.findLatestForClusterInDirection", + query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON u.requestId = r.requestId WHERE u.clusterId = :clusterId AND u.direction = :direction ORDER BY r.startTime DESC"), @NamedQuery(name = "UpgradeEntity.findLatestForCluster", - query = "SELECT u FROM UpgradeEntity u WHERE u.clusterId = :clusterId AND u.direction = :direction ORDER BY u.upgradeId DESC"), + query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON 
u.requestId = r.requestId WHERE u.clusterId = :clusterId ORDER BY r.startTime DESC"), }) public class UpgradeEntity { http://git-wip-us.apache.org/repos/asf/ambari/blob/aa59546b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java index b49f566..88a60c8 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java @@ -125,11 +125,18 @@ public interface Cluster { /** * Get the ClusterVersionEntity object whose state is CURRENT. - * @return + * @return Cluster Version entity whose state is CURRENT. */ ClusterVersionEntity getCurrentClusterVersion(); /** + * If no RU/EU is in progress, get the ClusterVersionEntity object whose state is CURRENT. + * If RU/EU is in progress, based on the direction and desired stack, determine which version to use. + * @return Cluster Version entity to use. + */ + ClusterVersionEntity getEffectiveClusterVersion() throws AmbariException; + + /** * Get all of the ClusterVersionEntity objects for the cluster. 
* @return */ http://git-wip-us.apache.org/repos/asf/ambari/blob/aa59546b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index 3493508..dbff426 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -46,6 +46,7 @@ import org.apache.ambari.server.ParentObjectNotFoundException; import org.apache.ambari.server.ServiceComponentHostNotFoundException; import org.apache.ambari.server.ServiceComponentNotFoundException; import org.apache.ambari.server.ServiceNotFoundException; +import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariSessionManager; @@ -54,6 +55,7 @@ import org.apache.ambari.server.controller.ConfigurationResponse; import org.apache.ambari.server.controller.MaintenanceStateHelper; import org.apache.ambari.server.controller.RootServiceResponseFactory.Services; import org.apache.ambari.server.controller.ServiceConfigVersionResponse; +import org.apache.ambari.server.controller.internal.UpgradeResourceProvider; import org.apache.ambari.server.orm.RequiresSession; import org.apache.ambari.server.orm.cache.HostConfigMapping; import org.apache.ambari.server.orm.dao.AlertDefinitionDAO; @@ -64,8 +66,10 @@ import org.apache.ambari.server.orm.dao.ClusterVersionDAO; import org.apache.ambari.server.orm.dao.ConfigGroupHostMappingDAO; import org.apache.ambari.server.orm.dao.HostConfigMappingDAO; import org.apache.ambari.server.orm.dao.HostDAO; +import 
org.apache.ambari.server.orm.dao.HostRoleCommandDAO; import org.apache.ambari.server.orm.dao.HostVersionDAO; import org.apache.ambari.server.orm.dao.RepositoryVersionDAO; +import org.apache.ambari.server.orm.dao.RequestDAO; import org.apache.ambari.server.orm.dao.ServiceConfigDAO; import org.apache.ambari.server.orm.dao.StackDAO; import org.apache.ambari.server.orm.dao.TopologyRequestDAO; @@ -79,16 +83,20 @@ import org.apache.ambari.server.orm.entities.ClusterVersionEntity; import org.apache.ambari.server.orm.entities.ConfigGroupEntity; import org.apache.ambari.server.orm.entities.HostComponentStateEntity; import org.apache.ambari.server.orm.entities.HostEntity; +import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; import org.apache.ambari.server.orm.entities.HostVersionEntity; import org.apache.ambari.server.orm.entities.PermissionEntity; import org.apache.ambari.server.orm.entities.PrivilegeEntity; import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; +import org.apache.ambari.server.orm.entities.RequestEntity; import org.apache.ambari.server.orm.entities.RequestScheduleEntity; import org.apache.ambari.server.orm.entities.ResourceEntity; import org.apache.ambari.server.orm.entities.ServiceConfigEntity; import org.apache.ambari.server.orm.entities.StackEntity; import org.apache.ambari.server.orm.entities.TopologyRequestEntity; import org.apache.ambari.server.orm.entities.UpgradeEntity; +import org.apache.ambari.server.orm.entities.UpgradeGroupEntity; +import org.apache.ambari.server.orm.entities.UpgradeItemEntity; import org.apache.ambari.server.security.authorization.AuthorizationHelper; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.ClusterHealthReport; @@ -120,6 +128,7 @@ import org.apache.ambari.server.state.configgroup.ConfigGroupFactory; import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; import org.apache.ambari.server.state.scheduler.RequestExecution; import 
org.apache.ambari.server.state.scheduler.RequestExecutionFactory; +import org.apache.ambari.server.state.stack.upgrade.Direction; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostSummary; import org.apache.ambari.server.topology.TopologyRequest; import org.apache.commons.lang.StringUtils; @@ -204,6 +213,12 @@ public class ClusterImpl implements Cluster { private ClusterVersionDAO clusterVersionDAO; @Inject + private HostRoleCommandDAO hostRoleCommandDAO; + + @Inject + private RequestDAO requestDAO; + + @Inject private HostDAO hostDAO; @Inject @@ -1103,12 +1118,106 @@ public class ClusterImpl implements Cluster { Collection<ClusterVersionEntity> clusterVersionEntities = getClusterEntity().getClusterVersionEntities(); for (ClusterVersionEntity clusterVersionEntity : clusterVersionEntities) { if (clusterVersionEntity.getState() == RepositoryVersionState.CURRENT) { -// TODO assuming there's only 1 current version, return 1st found, exception was expected in previous implementation + // TODO assuming there's only 1 current version, return 1st found, exception was expected in previous implementation + return clusterVersionEntity; + } + } + return null; + } + + /** + * Get any stack upgrade currently in progress. + * @return + */ + private UpgradeEntity getUpgradeInProgress() { + UpgradeEntity mostRecentUpgrade = upgradeDAO.findLastUpgradeOrDowngradeForCluster(this.getClusterId()); + + if (mostRecentUpgrade != null) { + List<HostRoleStatus> UNFINISHED_STATUSES = new ArrayList(); + UNFINISHED_STATUSES.add(HostRoleStatus.PENDING); + UNFINISHED_STATUSES.add(HostRoleStatus.ABORTED); + + List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByRequestIdAndStatuses(mostRecentUpgrade.getRequestId(), UNFINISHED_STATUSES); + if (!commands.isEmpty()) { + return mostRecentUpgrade; + } + } + return null; + } + + /** + * If no RU/EU is in progress, get the ClusterVersionEntity object whose state is CURRENT. 
+ * If RU/EU is in progress, based on the direction and desired stack, determine which version to use. + * Assuming upgrading from HDP 2.2.0.0-1 to 2.3.0.0-2, then + * RU Upgrade: 2.3.0.0-2 (desired stack id) + * RU Downgrade: 2.2.0.0-1 (desired stack id) + * EU Upgrade: while stopping services and before changing desired stack, use 2.2.0.0-1, after, use 2.3.0.0-2 + * EU Downgrade: while stopping services and before changing desired stack, use 2.3.0.0-2, after, use 2.2.0.0-1 + * @return + */ + @Override + public ClusterVersionEntity getEffectiveClusterVersion() throws AmbariException { + // This is not reliable. Need to find the last upgrade request. + UpgradeEntity upgradeInProgress = this.getUpgradeInProgress(); + if (upgradeInProgress == null) { + return this.getCurrentClusterVersion(); + } + + String effectiveVersion = null; + switch (upgradeInProgress.getUpgradeType()) { + case NON_ROLLING: + if (upgradeInProgress.getDirection() == Direction.UPGRADE) { + boolean pastChangingStack = this.isNonRollingUpgradePastUpgradingStack(upgradeInProgress); + effectiveVersion = pastChangingStack ? upgradeInProgress.getToVersion() : upgradeInProgress.getFromVersion(); + } else { + // Should be the lower value during a Downgrade. + effectiveVersion = upgradeInProgress.getToVersion(); + } + break; + case ROLLING: + default: + // Version will be higher on upgrade and lower on downgrade directions. + effectiveVersion = upgradeInProgress.getToVersion(); + break; + } + + if (effectiveVersion == null) { + throw new AmbariException("Unable to determine which version to use during Stack Upgrade, effectiveVersion is null."); + } + + // Find the first cluster version whose repo matches the expected version. 
+ Collection<ClusterVersionEntity> clusterVersionEntities = getClusterEntity().getClusterVersionEntities(); + for (ClusterVersionEntity clusterVersionEntity : clusterVersionEntities) { + if (clusterVersionEntity.getRepositoryVersion().getVersion().equals(effectiveVersion)) { return clusterVersionEntity; } } return null; -// return clusterVersionDAO.findByClusterAndStateCurrent(getClusterName()); + } + + /** + * Given a NonRolling stack upgrade, determine if it has already crossed the point of using the newer version. + * @param upgrade Stack Upgrade + * @return Return true if should be using to_version, otherwise, false to mean the from_version. + */ + private boolean isNonRollingUpgradePastUpgradingStack(UpgradeEntity upgrade) { + for (UpgradeGroupEntity group : upgrade.getUpgradeGroups()) { + if (group.getName().equalsIgnoreCase(UpgradeResourceProvider.CONST_UPGRADE_GROUP_NAME)) { + for (UpgradeItemEntity item : group.getItems()) { + List<Long> taskIds = hostRoleCommandDAO.findTaskIdsByStage(upgrade.getRequestId(), item.getStageId()); + List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByPKs(taskIds); + for (HostRoleCommandEntity command : commands) { + if (command.getCustomCommandName() != null && + command.getCustomCommandName().equalsIgnoreCase(UpgradeResourceProvider.CONST_CUSTOM_COMMAND_NAME) && + command.getStatus() == HostRoleStatus.COMPLETED) { + return true; + } + } + } + return false; + } + } + return false; } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/aa59546b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java index 926807f..d456201 100644 --- 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -57,11 +58,13 @@ import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.OrmTestHelper; import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; import org.apache.ambari.server.orm.dao.RepositoryVersionDAO; +import org.apache.ambari.server.orm.dao.RequestDAO; import org.apache.ambari.server.orm.dao.StackDAO; import org.apache.ambari.server.orm.dao.StageDAO; import org.apache.ambari.server.orm.dao.UpgradeDAO; import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; +import org.apache.ambari.server.orm.entities.RequestEntity; import org.apache.ambari.server.orm.entities.StackEntity; import org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.orm.entities.UpgradeEntity; @@ -110,6 +113,7 @@ import com.google.inject.util.Modules; public class UpgradeResourceProviderTest { private UpgradeDAO upgradeDao = null; + private RequestDAO requestDao = null; private RepositoryVersionDAO repoVersionDao = null; private Injector injector; private Clusters clusters; @@ -156,6 +160,7 @@ public class UpgradeResourceProviderTest { stackDAO = injector.getInstance(StackDAO.class); upgradeDao = injector.getInstance(UpgradeDAO.class); + requestDao = injector.getInstance(RequestDAO.class); repoVersionDao = injector.getInstance(RepositoryVersionDAO.class); AmbariEventPublisher publisher = createNiceMock(AmbariEventPublisher.class); @@ -555,6 +560,13 @@ public class 
UpgradeResourceProviderTest { // a downgrade MUST have an upgrade to come from, so populate an upgrade in // the DB + RequestEntity requestEntity = new RequestEntity(); + requestEntity.setRequestId(2L); + requestEntity.setClusterId(cluster.getClusterId()); + requestEntity.setStatus(HostRoleStatus.PENDING); + requestEntity.setStages(new ArrayList<StageEntity>()); + requestDao.create(requestEntity); + UpgradeEntity upgradeEntity = new UpgradeEntity(); upgradeEntity.setClusterId(cluster.getClusterId()); upgradeEntity.setDirection(Direction.UPGRADE); @@ -562,7 +574,7 @@ public class UpgradeResourceProviderTest { upgradeEntity.setToVersion("2.2.2.2"); upgradeEntity.setUpgradePackage("upgrade_test"); upgradeEntity.setUpgradeType(UpgradeType.ROLLING); - upgradeEntity.setRequestId(1L); + upgradeEntity.setRequestId(2L); upgradeDao.create(upgradeEntity); upgrades = upgradeDao.findUpgrades(cluster.getClusterId()); http://git-wip-us.apache.org/repos/asf/ambari/blob/aa59546b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java index 3ad2240..ddaee21 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java @@ -26,10 +26,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.OrmTestHelper; import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; +import org.apache.ambari.server.orm.entities.RequestEntity; +import 
org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.orm.entities.UpgradeEntity; import org.apache.ambari.server.orm.entities.UpgradeGroupEntity; import org.apache.ambari.server.orm.entities.UpgradeItemEntity; @@ -55,6 +58,7 @@ public class UpgradeDAOTest { private Injector injector; private Long clusterId; private UpgradeDAO dao; + private RequestDAO requestDAO; private OrmTestHelper helper; @@ -67,6 +71,7 @@ public class UpgradeDAOTest { injector.getInstance(GuiceJpaInitializer.class); dao = injector.getInstance(UpgradeDAO.class); + requestDAO = injector.getInstance(RequestDAO.class); helper = injector.getInstance(OrmTestHelper.class); clusterId = helper.createCluster(); @@ -140,11 +145,18 @@ public class UpgradeDAOTest { @Test public void testFindLastUpgradeForCluster() throws Exception { // create upgrade entities + RequestEntity requestEntity = new RequestEntity(); + requestEntity.setRequestId(1L); + requestEntity.setClusterId(1L); + requestEntity.setStatus(HostRoleStatus.PENDING); + requestEntity.setStages(new ArrayList<StageEntity>()); + requestDAO.create(requestEntity); + UpgradeEntity entity1 = new UpgradeEntity(); entity1.setId(11L); - entity1.setClusterId(Long.valueOf(1)); + entity1.setClusterId(1L); entity1.setDirection(Direction.UPGRADE); - entity1.setRequestId(Long.valueOf(1)); + entity1.setRequestId(1L); entity1.setFromVersion("2.2.0.0-1234"); entity1.setToVersion("2.3.0.0-4567"); entity1.setUpgradeType(UpgradeType.ROLLING); @@ -153,9 +165,9 @@ public class UpgradeDAOTest { dao.create(entity1); UpgradeEntity entity2 = new UpgradeEntity(); entity2.setId(22L); - entity2.setClusterId(Long.valueOf(1)); + entity2.setClusterId(1L); entity2.setDirection(Direction.DOWNGRADE); - entity2.setRequestId(Long.valueOf(1)); + entity2.setRequestId(1L); entity2.setFromVersion("2.3.0.0-4567"); entity2.setToVersion("2.2.0.0-1234"); entity2.setUpgradeType(UpgradeType.ROLLING); @@ -164,9 +176,9 @@ public class UpgradeDAOTest { 
dao.create(entity2); UpgradeEntity entity3 = new UpgradeEntity(); entity3.setId(33L); - entity3.setClusterId(Long.valueOf(1)); + entity3.setClusterId(1L); entity3.setDirection(Direction.UPGRADE); - entity3.setRequestId(Long.valueOf(1)); + entity3.setRequestId(1L); entity3.setFromVersion("2.2.0.0-1234"); entity3.setToVersion("2.3.1.1-4567"); entity3.setUpgradeType(UpgradeType.ROLLING); @@ -185,11 +197,18 @@ public class UpgradeDAOTest { */ @Test public void testUpdatableColumns() throws Exception { + RequestEntity requestEntity = new RequestEntity(); + requestEntity.setRequestId(1L); + requestEntity.setClusterId(1L); + requestEntity.setStatus(HostRoleStatus.PENDING); + requestEntity.setStages(new ArrayList<StageEntity>()); + requestDAO.create(requestEntity); + UpgradeEntity upgradeEntity = new UpgradeEntity(); upgradeEntity.setId(11L); - upgradeEntity.setClusterId(Long.valueOf(1)); + upgradeEntity.setClusterId(1L); upgradeEntity.setDirection(Direction.UPGRADE); - upgradeEntity.setRequestId(Long.valueOf(1)); + upgradeEntity.setRequestId(1L); upgradeEntity.setFromVersion("2.2.0.0-1234"); upgradeEntity.setToVersion("2.3.0.0-4567"); upgradeEntity.setUpgradeType(UpgradeType.ROLLING); http://git-wip-us.apache.org/repos/asf/ambari/blob/aa59546b/ambari-server/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml b/ambari-server/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml index 92b4fe3..8956ba3 100644 --- a/ambari-server/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml +++ b/ambari-server/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml @@ -19,6 +19,7 @@ <target>2.2.*</target> <skip-failures>true</skip-failures> <skip-service-check-failures>true</skip-service-check-failures> + <type>ROLLING</type> 
<prerequisite-checks> <check>org.apache.ambari.server.checks.HiveMultipleMetastoreCheck</check> <check>org.apache.ambari.server.checks.MapReduce2JobHistoryStatePreservingCheck</check>
