SLIDER-930 Incorporate YARN feature of resetting AM failure count into Slider AM (Sherry Guo via gourksaha)
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/1a3fb79d Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/1a3fb79d Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/1a3fb79d Branch: refs/heads/feature/SLIDER-82-pass-3.1 Commit: 1a3fb79d5a78ad772f11b4a121b66c22ab1ba855 Parents: a0d4f93 Author: Gour Saha <[email protected]> Authored: Mon Nov 16 20:27:08 2015 -0800 Committer: Gour Saha <[email protected]> Committed: Mon Nov 16 20:27:08 2015 -0800 ---------------------------------------------------------------------- .../org/apache/slider/api/ResourceKeys.java | 12 ++ .../apache/slider/core/conf/MapOperations.java | 15 +++ .../slider/core/launch/AbstractLauncher.java | 27 +++- .../slider/core/launch/AppMasterLauncher.java | 2 + .../standalone/TestStandaloneAMRestart.groovy | 122 +++++++++++++++++++ .../TestAppMasterLauncherWithAmReset.java | 92 ++++++++++++++ 6 files changed, 269 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java b/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java index f481c6a..f92a58d 100644 --- a/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java +++ b/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java @@ -167,4 +167,16 @@ public interface ResourceKeys { */ String YARN_LOG_INCLUDE_PATTERNS = "yarn.log.include.patterns"; String YARN_LOG_EXCLUDE_PATTERNS = "yarn.log.exclude.patterns"; + + /** + * Window of time where application master's failure count + * can be reset to 0. 
+ */ + String YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS = + "yarn.resourcemanager.am.retry-count-window-ms"; + + /** + * The default window for Slider. + */ + long DEFAULT_AM_RETRY_COUNT_WINDOW_MS = 300000; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java b/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java index 5f7b5f0..e58178c 100644 --- a/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java +++ b/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java @@ -138,6 +138,21 @@ public class MapOperations implements Map<String, String> { String val = getOption(option, Integer.toString(defVal)); return Integer.decode(val); } + + /** + * Get a long option; use {@link Long#decode(String)} so as to take hex + * oct and bin values too. + * + * @param option option name + * @param defVal default value + * @return parsed value + * @throws NumberFormatException + */ + public long getOptionLong(String option, long defVal) { + String val = getOption(option, Long.toString(defVal)); + return Long.decode(val); + } + /** * Get a mandatory integer option; use {@link Integer#decode(String)} so as to take hex * oct and bin values too. 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java b/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java index 93aff08..22bf328 100644 --- a/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java +++ b/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LogAggregationContext; @@ -279,6 +280,30 @@ public abstract class AbstractLauncher extends Configured { } } + /** + * Extract the value for option + * yarn.resourcemanager.am.retry-count-window-ms + * and set it on the ApplicationSubmissionContext. Use the default value + * if option is not set. 
+ * + * @param submissionContext + * @param map + */ + public void extractAmRetryCount(ApplicationSubmissionContext submissionContext, + Map<String, String> map) { + + if (map != null) { + MapOperations options = new MapOperations("", map); + long amRetryCountWindow = options.getOptionLong(ResourceKeys + .YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS, + ResourceKeys.DEFAULT_AM_RETRY_COUNT_WINDOW_MS); + log.info("Setting {} to {}", + ResourceKeys.YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS, + amRetryCountWindow); + submissionContext.setAttemptFailuresValidityInterval(amRetryCountWindow); + } + } + public void extractLogAggregationContext(Map<String, String> map) { if (map != null) { String logPatternSepStr = "\\|"; @@ -423,7 +448,7 @@ public abstract class AbstractLauncher extends Configured { } /** - * Suubmit an entire directory + * Submit an entire directory * @param srcDir src path in filesystem * @param destRelativeDir relative path under destination local dir * @throws IOException IO problems http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java b/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java index 06dbfea..c82affa 100644 --- a/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java +++ b/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java @@ -106,6 +106,8 @@ public class AppMasterLauncher extends AbstractLauncher { submissionContext.setApplicationTags(applicationTags); } submissionContext.setNodeLabelExpression(extractLabelExpression(options)); + + extractAmRetryCount(submissionContext, resourceGlobalOptions); extractResourceRequirements(resource, options); extractLogAggregationContext(resourceGlobalOptions); } 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy index bdcf615..6947156 100644 --- a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy @@ -20,10 +20,13 @@ package org.apache.slider.agent.standalone import groovy.transform.CompileStatic import groovy.util.logging.Slf4j + import org.apache.hadoop.yarn.api.records.ApplicationReport import org.apache.hadoop.yarn.api.records.FinalApplicationStatus +import org.apache.hadoop.yarn.api.records.YarnApplicationState import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.slider.agent.AgentMiniClusterTestBase +import org.apache.slider.api.ResourceKeys import org.apache.slider.client.SliderClient import org.apache.slider.common.SliderXmlConfKeys import org.apache.slider.common.params.ActionAMSuicideArgs @@ -94,6 +97,125 @@ class TestStandaloneAMRestart extends AgentMiniClusterTestBase { assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true) } + + @Test + public void testStandaloneAMRestartWithRetryWindow() throws Throwable { + describe "kill a Standalone AM and verify that the AM failure count " + + "is reset after the AM retry-count-window elapses" + // patch the configuration for AM restart + YarnConfiguration conf = getRestartableConfiguration(5) + + int restartLimit = 3; + int amRetryWindow = 60000; + String amRetryWindowStr = amRetryWindow.toString() + String clustername = createMiniCluster("", conf, 1, true) + ServiceLauncher<SliderClient> launcher = + 
createStandaloneAMWithArgs(clustername, + [ + Arguments.ARG_DEFINE, + SliderXmlConfKeys.KEY_AM_RESTART_LIMIT + "=" + restartLimit, + Arguments.ARG_RESOURCE_OPT, + ResourceKeys.YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS, + amRetryWindowStr + ], + true, + false) + SliderClient sliderClient = launcher.service + addToTeardown(sliderClient); + + ApplicationReport report = waitForClusterLive(sliderClient) + logReport(report) + waitUntilClusterLive(sliderClient, 30000) + + def diagnosticArgs = new ActionDiagnosticArgs() + diagnosticArgs.client = true + diagnosticArgs.yarn = true + sliderClient.actionDiagnostic(diagnosticArgs) + + describe "kill AM #1" + int iteration = 1; + killAMAndWaitForRestart(sliderClient, iteration, clustername) + + describe "kill AM #2" + killAMAndWaitForRestart(sliderClient, iteration++, clustername) + + // app should be running here + assert 0 == sliderClient.actionExists(clustername, true) + + // make sure the am reset window has elapsed + describe "sleeping to ensure reset window elapsed" + sleep (amRetryWindow) + + // kill again & expect the app to still be running + describe "kill AM #3 after window elapsed" + killAMAndWaitForRestart(sliderClient, iteration++, clustername) + assert 0 == sliderClient.actionExists(clustername, true) + + report = sliderClient.applicationReport + assert report.getYarnApplicationState() == YarnApplicationState.RUNNING + + logReport(report) + describe("stopping the cluster") + assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true) + + report = sliderClient.applicationReport + assert report.finalApplicationStatus == FinalApplicationStatus.KILLED + } + + + @Test + public void testStandaloneAMRestartWithDefaultRetryWindow() throws Throwable { + describe "kill AM more than the max limit allowed within the AM " + + "retry-count-window and expect the app to fail" + // patch the configuration for AM restart + YarnConfiguration conf = getRestartableConfiguration(5) + + int restartLimit = 3; + String 
clustername = createMiniCluster("", conf, 1, true) + ServiceLauncher<SliderClient> launcher = + createStandaloneAMWithArgs(clustername, + [ + Arguments.ARG_DEFINE, + SliderXmlConfKeys.KEY_AM_RESTART_LIMIT + "=" + restartLimit, + ], + true, + false) + SliderClient sliderClient = launcher.service + addToTeardown(sliderClient); + + ApplicationReport report = waitForClusterLive(sliderClient) + logReport(report) + waitUntilClusterLive(sliderClient, 30000) + + def diagnosticArgs = new ActionDiagnosticArgs() + diagnosticArgs.client = true + diagnosticArgs.yarn = true + sliderClient.actionDiagnostic(diagnosticArgs) + + describe "kill AM #1" + int iteration = 1; + killAMAndWaitForRestart(sliderClient, iteration, clustername) + + describe "kill AM #2" + killAMAndWaitForRestart(sliderClient, iteration++, clustername) + + // app should be running here + assert 0 == sliderClient.actionExists(clustername, true) + + // kill again & expect the app to fail + describe "kill AM #3" + killAmAndWaitForDeath(sliderClient, iteration++, clustername) + sleep(40000) + + report = sliderClient.applicationReport + assert report.finalApplicationStatus == FinalApplicationStatus.FAILED + + logReport(report) + describe("stopping the cluster") + assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true) + } + + /** * Kill an AM. 
take an iteration count for the message sent to the * AM (hence its logs) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java ---------------------------------------------------------------------- diff --git a/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java new file mode 100644 index 0000000..cc64cab --- /dev/null +++ b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.core.launch; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.util.Records; +import org.apache.slider.api.ResourceKeys; +import org.apache.slider.client.SliderYarnClientImpl; +import org.apache.slider.common.SliderKeys; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestAppMasterLauncherWithAmReset { + SliderYarnClientImpl mockYarnClient; + YarnClientApplication yarnClientApp; + ApplicationSubmissionContext appSubmissionContext; + GetNewApplicationResponse newApp; + Set<String> tags = Collections.emptySet(); + AppMasterLauncher appMasterLauncher = null; + boolean isOldApi = true; + + @Before + public void initialize() throws Exception { + mockYarnClient = EasyMock.createNiceMock(SliderYarnClientImpl.class); + yarnClientApp = EasyMock.createNiceMock(YarnClientApplication.class); + newApp = EasyMock.createNiceMock(GetNewApplicationResponse.class); + EasyMock.expect(mockYarnClient.createApplication()) + .andReturn(new YarnClientApplication(newApp, + Records.newRecord(ApplicationSubmissionContext.class))); + } + + @Test + public void testExtractYarnResourceManagerAmRetryCountWindowMs() throws + Exception { + Map<String, String> options = new HashMap<String, String>(); + final String expectedInterval = Integer.toString (120000); + options.put(ResourceKeys.YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS, + expectedInterval); + EasyMock.replay(mockYarnClient, yarnClientApp); + + appMasterLauncher = new AppMasterLauncher("am1", SliderKeys.APP_TYPE, null, + null, mockYarnClient, false, null, options, tags); + + ApplicationSubmissionContext ctx = 
appMasterLauncher.application + .getApplicationSubmissionContext(); + String retryIntervalWindow = Long.toString(ctx + .getAttemptFailuresValidityInterval()); + Assert.assertEquals(expectedInterval, retryIntervalWindow); + } + + @Test + public void testExtractYarnResourceManagerAmRetryCountWindowMsDefaultValue() + throws Exception { + Map<String, String> options = new HashMap<String, String>(); + EasyMock.replay(mockYarnClient, yarnClientApp); + + appMasterLauncher = new AppMasterLauncher("am1", SliderKeys.APP_TYPE, null, + null, mockYarnClient, false, null, options, tags); + + ApplicationSubmissionContext ctx = appMasterLauncher.application + .getApplicationSubmissionContext(); + long retryIntervalWindow = ctx.getAttemptFailuresValidityInterval(); + Assert.assertEquals(ResourceKeys.DEFAULT_AM_RETRY_COUNT_WINDOW_MS, + retryIntervalWindow); + } + +}
