Repository: ambari Updated Branches: refs/heads/trunk b0d6a5781 -> 6fe7f8327
AMBARI-15637. If RU/EU is paused, services are restarted on the older version. EU is more complex since stopping services should use the original version. (alejandro) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6fe7f832 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6fe7f832 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6fe7f832 Branch: refs/heads/trunk Commit: 6fe7f83277a269dc6d9634b186ff3fc05fca8505 Parents: b0d6a57 Author: Alejandro Fernandez <[email protected]> Authored: Thu Mar 24 16:33:12 2016 -0700 Committer: Alejandro Fernandez <[email protected]> Committed: Thu Mar 31 17:23:08 2016 -0700 ---------------------------------------------------------------------- .../upgrades/upgrade_test_skip_failures.xml | 1 + .../AmbariCustomCommandExecutionHelper.java | 25 ++-- .../AmbariManagementControllerImpl.java | 9 +- .../ClusterStackVersionResourceProvider.java | 2 + .../internal/UpgradeResourceProvider.java | 34 ++++- .../ambari/server/orm/dao/UpgradeDAO.java | 18 ++- .../server/orm/entities/UpgradeEntity.java | 4 +- .../org/apache/ambari/server/state/Cluster.java | 9 +- .../server/state/cluster/ClusterImpl.java | 135 ++++++++++++++++++- .../internal/UpgradeResourceProviderTest.java | 14 +- .../ambari/server/orm/dao/UpgradeDAOTest.java | 35 +++-- .../upgrades/upgrade_test_skip_failures.xml | 1 + 12 files changed, 256 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-funtest/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml ---------------------------------------------------------------------- diff --git a/ambari-funtest/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml b/ambari-funtest/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml index 597270e..b2c4b93 100644 --- a/ambari-funtest/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml +++ b/ambari-funtest/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml @@ -19,6 +19,7 @@ <target>2.2.*</target> <skip-failures>true</skip-failures> <skip-service-check-failures>true</skip-service-check-failures> + <type>ROLLING</type> <prerequisite-checks> <check>org.apache.ambari.server.checks.HiveMultipleMetastoreCheck</check> <check>org.apache.ambari.server.checks.MapReduce2JobHistoryStatePreservingCheck</check> http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java index ee7fe7b..f3197cb 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java @@ -246,6 +246,15 @@ public class AmbariCustomCommandExecutionHelper { return sb.toString(); } + /** + * Called during the start/stop/restart of services, plus custom commands during Stack Upgrade. + * @param actionExecutionContext Execution Context + * @param resourceFilter Resource Filter + * @param stage Command stage + * @param additionalCommandParams Additional command params to add the the stage + * @param commandDetail String for the command detail + * @throws AmbariException + */ private void addCustomCommandAction(final ActionExecutionContext actionExecutionContext, final RequestResourceFilter resourceFilter, Stage stage, Map<String, String> additionalCommandParams, String commandDetail) throws AmbariException { @@ -414,15 +423,12 @@ public class AmbariCustomCommandExecutionHelper { } commandParams.put(COMMAND_TIMEOUT, commandTimeout); - - commandParams.put(SERVICE_PACKAGE_FOLDER, - serviceInfo.getServicePackageFolder()); - + commandParams.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder()); commandParams.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder()); - ClusterVersionEntity currentClusterVersion = cluster.getCurrentClusterVersion(); - if (currentClusterVersion != null) { - commandParams.put(KeyNames.VERSION, currentClusterVersion.getRepositoryVersion().getVersion()); + ClusterVersionEntity effectiveClusterVersion = cluster.getEffectiveClusterVersion(); + if (effectiveClusterVersion != null) { + commandParams.put(KeyNames.VERSION, effectiveClusterVersion.getRepositoryVersion().getVersion()); } execCmd.setCommandParams(commandParams); @@ -635,9 +641,7 @@ public class AmbariCustomCommandExecutionHelper { } commandParams.put(COMMAND_TIMEOUT, commandTimeout); - - commandParams.put(SERVICE_PACKAGE_FOLDER, - serviceInfo.getServicePackageFolder()); + commandParams.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder()); commandParams.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder()); execCmd.setCommandParams(commandParams); @@ -959,7 +963,6 @@ public class AmbariCustomCommandExecutionHelper { } else if (isValidCustomCommand(actionExecutionContext, resourceFilter)) { String commandDetail = getReadableCustomCommandDetail(actionExecutionContext, resourceFilter); - Map<String, String> extraParams = new HashMap<String, String>(); String componentName = (null == resourceFilter.getComponentName()) ? null : resourceFilter.getComponentName().toLowerCase(); http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java index e6dd2f7..d1f8232 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java @@ -2153,9 +2153,10 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle } commandParams.put(MAX_DURATION_OF_RETRIES, Integer.toString(retryMaxTime)); commandParams.put(COMMAND_RETRY_ENABLED, Boolean.toString(retryEnabled)); - ClusterVersionEntity currentClusterVersion = cluster.getCurrentClusterVersion(); - if (currentClusterVersion != null) { - commandParams.put(VERSION, currentClusterVersion.getRepositoryVersion().getVersion()); + + ClusterVersionEntity effectiveClusterVersion = cluster.getEffectiveClusterVersion(); + if (effectiveClusterVersion != null) { + commandParams.put(VERSION, effectiveClusterVersion.getRepositoryVersion().getVersion()); } if (script.getTimeout() > 0) { scriptCommandTimeout = String.valueOf(script.getTimeout()); @@ -3582,7 +3583,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle } else { actionExecutionHelper.validateAction(actionRequest); } - + // TODO Alejandro, Called First. insert params.version. Called during Rebalance HDFS, ZOOKEEPER Restart, Zookeeper Service Check. long requestId = actionManager.getNextRequestId(); RequestStageContainer requestStageContainer = new RequestStageContainer( requestId, http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java index 6f3c03c..bb50820 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java @@ -754,6 +754,8 @@ public class ClusterStackVersionResourceProvider extends AbstractControllerResou } } else { // !!! revisit for PU + // If forcing to become CURRENT, get the Cluster Version whose state is CURRENT and make sure that + // the Host Version records for the same Repo Version are also marked as CURRENT. ClusterVersionEntity current = cluster.getCurrentClusterVersion(); if (!current.getRepositoryVersion().equals(rve)) { http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java index 714495f..b3bf345 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java @@ -78,6 +78,7 @@ import org.apache.ambari.server.orm.entities.UpgradeEntity; import org.apache.ambari.server.orm.entities.UpgradeGroupEntity; import org.apache.ambari.server.orm.entities.UpgradeItemEntity; import org.apache.ambari.server.security.authorization.AuthorizationException; +import org.apache.ambari.server.serveraction.upgrades.UpdateDesiredStackAction; import org.apache.ambari.server.stack.MasterHostResolver; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; @@ -142,6 +143,18 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider protected static final String UPGRADE_SKIP_PREREQUISITE_CHECKS = "Upgrade/skip_prerequisite_checks"; protected static final String UPGRADE_FAIL_ON_CHECK_WARNINGS = "Upgrade/fail_on_check_warnings"; + + /** + * Names that appear in the Upgrade Packs that are used by + * {@link org.apache.ambari.server.state.cluster.ClusterImpl#isNonRollingUpgradePastUpgradingStack} + * to determine if an upgrade has already changed the version to use. + * For this reason, DO NOT CHANGE the name of these since they represent historic values. + */ + public static final String CONST_UPGRADE_GROUP_NAME = "UPDATE_DESIRED_STACK_ID"; + public static final String CONST_UPGRADE_ITEM_TEXT = "Update Target Stack"; + public static final String CONST_CUSTOM_COMMAND_NAME = UpdateDesiredStackAction.class.getName(); + + /** * Skip slave/client component failures if the tasks are skippable. */ @@ -809,6 +822,23 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider throw new AmbariException("There are no groupings available"); } + // Non Rolling Upgrades require a group with name "UPDATE_DESIRED_STACK_ID". + // This is needed as a marker to indicate which version to use when an upgrade is paused. + if (pack.getType() == UpgradeType.NON_ROLLING) { + boolean foundGroupWithNameUPDATE_DESIRED_STACK_ID = false; + for (UpgradeGroupHolder group : groups) { + if (group.name.equalsIgnoreCase(this.CONST_UPGRADE_GROUP_NAME)) { + foundGroupWithNameUPDATE_DESIRED_STACK_ID = true; + break; + } + } + + if (foundGroupWithNameUPDATE_DESIRED_STACK_ID == false) { + throw new AmbariException(String.format("NonRolling Upgrade Pack %s requires a Group with name %s", + pack.getName(), this.CONST_UPGRADE_GROUP_NAME)); + } + } + List<UpgradeGroupEntity> groupEntities = new ArrayList<>(); RequestStageContainer req = createRequest(direction, version); @@ -1667,7 +1697,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider // Remove relevant upgrade entity try { Cluster cluster = clusters.get().getClusterById(clusterId); - UpgradeEntity lastUpgradeItemForCluster = s_upgradeDAO.findLastUpgradeForCluster(cluster.getClusterId()); + UpgradeEntity lastUpgradeItemForCluster = s_upgradeDAO.findLastUpgradeOrDowngradeForCluster(cluster.getClusterId()); lastUpgradeItemForCluster.setSuspended(true); s_upgradeDAO.merge(lastUpgradeItemForCluster); @@ -1690,7 +1720,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider try { Cluster cluster = clusters.get().getClusterById(clusterId); - UpgradeEntity lastUpgradeItemForCluster = s_upgradeDAO.findLastUpgradeForCluster(cluster.getClusterId()); + UpgradeEntity lastUpgradeItemForCluster = s_upgradeDAO.findLastUpgradeOrDowngradeForCluster(cluster.getClusterId()); lastUpgradeItemForCluster.setSuspended(false); s_upgradeDAO.merge(lastUpgradeItemForCluster); http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java index 4a923be..2d0a4d7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java @@ -181,7 +181,7 @@ public class UpgradeDAO { @RequiresSession public UpgradeEntity findLastUpgradeForCluster(long clusterId) { TypedQuery<UpgradeEntity> query = entityManagerProvider.get().createNamedQuery( - "UpgradeEntity.findLatestForCluster", UpgradeEntity.class); + "UpgradeEntity.findLatestForClusterInDirection", UpgradeEntity.class); query.setMaxResults(1); query.setParameter("clusterId", clusterId); query.setParameter("direction", Direction.UPGRADE); @@ -191,6 +191,22 @@ public class UpgradeDAO { return daoUtils.selectSingle(query); } + /** + * @param clusterId the cluster id + * @return the upgrade entity, or {@code null} if not found + */ + @RequiresSession + public UpgradeEntity findLastUpgradeOrDowngradeForCluster(long clusterId) { + TypedQuery<UpgradeEntity> query = entityManagerProvider.get().createNamedQuery( + "UpgradeEntity.findLatestForCluster", UpgradeEntity.class); + query.setMaxResults(1); + query.setParameter("clusterId", clusterId); + + query.setHint(QueryHints.REFRESH, HintValues.TRUE); + + return daoUtils.selectSingle(query); + } + @Transactional public UpgradeEntity merge(UpgradeEntity upgradeEntity) { return entityManagerProvider.get().merge(upgradeEntity); http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java index fd866a1..db27ea5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java @@ -52,8 +52,10 @@ import org.apache.ambari.server.state.stack.upgrade.UpgradeType; query = "SELECT u FROM UpgradeEntity u WHERE u.clusterId = :clusterId"), @NamedQuery(name = "UpgradeEntity.findUpgrade", query = "SELECT u FROM UpgradeEntity u WHERE u.upgradeId = :upgradeId"), + @NamedQuery(name = "UpgradeEntity.findLatestForClusterInDirection", + query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON u.requestId = r.requestId WHERE u.clusterId = :clusterId AND u.direction = :direction ORDER BY r.startTime DESC"), @NamedQuery(name = "UpgradeEntity.findLatestForCluster", - query = "SELECT u FROM UpgradeEntity u WHERE u.clusterId = :clusterId AND u.direction = :direction ORDER BY u.upgradeId DESC"), + query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON u.requestId = r.requestId WHERE u.clusterId = :clusterId ORDER BY r.startTime DESC"), }) public class UpgradeEntity { http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java index 128c392..38d05ab 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java @@ -132,11 +132,18 @@ public interface Cluster { /** * Get the ClusterVersionEntity object whose state is CURRENT. - * @return + * @return Cluster Version entity to whose state is CURRENT. */ ClusterVersionEntity getCurrentClusterVersion(); /** + * If no RU/EU is in progress, get the ClusterVersionEntity object whose state is CURRENT. + * If RU/EU is in progress, based on the direction and desired stack, determine which version to use. + * @return Cluster Version entity to use. + */ + ClusterVersionEntity getEffectiveClusterVersion() throws AmbariException; + + /** * Get all of the ClusterVersionEntity objects for the cluster. * @return */ http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index 878f83b..9e456eb 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -48,6 +48,8 @@ import org.apache.ambari.server.ParentObjectNotFoundException; import org.apache.ambari.server.ServiceComponentHostNotFoundException; import org.apache.ambari.server.ServiceComponentNotFoundException; import org.apache.ambari.server.ServiceNotFoundException; +import org.apache.ambari.server.actionmanager.HostRoleCommand; +import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariSessionManager; @@ -56,6 +58,7 @@ import org.apache.ambari.server.controller.ConfigurationResponse; import org.apache.ambari.server.controller.MaintenanceStateHelper; import org.apache.ambari.server.controller.RootServiceResponseFactory.Services; import org.apache.ambari.server.controller.ServiceConfigVersionResponse; +import org.apache.ambari.server.controller.internal.UpgradeResourceProvider; import org.apache.ambari.server.events.AmbariEvent.AmbariEventType; import org.apache.ambari.server.events.ClusterEvent; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; @@ -68,8 +71,10 @@ import org.apache.ambari.server.orm.dao.ClusterStateDAO; import org.apache.ambari.server.orm.dao.ClusterVersionDAO; import org.apache.ambari.server.orm.dao.HostConfigMappingDAO; import org.apache.ambari.server.orm.dao.HostDAO; +import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; import org.apache.ambari.server.orm.dao.HostVersionDAO; import org.apache.ambari.server.orm.dao.RepositoryVersionDAO; +import org.apache.ambari.server.orm.dao.RequestDAO; import org.apache.ambari.server.orm.dao.ServiceConfigDAO; import org.apache.ambari.server.orm.dao.StackDAO; import org.apache.ambari.server.orm.dao.TopologyRequestDAO; @@ -83,16 +88,20 @@ import org.apache.ambari.server.orm.entities.ClusterVersionEntity; import org.apache.ambari.server.orm.entities.ConfigGroupEntity; import org.apache.ambari.server.orm.entities.HostComponentStateEntity; import org.apache.ambari.server.orm.entities.HostEntity; +import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; import org.apache.ambari.server.orm.entities.HostVersionEntity; import org.apache.ambari.server.orm.entities.PermissionEntity; import org.apache.ambari.server.orm.entities.PrivilegeEntity; import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; +import org.apache.ambari.server.orm.entities.RequestEntity; import org.apache.ambari.server.orm.entities.RequestScheduleEntity; import org.apache.ambari.server.orm.entities.ResourceEntity; import org.apache.ambari.server.orm.entities.ServiceConfigEntity; import org.apache.ambari.server.orm.entities.StackEntity; import org.apache.ambari.server.orm.entities.TopologyRequestEntity; import org.apache.ambari.server.orm.entities.UpgradeEntity; +import org.apache.ambari.server.orm.entities.UpgradeGroupEntity; +import org.apache.ambari.server.orm.entities.UpgradeItemEntity; import org.apache.ambari.server.security.authorization.AuthorizationException; import org.apache.ambari.server.security.authorization.AuthorizationHelper; import org.apache.ambari.server.state.Cluster; @@ -125,6 +134,7 @@ import org.apache.ambari.server.state.configgroup.ConfigGroupFactory; import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; import org.apache.ambari.server.state.scheduler.RequestExecution; import org.apache.ambari.server.state.scheduler.RequestExecutionFactory; +import org.apache.ambari.server.state.stack.upgrade.Direction; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostSummary; import org.apache.ambari.server.topology.TopologyRequest; import org.apache.commons.lang.StringUtils; @@ -212,6 +222,12 @@ public class ClusterImpl implements Cluster { private ClusterVersionDAO clusterVersionDAO; @Inject + private HostRoleCommandDAO hostRoleCommandDAO; + + @Inject + private RequestDAO requestDAO; + + @Inject private HostDAO hostDAO; @Inject @@ -302,6 +318,9 @@ public class ClusterImpl implements Cluster { StringUtils.isEmpty(desiredStackVersion.getStackVersion())) { loadServiceConfigTypes(); } + + // Load any active stack upgrades. + loadStackUpgrade(); } @@ -321,6 +340,24 @@ public class ClusterImpl implements Cluster { } /** + * When a cluster is first loaded, determine if it has a stack upgrade in progress. + */ + private void loadStackUpgrade() { + clusterGlobalLock.writeLock().lock(); + + try { + UpgradeEntity activeUpgrade = this.getUpgradeInProgress(); + if (activeUpgrade != null) { + this.setUpgradeEntity(activeUpgrade); + } + } catch (AmbariException e) { + LOG.error("Unable to load active stack upgrade. Error: " + e.getMessage()); + } finally { + clusterGlobalLock.writeLock().unlock(); + } + } + + /** * Construct config type to service name mapping * @throws AmbariException when stack or its part not found */ @@ -1140,12 +1177,106 @@ public class ClusterImpl implements Cluster { Collection<ClusterVersionEntity> clusterVersionEntities = getClusterEntity().getClusterVersionEntities(); for (ClusterVersionEntity clusterVersionEntity : clusterVersionEntities) { if (clusterVersionEntity.getState() == RepositoryVersionState.CURRENT) { -// TODO assuming there's only 1 current version, return 1st found, exception was expected in previous implementation + // TODO assuming there's only 1 current version, return 1st found, exception was expected in previous implementation return clusterVersionEntity; } } return null; -// return clusterVersionDAO.findByClusterAndStateCurrent(getClusterName()); + } + + /** + * Get any stack upgrade currently in progress. + * @return + */ + private UpgradeEntity getUpgradeInProgress() { + UpgradeEntity mostRecentUpgrade = upgradeDAO.findLastUpgradeOrDowngradeForCluster(this.getClusterId()); + if (mostRecentUpgrade != null) { + List<HostRoleStatus> UNFINISHED_STATUSES = new ArrayList(); + UNFINISHED_STATUSES.add(HostRoleStatus.PENDING); + UNFINISHED_STATUSES.add(HostRoleStatus.ABORTED); + + List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByRequestIdAndStatuses(mostRecentUpgrade.getRequestId(), UNFINISHED_STATUSES); + if (!commands.isEmpty()) { + return mostRecentUpgrade; + } + } + + return null; + } + + /** + * If no RU/EU is in progress, get the ClusterVersionEntity object whose state is CURRENT. + * If RU/EU is in progress, based on the direction and desired stack, determine which version to use. + * Assuming upgrading from HDP 2.2.0.0-1 to 2.3.0.0-2, then + * RU Upgrade: 2.3.0.0-2 (desired stack id) + * RU Downgrade: 2.2.0.0-1 (desired stack id) + * EU Upgrade: while stopping services and before changing desired stack, use 2.2.0.0-1, after, use 2.3.0.0-2 + * EU Downgrade: while stopping services and before changing desired stack, use 2.3.0.0-2, after, use 2.2.0.0-1 + * @return + */ + @Override + public ClusterVersionEntity getEffectiveClusterVersion() throws AmbariException { + // This is not reliable. Need to find the last upgrade request. + UpgradeEntity upgradeInProgress = this.getUpgradeEntity(); + if (upgradeInProgress == null) { + return this.getCurrentClusterVersion(); + } + + String effectiveVersion = null; + switch (upgradeInProgress.getUpgradeType()) { + case NON_ROLLING: + if (upgradeInProgress.getDirection() == Direction.UPGRADE) { + boolean pastChangingStack = this.isNonRollingUpgradePastUpgradingStack(upgradeInProgress); + effectiveVersion = pastChangingStack ? upgradeInProgress.getToVersion() : upgradeInProgress.getFromVersion(); + } else { + // Should be the lower value during a Downgrade. + effectiveVersion = upgradeInProgress.getToVersion(); + } + break; + case ROLLING: + default: + // Version will be higher on upgrade and lower on downgrade directions. + effectiveVersion = upgradeInProgress.getToVersion(); + break; + } + + if (effectiveVersion == null) { + throw new AmbariException("Unable to determine which version to use during Stack Upgrade, effectiveVersion is null."); + } + + // Find the first cluster version whose repo matches the expected version. + Collection<ClusterVersionEntity> clusterVersionEntities = getClusterEntity().getClusterVersionEntities(); + for (ClusterVersionEntity clusterVersionEntity : clusterVersionEntities) { + if (clusterVersionEntity.getRepositoryVersion().getVersion().equals(effectiveVersion)) { + return clusterVersionEntity; + } + } + return null; + } + + /** + * Given a NonRolling stack upgrade, determine if it has already crossed the point of using the newer version. + * @param upgrade Stack Upgrade + * @return Return true if should be using to_version, otherwise, false to mean the from_version. + */ + private boolean isNonRollingUpgradePastUpgradingStack(UpgradeEntity upgrade) { + for (UpgradeGroupEntity group : upgrade.getUpgradeGroups()) { + if (group.getName().equalsIgnoreCase(UpgradeResourceProvider.CONST_UPGRADE_GROUP_NAME)) { + for (UpgradeItemEntity item : group.getItems()) { + List<Long> taskIds = hostRoleCommandDAO.findTaskIdsByStage(upgrade.getRequestId(), item.getStageId()); + List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByPKs(taskIds); + for (HostRoleCommandEntity command : commands) { + if (command.getCustomCommandName() != null && + command.getCustomCommandName().equalsIgnoreCase(UpgradeResourceProvider.CONST_CUSTOM_COMMAND_NAME) && + command.getStatus() == HostRoleStatus.COMPLETED) { + return true; + } + } + } + return false; + } + } + return false; } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java index 64a8852..4a474bf 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -57,11 +58,13 @@ import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.OrmTestHelper; import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; import org.apache.ambari.server.orm.dao.RepositoryVersionDAO; +import org.apache.ambari.server.orm.dao.RequestDAO; import org.apache.ambari.server.orm.dao.StackDAO; import org.apache.ambari.server.orm.dao.StageDAO; import org.apache.ambari.server.orm.dao.UpgradeDAO; import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; +import org.apache.ambari.server.orm.entities.RequestEntity; import org.apache.ambari.server.orm.entities.StackEntity; import org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.orm.entities.UpgradeEntity; @@ -113,6 +116,7 @@ import com.google.inject.util.Modules; public class UpgradeResourceProviderTest { private UpgradeDAO upgradeDao = null; + private RequestDAO requestDao = null; private RepositoryVersionDAO repoVersionDao = null; private Injector injector; private Clusters clusters; @@ -159,6 +163,7 @@ public class UpgradeResourceProviderTest { stackDAO = injector.getInstance(StackDAO.class); upgradeDao = injector.getInstance(UpgradeDAO.class); + requestDao = injector.getInstance(RequestDAO.class); repoVersionDao = injector.getInstance(RepositoryVersionDAO.class); AmbariEventPublisher publisher = createNiceMock(AmbariEventPublisher.class); @@ -575,6 +580,13 @@ public class UpgradeResourceProviderTest { // a downgrade MUST have an upgrade to come from, so populate an upgrade in // the DB + RequestEntity requestEntity = new RequestEntity(); + requestEntity.setRequestId(2L); + requestEntity.setClusterId(cluster.getClusterId()); + requestEntity.setStatus(HostRoleStatus.PENDING); + requestEntity.setStages(new ArrayList<StageEntity>()); + requestDao.create(requestEntity); + UpgradeEntity upgradeEntity = new UpgradeEntity(); upgradeEntity.setClusterId(cluster.getClusterId()); upgradeEntity.setDirection(Direction.UPGRADE); @@ -582,7 +594,7 @@ public class UpgradeResourceProviderTest { upgradeEntity.setToVersion("2.2.2.2"); upgradeEntity.setUpgradePackage("upgrade_test"); upgradeEntity.setUpgradeType(UpgradeType.ROLLING); - upgradeEntity.setRequestId(1L); + upgradeEntity.setRequestId(2L); upgradeDao.create(upgradeEntity); upgrades = upgradeDao.findUpgrades(cluster.getClusterId()); http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java index cf79b6f..cc49cbd 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java @@ -26,10 +26,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.OrmTestHelper; import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; +import org.apache.ambari.server.orm.entities.RequestEntity; +import org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.orm.entities.UpgradeEntity; import org.apache.ambari.server.orm.entities.UpgradeGroupEntity; import org.apache.ambari.server.orm.entities.UpgradeItemEntity; @@ -55,6 +58,7 @@ public class UpgradeDAOTest { private Injector injector; private Long clusterId; private UpgradeDAO dao; + private RequestDAO requestDAO; private OrmTestHelper helper; @@ -67,6 +71,7 @@ public class UpgradeDAOTest { injector.getInstance(GuiceJpaInitializer.class); dao = injector.getInstance(UpgradeDAO.class); + requestDAO = injector.getInstance(RequestDAO.class); helper = injector.getInstance(OrmTestHelper.class); clusterId = helper.createCluster(); @@ -140,11 +145,18 @@ public class UpgradeDAOTest { @Test public void testFindLastUpgradeForCluster() throws Exception { // create upgrade entities + RequestEntity requestEntity = new RequestEntity(); + requestEntity.setRequestId(1L); + requestEntity.setClusterId(1L); + requestEntity.setStatus(HostRoleStatus.PENDING); + requestEntity.setStages(new ArrayList<StageEntity>()); + requestDAO.create(requestEntity); + UpgradeEntity entity1 = new UpgradeEntity(); entity1.setId(11L); - entity1.setClusterId(Long.valueOf(1)); + entity1.setClusterId(1L); entity1.setDirection(Direction.UPGRADE); - entity1.setRequestId(Long.valueOf(1)); + entity1.setRequestId(1L); entity1.setFromVersion("2.2.0.0-1234"); entity1.setToVersion("2.3.0.0-4567"); entity1.setUpgradeType(UpgradeType.ROLLING); @@ -153,9 +165,9 @@ public class UpgradeDAOTest { dao.create(entity1); UpgradeEntity entity2 = new UpgradeEntity(); entity2.setId(22L); - entity2.setClusterId(Long.valueOf(1)); + entity2.setClusterId(1L); entity2.setDirection(Direction.DOWNGRADE); - entity2.setRequestId(Long.valueOf(1)); + entity2.setRequestId(1L); entity2.setFromVersion("2.3.0.0-4567"); entity2.setToVersion("2.2.0.0-1234"); entity2.setUpgradeType(UpgradeType.ROLLING); @@ -164,9 +176,9 @@ public class UpgradeDAOTest { dao.create(entity2); UpgradeEntity entity3 = new UpgradeEntity(); entity3.setId(33L); - entity3.setClusterId(Long.valueOf(1)); + entity3.setClusterId(1L); entity3.setDirection(Direction.UPGRADE); - entity3.setRequestId(Long.valueOf(1)); + entity3.setRequestId(1L); entity3.setFromVersion("2.2.0.0-1234"); entity3.setToVersion("2.3.1.1-4567"); entity3.setUpgradeType(UpgradeType.ROLLING); @@ -185,11 +197,18 @@ public class UpgradeDAOTest { */ @Test public void testUpdatableColumns() throws Exception { + RequestEntity requestEntity = new RequestEntity(); + requestEntity.setRequestId(1L); + requestEntity.setClusterId(1L); + requestEntity.setStatus(HostRoleStatus.PENDING); + requestEntity.setStages(new ArrayList<StageEntity>()); + requestDAO.create(requestEntity); + UpgradeEntity upgradeEntity = new UpgradeEntity(); upgradeEntity.setId(11L); - upgradeEntity.setClusterId(Long.valueOf(1)); + upgradeEntity.setClusterId(1L); upgradeEntity.setDirection(Direction.UPGRADE); - upgradeEntity.setRequestId(Long.valueOf(1)); + upgradeEntity.setRequestId(1L); upgradeEntity.setFromVersion("2.2.0.0-1234"); upgradeEntity.setToVersion("2.3.0.0-4567"); upgradeEntity.setUpgradeType(UpgradeType.ROLLING); http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml b/ambari-server/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml index 597270e..b2c4b93 100644 --- a/ambari-server/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml +++ b/ambari-server/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml @@ -19,6 +19,7 @@ <target>2.2.*</target> <skip-failures>true</skip-failures> <skip-service-check-failures>true</skip-service-check-failures> + <type>ROLLING</type> <prerequisite-checks> <check>org.apache.ambari.server.checks.HiveMultipleMetastoreCheck</check> <check>org.apache.ambari.server.checks.MapReduce2JobHistoryStatePreservingCheck</check>
