Repository: ambari Updated Branches: refs/heads/branch-2.1 22307ad74 -> 82aed7ea4 refs/heads/trunk 1271328c0 -> 0212c06e4
AMBARI-12042. Add to ambari-server restart ability to handle hung INSTALLING repo version state (dlysnichenko) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0212c06e Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0212c06e Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0212c06e Branch: refs/heads/trunk Commit: 0212c06e4e973709727144e70c4ddec9856deb35 Parents: 1271328 Author: Lisnichenko Dmitro <[email protected]> Authored: Mon Jun 22 15:07:01 2015 +0300 Committer: Lisnichenko Dmitro <[email protected]> Committed: Mon Jun 22 15:07:01 2015 +0300 ---------------------------------------------------------------------- .../ambari/server/StateRecoveryManager.java | 85 ++++++++ .../ambari/server/controller/AmbariServer.java | 6 +- .../ambari/server/StateRecoveryManagerTest.java | 197 +++++++++++++++++++ 3 files changed, 286 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/0212c06e/ambari-server/src/main/java/org/apache/ambari/server/StateRecoveryManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/StateRecoveryManager.java b/ambari-server/src/main/java/org/apache/ambari/server/StateRecoveryManager.java new file mode 100644 index 0000000..aa6f053 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/StateRecoveryManager.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server; + +import com.google.inject.Inject; +import org.apache.ambari.server.orm.dao.ClusterVersionDAO; +import org.apache.ambari.server.orm.dao.HostVersionDAO; +import org.apache.ambari.server.orm.entities.ClusterVersionEntity; +import org.apache.ambari.server.orm.entities.HostVersionEntity; +import org.apache.ambari.server.state.RepositoryVersionState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Is executed on server start. + * Checks server state and recovers it to valid if required. + */ +public class StateRecoveryManager { + + private static Logger LOG = LoggerFactory.getLogger(StateRecoveryManager.class); + + @Inject + private HostVersionDAO hostVersionDAO; + + @Inject + private ClusterVersionDAO clusterVersionDAO; + + + public void doWork() { + checkHostAndClusterVersions(); + } + + + void checkHostAndClusterVersions() { + List<HostVersionEntity> hostVersions = hostVersionDAO.findAll(); + for (HostVersionEntity hostVersion : hostVersions) { + if (hostVersion.getState().equals(RepositoryVersionState.INSTALLING)) { + hostVersion.setState(RepositoryVersionState.INSTALL_FAILED); + String msg = String.format( + "Recovered state of host version %s on host %s from %s to %s", + hostVersion.getRepositoryVersion().getDisplayName(), + hostVersion.getHostName(), + RepositoryVersionState.INSTALLING, + RepositoryVersionState.INSTALL_FAILED); + LOG.warn(msg); + hostVersionDAO.merge(hostVersion); + } + } + + List<ClusterVersionEntity> clusterVersions = clusterVersionDAO.findAll(); + for (ClusterVersionEntity clusterVersion : clusterVersions) { + if (clusterVersion.getState().equals(RepositoryVersionState.INSTALLING)) { + clusterVersion.setState(RepositoryVersionState.INSTALL_FAILED); + String msg = String.format( + "Recovered state of cluster version %s for cluster %s from %s to %s", + clusterVersion.getRepositoryVersion().getDisplayName(), + clusterVersion.getClusterEntity().getClusterName(), + RepositoryVersionState.INSTALLING, + RepositoryVersionState.INSTALL_FAILED); + LOG.warn(msg); + clusterVersionDAO.merge(clusterVersion); + } + } + } + + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/0212c06e/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java index e430c98..94120de 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java @@ -31,6 +31,7 @@ import javax.servlet.DispatcherType; import org.apache.ambari.eventdb.webservice.WorkflowJsonService; import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.StateRecoveryManager; import org.apache.ambari.server.StaticallyInject; import org.apache.ambari.server.actionmanager.ActionManager; import org.apache.ambari.server.actionmanager.HostRoleCommandFactory; @@ -484,8 +485,9 @@ public class AmbariServer { clusterController = controller; - // FIXME need to figure out correct order of starting things to - // handle restart-recovery correctly + StateRecoveryManager recoveryManager = injector.getInstance( + StateRecoveryManager.class); + recoveryManager.doWork(); /* * Start the server after controller state is recovered. http://git-wip-us.apache.org/repos/asf/ambari/blob/0212c06e/ambari-server/src/test/java/org/apache/ambari/server/StateRecoveryManagerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/StateRecoveryManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/StateRecoveryManagerTest.java new file mode 100644 index 0000000..0e9b18d --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/StateRecoveryManagerTest.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.persist.PersistService; +import com.google.inject.util.Modules; +import org.apache.ambari.server.orm.GuiceJpaInitializer; +import org.apache.ambari.server.orm.InMemoryDefaultTestModule; +import org.apache.ambari.server.orm.dao.ClusterVersionDAO; +import org.apache.ambari.server.orm.dao.HostVersionDAO; +import org.apache.ambari.server.orm.entities.ClusterEntity; +import org.apache.ambari.server.orm.entities.ClusterVersionEntity; +import org.apache.ambari.server.orm.entities.HostVersionEntity; +import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; +import org.apache.ambari.server.state.RepositoryVersionState; +import org.easymock.Capture; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; + +import static org.easymock.EasyMock.and; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.junit.Assert.*; + +public class StateRecoveryManagerTest { + + private Injector injector; + private HostVersionDAO hostVersionDAOMock; + private ClusterVersionDAO clusterVersionDAOMock; + + @Before + public void setup() throws Exception { + // Create instances of mocks + clusterVersionDAOMock = createNiceMock(ClusterVersionDAO.class); + hostVersionDAOMock = createNiceMock(HostVersionDAO.class); + // Initialize injector + InMemoryDefaultTestModule module = new InMemoryDefaultTestModule(); + injector = Guice.createInjector(Modules.override(module).with(new MockModule())); + injector.getInstance(GuiceJpaInitializer.class); + } + + @After + public void teardown() { + injector.getInstance(PersistService.class).stop(); + } + + @Test + public void testCheckHostAndClusterVersions() throws Exception { + StateRecoveryManager stateRecoveryManager = injector.getInstance(StateRecoveryManager.class); + + // Adding all possible host version states + + final Capture<RepositoryVersionState> installFailedHostVersionCapture = new Capture<RepositoryVersionState>(); + final Capture<RepositoryVersionState> installingHostVersionCapture = new Capture<RepositoryVersionState>(); + final Capture<RepositoryVersionState> installedHostVersionCapture = new Capture<RepositoryVersionState>(); + final Capture<RepositoryVersionState> outOfSyncHostVersionCapture = new Capture<RepositoryVersionState>(); + final Capture<RepositoryVersionState> upgradeFailedHostVersionCapture = new Capture<RepositoryVersionState>(); + final Capture<RepositoryVersionState> upgradingHostVersionCapture = new Capture<RepositoryVersionState>(); + final Capture<RepositoryVersionState> upgradedHostVersionCapture = new Capture<RepositoryVersionState>(); + final Capture<RepositoryVersionState> currentHostVersionCapture = new Capture<RepositoryVersionState>(); + + expect(hostVersionDAOMock.findAll()).andReturn(new ArrayList<HostVersionEntity>() {{ + add(getHostVersionMock("install_failed_version", RepositoryVersionState.INSTALL_FAILED, installFailedHostVersionCapture)); + add(getHostVersionMock("installing_version", RepositoryVersionState.INSTALLING, installingHostVersionCapture)); + add(getHostVersionMock("installed_version", RepositoryVersionState.INSTALLED, installedHostVersionCapture)); + add(getHostVersionMock("out_of_sync_version", RepositoryVersionState.OUT_OF_SYNC, outOfSyncHostVersionCapture)); + add(getHostVersionMock("upgrade_failed_version", RepositoryVersionState.UPGRADE_FAILED, upgradeFailedHostVersionCapture)); + add(getHostVersionMock("upgrading_version", RepositoryVersionState.UPGRADING, upgradingHostVersionCapture)); + add(getHostVersionMock("upgraded_version", RepositoryVersionState.UPGRADED, upgradedHostVersionCapture)); + add(getHostVersionMock("current_version", RepositoryVersionState.CURRENT, currentHostVersionCapture)); + }}); + + // Adding all possible cluster version states + + final Capture<RepositoryVersionState> installFailedClusterVersionCapture = new Capture<RepositoryVersionState>(); + final Capture<RepositoryVersionState> installingClusterVersionCapture = new Capture<RepositoryVersionState>(); + final Capture<RepositoryVersionState> installedClusterVersionCapture = new Capture<RepositoryVersionState>(); + final Capture<RepositoryVersionState> outOfSyncClusterVersionCapture = new Capture<RepositoryVersionState>(); + final Capture<RepositoryVersionState> upgradeFailedClusterVersionCapture = new Capture<RepositoryVersionState>(); + final Capture<RepositoryVersionState> upgradingClusterVersionCapture = new Capture<RepositoryVersionState>(); + final Capture<RepositoryVersionState> upgradedClusterVersionCapture = new Capture<RepositoryVersionState>(); + final Capture<RepositoryVersionState> currentClusterVersionCapture = new Capture<RepositoryVersionState>(); + + expect(clusterVersionDAOMock.findAll()).andReturn(new ArrayList<ClusterVersionEntity>() {{ + add(getClusterVersionMock("install_failed_version", RepositoryVersionState.INSTALL_FAILED, installFailedClusterVersionCapture)); + add(getClusterVersionMock("installing_version", RepositoryVersionState.INSTALLING, installingClusterVersionCapture)); + add(getClusterVersionMock("installed_version", RepositoryVersionState.INSTALLED, installedClusterVersionCapture)); + add(getClusterVersionMock("out_of_sync_version", RepositoryVersionState.OUT_OF_SYNC, outOfSyncClusterVersionCapture)); + add(getClusterVersionMock("upgrade_failed_version", RepositoryVersionState.UPGRADE_FAILED, upgradeFailedClusterVersionCapture)); + add(getClusterVersionMock("upgrading_version", RepositoryVersionState.UPGRADING, upgradingClusterVersionCapture)); + add(getClusterVersionMock("upgraded_version", RepositoryVersionState.UPGRADED, upgradedClusterVersionCapture)); + add(getClusterVersionMock("current_version", RepositoryVersionState.CURRENT, currentClusterVersionCapture)); + }}); + + replay(hostVersionDAOMock, clusterVersionDAOMock); + + stateRecoveryManager.checkHostAndClusterVersions(); + + // Checking that only invalid host version states have been changed + assertFalse(installFailedHostVersionCapture.hasCaptured()); + assertEquals(installingHostVersionCapture.getValue(), RepositoryVersionState.INSTALL_FAILED); + assertFalse(installedHostVersionCapture.hasCaptured()); + assertFalse(outOfSyncHostVersionCapture.hasCaptured()); + assertFalse(upgradeFailedHostVersionCapture.hasCaptured()); + assertFalse(upgradingHostVersionCapture.hasCaptured()); + assertFalse(upgradedHostVersionCapture.hasCaptured()); + assertFalse(currentHostVersionCapture.hasCaptured()); + + // Checking that only invalid cluster version states have been changed + assertFalse(installFailedClusterVersionCapture.hasCaptured()); + assertEquals(installingClusterVersionCapture.getValue(), RepositoryVersionState.INSTALL_FAILED); + assertFalse(installedClusterVersionCapture.hasCaptured()); + assertFalse(outOfSyncClusterVersionCapture.hasCaptured()); + assertFalse(upgradeFailedClusterVersionCapture.hasCaptured()); + assertFalse(upgradingClusterVersionCapture.hasCaptured()); + assertFalse(upgradedClusterVersionCapture.hasCaptured()); + assertFalse(currentClusterVersionCapture.hasCaptured()); + } + + + private HostVersionEntity getHostVersionMock(String name, RepositoryVersionState state, + Capture<RepositoryVersionState> newStateCaptor) { + HostVersionEntity hvMock = createNiceMock(HostVersionEntity.class); + expect(hvMock.getState()).andReturn(state); + + hvMock.setState(capture(newStateCaptor)); + expectLastCall(); + + RepositoryVersionEntity rvMock = createNiceMock(RepositoryVersionEntity.class); + expect(rvMock.getDisplayName()).andReturn(name); + + expect(hvMock.getRepositoryVersion()).andReturn(rvMock); + expect(hvMock.getHostName()).andReturn("somehost"); + + replay(hvMock, rvMock); + + return hvMock; + } + + + private ClusterVersionEntity getClusterVersionMock(String name, RepositoryVersionState state, + Capture<RepositoryVersionState> newStateCaptor) { + ClusterVersionEntity cvMock = createNiceMock(ClusterVersionEntity.class); + expect(cvMock.getState()).andReturn(state); + + cvMock.setState(capture(newStateCaptor)); + expectLastCall(); + + RepositoryVersionEntity rvMock = createNiceMock(RepositoryVersionEntity.class); + expect(rvMock.getDisplayName()).andReturn(name); + + expect(cvMock.getRepositoryVersion()).andReturn(rvMock); + + ClusterEntity ceMock = createNiceMock(ClusterEntity.class); + expect(ceMock.getClusterName()).andReturn("somecluster"); + + expect(cvMock.getClusterEntity()).andReturn(ceMock); + + replay(cvMock, rvMock, ceMock); + + return cvMock; + } + + public class MockModule extends AbstractModule { + @Override + protected void configure() { + bind(HostVersionDAO.class).toInstance(hostVersionDAOMock); + bind(ClusterVersionDAO.class).toInstance(clusterVersionDAOMock); + } + } + +} \ No newline at end of file
