Repository: ambari Updated Branches: refs/heads/branch-2.2 8abd9894f -> 650021ae7 refs/heads/trunk 18449b5b8 -> 825d557ae
AMBARI-15691. Express Upgrade hangs if ambari agent is restarted in the middle of EU. (mpapirkovskyy) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/825d557a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/825d557a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/825d557a Branch: refs/heads/trunk Commit: 825d557ae63e5bb6a7d22e0132c96dc599d12ad0 Parents: 18449b5 Author: Myroslav Papirkovskyi <[email protected]> Authored: Mon Apr 4 21:28:56 2016 +0300 Committer: Myroslav Papirkovskyi <[email protected]> Committed: Mon Apr 4 22:58:17 2016 +0300 ---------------------------------------------------------------------- .../server/actionmanager/ActionScheduler.java | 13 ++-- .../actionmanager/TestActionScheduler.java | 75 ++++++++++---------- 2 files changed, 47 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/825d557a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index 79e3a07..8346e18 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -882,10 +882,15 @@ class ActionScheduler implements Runnable { return false; } - protected boolean wasAgentRestartedDuringOperation(Host host, Stage stage, String role) { - String hostName = (null == host) ? null : host.getHostName(); - long lastStageAttemptTime = stage.getLastAttemptTime(hostName, role); - return lastStageAttemptTime > 0 && lastStageAttemptTime <= host.getLastRegistrationTime(); + boolean wasAgentRestartedDuringOperation(Host host, Stage stage, String role) { + if (host == null) { + // null host is valid in case of server action, skip restart detection + return false; + } else { + String hostName = host.getHostName(); + long lastStageAttemptTime = stage.getLastAttemptTime(hostName, role); + return lastStageAttemptTime > 0 && lastStageAttemptTime <= host.getLastRegistrationTime(); + } } private boolean hasCommandInProgress(Stage stage, String host) { http://git-wip-us.apache.org/repos/asf/ambari/blob/825d557a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java index 0ee0c27..9cecd98 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java @@ -113,6 +113,11 @@ public class TestActionScheduler { + " c6402.ambari.apache.org], slave_hosts=[c6401.ambari.apache.org," + " c6402.ambari.apache.org]}"; + // HOST_LAST_REGISTRATION_TIME should be less than STAGE_LAST_ATTEMPT_TIME + // This means that there was not ambari-agent restart during stage execution + public static final long HOST_LAST_REGISTRATION_TIME = 99L; + public static final long STAGE_LAST_ATTEMPT_TIME = 100L; + private static Injector injector; private final String hostname = "ahost.ambari.apache.org"; @@ -959,10 +964,8 @@ public class TestActionScheduler { Properties properties = new Properties(); Configuration conf = new Configuration(properties); - ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf)); - - doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString()); + ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, + new HostsMap((String) null), unitOfWork, null, conf); scheduler.doWork(); @@ -1049,12 +1052,9 @@ public class TestActionScheduler { Properties properties = new Properties(); properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "false"); Configuration conf = new Configuration(properties); - ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3, + ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), - unitOfWork, null, conf)); - - - doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString()); + unitOfWork, null, conf); scheduler.doWork(); @@ -1123,11 +1123,9 @@ public class TestActionScheduler { Properties properties = new Properties(); properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true"); Configuration conf = new Configuration(properties); - ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3, + ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), - unitOfWork, null, conf)); - - doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString()); + unitOfWork, null, conf); scheduler.doWork(); @@ -2307,39 +2305,41 @@ public class TestActionScheduler { final List<Stage> stagesInProgress = new ArrayList<Stage>(); int namenodeCmdTaskId = 1; - stagesInProgress.add( - getStageWithSingleTask( - hostname1, "cluster1", Role.NAMENODE, RoleCommand.START, - Service.Type.HDFS, namenodeCmdTaskId, 1, (int) requestId1)); - stagesInProgress.add( - getStageWithSingleTask( - hostname1, "cluster1", Role.DATANODE, RoleCommand.START, - Service.Type.HDFS, 2, 2, (int) requestId1)); - stagesInProgress.add( - getStageWithSingleTask( - hostname2, "cluster1", Role.DATANODE, RoleCommand.STOP, //Exclusive - Service.Type.HDFS, 3, 3, (int) requestId2)); - - stagesInProgress.add( - getStageWithSingleTask( - hostname3, "cluster1", Role.DATANODE, RoleCommand.START, - Service.Type.HDFS, 4, 4, (int) requestId3)); + Stage stageInProgress1 = spy(getStageWithSingleTask( + hostname1, "cluster1", Role.NAMENODE, RoleCommand.START, + Service.Type.HDFS, namenodeCmdTaskId, 1, (int) requestId1)); + Stage stageInProgress2 = spy(getStageWithSingleTask( + hostname1, "cluster1", Role.DATANODE, RoleCommand.START, + Service.Type.HDFS, 2, 2, (int) requestId1)); + Stage stageInProgress3 = spy(getStageWithSingleTask( + hostname2, "cluster1", Role.DATANODE, RoleCommand.STOP, //Exclusive + Service.Type.HDFS, 3, 3, (int) requestId2)); + Stage stageInProgress4 = spy(getStageWithSingleTask( + hostname3, "cluster1", Role.DATANODE, RoleCommand.START, + Service.Type.HDFS, 4, 4, (int) requestId3)); + stagesInProgress.add(stageInProgress1); + stagesInProgress.add(stageInProgress2); + stagesInProgress.add(stageInProgress3); + stagesInProgress.add(stageInProgress4); Host host1 = mock(Host.class); when(fsm.getHost(anyString())).thenReturn(host1); when(host1.getState()).thenReturn(HostState.HEALTHY); when(host1.getHostName()).thenReturn(hostname); + when(host1.getLastRegistrationTime()).thenReturn(HOST_LAST_REGISTRATION_TIME); Host host2 = mock(Host.class); when(fsm.getHost(anyString())).thenReturn(host2); when(host2.getState()).thenReturn(HostState.HEALTHY); when(host2.getHostName()).thenReturn(hostname); + when(host2.getLastRegistrationTime()).thenReturn(HOST_LAST_REGISTRATION_TIME); Host host3 = mock(Host.class); when(fsm.getHost(anyString())).thenReturn(host3); when(host3.getState()).thenReturn(HostState.HEALTHY); when(host3.getHostName()).thenReturn(hostname); + when(host3.getLastRegistrationTime()).thenReturn(HOST_LAST_REGISTRATION_TIME); ActionDBAccessor db = mock(ActionDBAccessor.class); when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size()); @@ -2415,10 +2415,13 @@ public class TestActionScheduler { Properties properties = new Properties(); Configuration conf = new Configuration(properties); - ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf)); + ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, + new HostsMap((String) null), unitOfWork, null, conf); - doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString()); + doReturn(STAGE_LAST_ATTEMPT_TIME).when(stageInProgress1).getLastAttemptTime(anyString(), anyString()); + doReturn(STAGE_LAST_ATTEMPT_TIME).when(stageInProgress2).getLastAttemptTime(anyString(), anyString()); + doReturn(STAGE_LAST_ATTEMPT_TIME).when(stageInProgress3).getLastAttemptTime(anyString(), anyString()); + doReturn(STAGE_LAST_ATTEMPT_TIME).when(stageInProgress4).getLastAttemptTime(anyString(), anyString()); // Execution of request 1 @@ -2615,10 +2618,8 @@ public class TestActionScheduler { } }).when(db).abortOperation(anyLong()); - ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf)); - - doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString()); + ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, + new HostsMap((String) null), unitOfWork, null, conf); scheduler.doWork();
