Repository: samza Updated Branches: refs/heads/master 888e06173 -> 101ca439b
SAMZA-1138; Yarn capability check is broken After migration from `yarn.container.*` properties to `cluster-manager.container.*` properties we have to use either of them and `ClusterManagerConfig` provides backward compatibility for these properties. But in `YarnClusterResourceManage`r only old properties are used (from `YarnConfig`), hence if job config migrated to new `cluster-manager.*` properties names then check will be evaluated against default values, not against actual values. Author: Maksim Logvinenko <mlogvine...@gmail.com> Reviewers: Jagadish <jagad...@apache.org> Closes #84 from logarithm/yarn-properties-fix Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/101ca439 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/101ca439 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/101ca439 Branch: refs/heads/master Commit: 101ca439b8b22926db7423309a3ff880f396271a Parents: 888e061 Author: Maksim Logvinenko <mlogvine...@gmail.com> Authored: Sun Apr 2 16:18:43 2017 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Sun Apr 2 16:18:43 2017 -0700 ---------------------------------------------------------------------- .../org/apache/samza/config/YarnConfig.java | 41 -------------------- .../job/yarn/YarnClusterResourceManager.java | 10 ++++- 2 files changed, 9 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/101ca439/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java index 86e4ef7..aa4bc3e 100644 --- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java +++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java @@ -27,41 +27,16 @@ public class YarnConfig extends MapConfig { */ public static final String PACKAGE_PATH = "yarn.package.path"; - // Configs related to each yarn container - /** - * Memory, in megabytes, to request from YARN per container - */ - public static final String CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb"; - private static final int DEFAULT_CONTAINER_MEM = 1024; - /** * Name of YARN queue to run jobs on */ public static final String QUEUE_NAME = "yarn.queue"; /** - * Number of CPU cores to request from YARN per container - */ - public static final String CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores"; - private static final int DEFAULT_CPU_CORES = 1; - - /** * Label to request from YARN for containers */ public static final String CONTAINER_LABEL = "yarn.container.label"; - /** - * Maximum number of times the AM tries to restart a failed container - */ - public static final String CONTAINER_RETRY_COUNT = "yarn.container.retry.count"; - private static final int DEFAULT_CONTAINER_RETRY_COUNT = 8; - - /** - * Determines how frequently a container is allowed to fail before we give up and fail the job - */ - public static final String CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms"; - private static final int DEFAULT_CONTAINER_RETRY_WINDOW_MS = 300000; - // Configs related to the Samza Application Master (AM) /** * (Optional) JVM options to include in the command line when executing the AM @@ -150,26 +125,10 @@ public class YarnConfig extends MapConfig { super(config); } - public int getContainerRetryCount() { - return getInt(CONTAINER_RETRY_COUNT, DEFAULT_CONTAINER_RETRY_COUNT); - } - - public int getContainerRetryWindowMs() { - return getInt(CONTAINER_RETRY_WINDOW_MS, DEFAULT_CONTAINER_RETRY_WINDOW_MS); - } - public int getAMPollIntervalMs() { return getInt(AM_POLL_INTERVAL_MS, DEFAULT_POLL_INTERVAL_MS); } - public int getContainerMaxMemoryMb() { - return getInt(CONTAINER_MAX_MEMORY_MB, DEFAULT_CONTAINER_MEM); - } - - public int getContainerMaxCpuCores() { - return getInt(CONTAINER_MAX_CPU_CORES, DEFAULT_CPU_CORES); - } - public String getContainerLabel() { return get(CONTAINER_LABEL, null); } http://git-wip-us.apache.org/repos/asf/samza/blob/101ca439/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java index 04c78be..1fd3939 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java @@ -31,6 +31,7 @@ import org.apache.samza.SamzaException; import org.apache.samza.clustermanager.*; import org.apache.samza.clustermanager.SamzaApplicationState; import org.apache.samza.clustermanager.SamzaContainerLaunchException; +import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; import org.apache.samza.config.ShellCommandConfig; import org.apache.samza.config.YarnConfig; @@ -149,7 +150,14 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, registry, hConfig); log.info("ContainerID str {}, Nodehost {} , Nodeport {} , NodeHttpport {}", new Object [] {containerIdStr, nodeHostString, nodePort, nodeHttpPort}); - this.lifecycle = new SamzaYarnAppMasterLifecycle(yarnConfig.getContainerMaxMemoryMb(), yarnConfig.getContainerMaxCpuCores(), samzaAppState, state, amClient ); + ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); + this.lifecycle = new SamzaYarnAppMasterLifecycle( + clusterManagerConfig.getContainerMemoryMb(), + clusterManagerConfig.getNumCores(), + samzaAppState, + state, + amClient + ); yarnContainerRunner = new YarnContainerRunner(config, hConfig); }