YARN-5006. ResourceManager quits when ApplicationStateData exceeds the znode size limit in ZooKeeper. Contributed by Bibin A Chundatt.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/740204b2 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/740204b2 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/740204b2 Branch: refs/heads/HADOOP-13345 Commit: 740204b2926f49ea70596c6059582ce409fbdd90 Parents: 092ebdf Author: Naganarasimha <[email protected]> Authored: Fri Jun 23 07:52:41 2017 +0530 Committer: Naganarasimha <[email protected]> Committed: Fri Jun 23 07:52:41 2017 +0530 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 5 +++ .../src/main/resources/yarn-default.xml | 8 +++++ .../resourcemanager/recovery/RMStateStore.java | 13 ++++--- .../recovery/StoreLimitException.java | 33 +++++++++++++++++ .../recovery/ZKRMStateStore.java | 16 +++++++-- .../resourcemanager/rmapp/RMAppEvent.java | 25 +++++++++++++ .../server/resourcemanager/rmapp/RMAppImpl.java | 6 ++-- .../recovery/TestZKRMStateStore.java | 37 ++++++++++++++++++++ 8 files changed, 135 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/740204b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5e4c826..ca71d35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -561,6 +561,11 @@ public class YarnConfiguration extends 
Configuration { public static final String RM_ZK_NUM_RETRIES = RM_ZK_PREFIX + "num-retries"; public static final int DEFAULT_ZK_RM_NUM_RETRIES = 1000; + /** Zookeeper znode limit */ + public static final String RM_ZK_ZNODE_SIZE_LIMIT_BYTES = + RM_ZK_PREFIX + "max-znode-size.bytes"; + public static final int DEFAULT_RM_ZK_ZNODE_SIZE_LIMIT_BYTES = 1024 * 1024; + public static final String RM_ZK_RETRY_INTERVAL_MS = RM_ZK_PREFIX + "retry-interval-ms"; public static final int DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 1000; http://git-wip-us.apache.org/repos/asf/hadoop/blob/740204b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e687eef..d4b7bde 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -629,6 +629,14 @@ </property> <property> + <description>Specifies the maximum size of the data that can be stored + in a znode.Value should be same or less than jute.maxbuffer configured + in zookeeper.Default value configured is 1MB.</description> + <name>yarn.resourcemanager.zk-max-znode-size.bytes</name> + <value>1048576</value> + </property> + + <property> <description>Name of the cluster. 
In a HA setting, this is used to ensure the RM participates in leader election for this cluster and ensures it does not affect http://git-wip-us.apache.org/repos/asf/hadoop/blob/740204b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 975847c..e945b59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.HAUtil; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -217,14 +216,20 @@ public abstract class RMStateStore extends AbstractService { LOG.info("Storing info for app: " + appId); try { store.storeApplicationStateInternal(appId, appState); - store.notifyApplication(new RMAppEvent(appId, - RMAppEventType.APP_NEW_SAVED)); + store.notifyApplication( + new RMAppEvent(appId, 
RMAppEventType.APP_NEW_SAVED)); } catch (Exception e) { LOG.error("Error storing app: " + appId, e); - isFenced = store.notifyStoreOperationFailedInternal(e); + if (e instanceof StoreLimitException) { + store.notifyApplication(new RMAppEvent(appId, + RMAppEventType.APP_REJECTED, e.getMessage(), false)); + } else { + isFenced = store.notifyStoreOperationFailedInternal(e); + } } return finalState(isFenced); }; + } private static class UpdateAppTransition implements http://git-wip-us.apache.org/repos/asf/hadoop/blob/740204b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreLimitException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreLimitException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreLimitException.java new file mode 100644 index 0000000..289ea44 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreLimitException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * This exception is thrown when Application Data size exceeds limit RM state + * store. + * + */ +public class StoreLimitException extends YarnException { + private static final long serialVersionUID = 1L; + + public StoreLimitException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/740204b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index a9d0885..3b986d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -188,6 +188,7 @@ public class ZKRMStateStore extends RMStateStore { private String 
fencingNodePath; private Thread verifyActiveStatusThread; private int zkSessionTimeout; + private int zknodeLimit; /* ACL and auth info */ private List<ACL> zkAcl; @@ -283,6 +284,8 @@ public class ZKRMStateStore extends RMStateStore { fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK); zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); + zknodeLimit = conf.getInt(YarnConfiguration.RM_ZK_ZNODE_SIZE_LIMIT_BYTES, + YarnConfiguration.DEFAULT_RM_ZK_ZNODE_SIZE_LIMIT_BYTES); appIdNodeSplitIndex = conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, @@ -746,8 +749,17 @@ public class ZKRMStateStore extends RMStateStore { } byte[] appStateData = appStateDataPB.getProto().toByteArray(); - safeCreate(nodeCreatePath, appStateData, zkAcl, - CreateMode.PERSISTENT); + if (appStateData.length <= zknodeLimit) { + safeCreate(nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Application state data size for " + appId + " is " + + appStateData.length); + } + throw new StoreLimitException("Application " + appId + + " exceeds the maximum allowed size for application data. 
" + + "See yarn.resourcemanager.zk-max-znode-size.bytes."); + } } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/740204b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java index 6496402..0c6139e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEvent.java @@ -25,6 +25,7 @@ public class RMAppEvent extends AbstractEvent<RMAppEventType>{ private final ApplicationId appId; private final String diagnosticMsg; + private boolean storeAppInfo; public RMAppEvent(ApplicationId appId, RMAppEventType type) { this(appId, type, ""); @@ -35,6 +36,21 @@ public class RMAppEvent extends AbstractEvent<RMAppEventType>{ super(type); this.appId = appId; this.diagnosticMsg = diagnostic; + this.storeAppInfo = true; + } + + /** + * Constructor to create RM Application Event type. 
+ * + * @param appId application Id + * @param type RM Event type + * @param diagnostic Diagnostic message for event + * @param storeApp Application should be saved or not + */ + public RMAppEvent(ApplicationId appId, RMAppEventType type, String diagnostic, + boolean storeApp) { + this(appId, type, diagnostic); + this.storeAppInfo = storeApp; } public ApplicationId getApplicationId() { @@ -44,4 +60,13 @@ public class RMAppEvent extends AbstractEvent<RMAppEventType>{ public String getDiagnosticMsg() { return this.diagnosticMsg; } + + /** + * Store application to state store or not. + * + * @return boolean application should be saved to store. + */ + public boolean doStoreAppInfo() { + return storeAppInfo; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/740204b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index dda9474..bf8fa4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1307,8 +1307,10 @@ public class RMAppImpl implements RMApp, Recoverable { @Override public void transition(RMAppImpl app, RMAppEvent event) { - app.rememberTargetTransitionsAndStoreState(event, transitionToDo, - targetedFinalState, stateToBeStored); + 
if (event.doStoreAppInfo()) { + app.rememberTargetTransitionsAndStoreState(event, transitionToDo, + targetedFinalState, stateToBeStored); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/740204b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 7c40ddf..fcd8647 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -32,6 +32,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.Service; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPB import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Event; 
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; @@ -53,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMSta import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -70,6 +74,7 @@ import org.junit.Before; import org.junit.Test; import com.google.common.base.Joiner; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -249,6 +254,38 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { zkTester.getRMStateStore()).testRetryingCreateRootDir(); } + @Test + public void testZKNodeLimit() throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + long submitTime = System.currentTimeMillis(); + long startTime = System.currentTimeMillis() + 1234; + Configuration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_ZK_ZNODE_SIZE_LIMIT_BYTES, 1); + RMStateStore store = zkTester.getRMStateStore(conf); + TestAppRejDispatcher dispatcher = new TestAppRejDispatcher(); + store.setRMDispatcher(dispatcher); + ApplicationId appId1 = + ApplicationId.fromString("application_1352994193343_0001"); + storeApp(store, appId1, submitTime, startTime); + GenericTestUtils.waitFor(new 
Supplier<Boolean>() { + @Override + public Boolean get() { + return dispatcher.apprejectedEvnt; + } + }, 100, 5000); + } + + static class TestAppRejDispatcher extends TestDispatcher { + private boolean apprejectedEvnt; + + public void handle(Event event) { + if (event instanceof RMAppEvent + && event.getType().equals(RMAppEventType.APP_REJECTED)) { + apprejectedEvnt = true; + } + }; + } + @Test (timeout = 60000) public void testCheckMajorVersionChange() throws Exception { TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester() { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
