YARN-3094. Reset timer for liveness monitors after RM recovery. Contributed by Jun Gong
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0af6a99a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0af6a99a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0af6a99a Branch: refs/heads/YARN-2928 Commit: 0af6a99a3fcfa4b47d3bcba5e5cc5fe7b312a152 Parents: 7d73202 Author: Jian He <[email protected]> Authored: Mon Feb 9 13:47:08 2015 -0800 Committer: Jian He <[email protected]> Committed: Mon Feb 9 13:47:08 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/util/AbstractLivelinessMonitor.java | 8 ++ .../server/resourcemanager/ResourceManager.java | 2 + .../rmapp/attempt/AMLivelinessMonitor.java | 6 ++ .../rmapp/attempt/TestAMLivelinessMonitor.java | 81 ++++++++++++++++++++ 5 files changed, 100 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af6a99a/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index cc2f34b..578a8cc 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -516,6 +516,9 @@ Release 2.7.0 - UNRELEASED YARN-3143. RM Apps REST API can return NPE or entries missing id and other fields (jlowe) + YARN-3094. Reset timer for liveness monitors after RM recovery. 
(Jun Gong + via jianhe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af6a99a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java index c182531..4f587b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java @@ -59,6 +59,7 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService { @Override protected void serviceStart() throws Exception { assert !stopped : "starting when already stopped"; + resetTimer(); checkerThread = new Thread(new PingChecker()); checkerThread.setName("Ping Checker"); checkerThread.start(); @@ -99,6 +100,13 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService { running.remove(ob); } + public synchronized void resetTimer() { + long time = clock.getTime(); + for (O ob : running.keySet()) { + running.put(ob, time); + } + } + private class PingChecker implements Runnable { @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af6a99a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 4f242e93..a93372a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -564,12 +564,14 @@ public class ResourceManager extends CompositeService implements Recoverable { if(recoveryEnabled) { try { + LOG.info("Recovery started"); rmStore.checkVersion(); if (rmContext.isWorkPreservingRecoveryEnabled()) { rmContext.setEpoch(rmStore.getAndIncrementEpoch()); } RMState state = rmStore.loadState(); recover(state); + LOG.info("Recovery ended"); } catch (Exception e) { // the Exception from loadState() needs to be handled for // HA and we need to give up master status if we got fenced http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af6a99a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java index 2c1f7f1..76331bf 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAttemptId> { @@ -35,6 +36,11 @@ public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAt this.dispatcher = d.getEventHandler(); } + public AMLivelinessMonitor(Dispatcher d, Clock clock) { + super("AMLivelinessMonitor", clock); + this.dispatcher = d.getEventHandler(); + } + public void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); int expireIntvl = conf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, http://git-wip-us.apache.org/repos/asf/hadoop/blob/0af6a99a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestAMLivelinessMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestAMLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestAMLivelinessMonitor.java new file mode 100644 index 0000000..e0e6aee --- 
/dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestAMLivelinessMonitor.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link AMLivelinessMonitor}.
+ */
+public class TestAMLivelinessMonitor {
+
+  /**
+   * Verifies that AM attempts registered before RM recovery are not expired
+   * because of time that elapsed while the state store was being loaded.
+   * The controlled clock jumps from 0 to 8000 ms inside {@code loadState()},
+   * past the 6000 ms AM expiry interval, so the attempt would be expired
+   * unless the monitor resets its timers when it starts.
+   */
+  @Test(timeout = 20000)
+  public void testResetTimer() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    UserGroupInformation.setConfiguration(conf);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+        true);
+    conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 6000);
+    final ControlledClock clock = new ControlledClock(new SystemClock());
+    clock.setTime(0);
+    MemoryRMStateStore memStore = new MemoryRMStateStore() {
+      @Override
+      public synchronized RMState loadState() throws Exception {
+        // Simulate a recovery that takes longer than the AM expiry interval.
+        clock.setTime(8000);
+        return super.loadState();
+      }
+    };
+    memStore.init(conf);
+    final ApplicationAttemptId attemptId = mock(ApplicationAttemptId.class);
+    final Dispatcher dispatcher = mock(Dispatcher.class);
+    // expire() is driven by the monitor's own "Ping Checker" thread, not the
+    // test thread; an AtomicBoolean makes the update visible here.
+    final AtomicBoolean expired = new AtomicBoolean(false);
+    final AMLivelinessMonitor monitor = new AMLivelinessMonitor(
+        dispatcher, clock) {
+      @Override
+      protected void expire(ApplicationAttemptId id) {
+        // Don't assert here: an AssertionError thrown on the checker thread
+        // would not fail the test. Any expiry at all is a failure.
+        expired.set(true);
+      }
+    };
+    monitor.register(attemptId);
+    MockRM rm = new MockRM(conf, memStore) {
+      @Override
+      protected AMLivelinessMonitor createAMLivelinessMonitor() {
+        return monitor;
+      }
+    };
+    rm.start();
+    try {
+      // make sure that monitor has started
+      while (monitor.getServiceState() != Service.STATE.STARTED) {
+        Thread.sleep(100);
+      }
+      // Give the checker thread time to scan the registered attempts at
+      // least once; its check period is presumably a fraction of the expiry
+      // interval (see AMLivelinessMonitor#serviceInit). Nothing advances the
+      // controlled clock after loadState(), so waiting cannot cause a
+      // spurious expiry -- it only makes a missing timer reset observable.
+      long deadline = System.currentTimeMillis() + 3000;
+      while (!expired.get() && System.currentTimeMillis() < deadline) {
+        Thread.sleep(50);
+      }
+      // expired would be set to true without resetTimer
+      Assert.assertFalse("AM attempt expired across RM recovery",
+          expired.get());
+    } finally {
+      rm.stop();
+    }
+  }
+}
