Repository: samza
Updated Branches:
  refs/heads/master 888e06173 -> 101ca439b


SAMZA-1138; Yarn capability check is broken

After migration from `yarn.container.*` properties to 
`cluster-manager.container.*` properties we have to use either of them and 
`ClusterManagerConfig` provides backward compatibility for these properties. 
But in `YarnClusterResourceManage`r only old properties are used (from 
`YarnConfig`), hence if job config migrated to new `cluster-manager.*` 
properties names then check will be evaluated against default values, not 
against actual values.

Author: Maksim Logvinenko <mlogvine...@gmail.com>

Reviewers: Jagadish <jagad...@apache.org>

Closes #84 from logarithm/yarn-properties-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/101ca439
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/101ca439
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/101ca439

Branch: refs/heads/master
Commit: 101ca439b8b22926db7423309a3ff880f396271a
Parents: 888e061
Author: Maksim Logvinenko <mlogvine...@gmail.com>
Authored: Sun Apr 2 16:18:43 2017 -0700
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Sun Apr 2 16:18:43 2017 -0700

----------------------------------------------------------------------
 .../org/apache/samza/config/YarnConfig.java     | 41 --------------------
 .../job/yarn/YarnClusterResourceManager.java    | 10 ++++-
 2 files changed, 9 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/101ca439/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 
b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
index 86e4ef7..aa4bc3e 100644
--- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
+++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
@@ -27,41 +27,16 @@ public class YarnConfig extends MapConfig {
    */
   public static final String PACKAGE_PATH = "yarn.package.path";
 
-  // Configs related to each yarn container
-  /**
-   * Memory, in megabytes, to request from YARN per container
-   */
-  public static final String CONTAINER_MAX_MEMORY_MB = 
"yarn.container.memory.mb";
-  private static final int DEFAULT_CONTAINER_MEM = 1024;
-
   /**
    * Name of YARN queue to run jobs on
    */
   public static final String QUEUE_NAME = "yarn.queue";
 
   /**
-   * Number of CPU cores to request from YARN per container
-   */
-  public static final String CONTAINER_MAX_CPU_CORES = 
"yarn.container.cpu.cores";
-  private static final int DEFAULT_CPU_CORES = 1;
-
-  /**
    * Label to request from YARN for containers
    */
   public static final String CONTAINER_LABEL = "yarn.container.label";
 
-  /**
-   * Maximum number of times the AM tries to restart a failed container
-   */
-  public static final String CONTAINER_RETRY_COUNT = 
"yarn.container.retry.count";
-  private static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;
-
-  /**
-   * Determines how frequently a container is allowed to fail before we give 
up and fail the job
-   */
-  public static final String CONTAINER_RETRY_WINDOW_MS = 
"yarn.container.retry.window.ms";
-  private static final int DEFAULT_CONTAINER_RETRY_WINDOW_MS = 300000;
-
   // Configs related to the Samza Application Master (AM)
   /**
    * (Optional) JVM options to include in the command line when executing the 
AM
@@ -150,26 +125,10 @@ public class YarnConfig extends MapConfig {
     super(config);
   }
 
-  public int getContainerRetryCount() {
-    return getInt(CONTAINER_RETRY_COUNT, DEFAULT_CONTAINER_RETRY_COUNT);
-  }
-
-  public int getContainerRetryWindowMs() {
-    return getInt(CONTAINER_RETRY_WINDOW_MS, 
DEFAULT_CONTAINER_RETRY_WINDOW_MS);
-  }
-
   public int getAMPollIntervalMs() {
     return getInt(AM_POLL_INTERVAL_MS, DEFAULT_POLL_INTERVAL_MS);
   }
 
-  public int getContainerMaxMemoryMb() {
-    return getInt(CONTAINER_MAX_MEMORY_MB, DEFAULT_CONTAINER_MEM);
-  }
-
-  public int getContainerMaxCpuCores() {
-    return getInt(CONTAINER_MAX_CPU_CORES, DEFAULT_CPU_CORES);
-  }
-
   public String getContainerLabel() {
     return get(CONTAINER_LABEL, null);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/101ca439/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 04c78be..1fd3939 100644
--- 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -31,6 +31,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.clustermanager.*;
 import org.apache.samza.clustermanager.SamzaApplicationState;
 import org.apache.samza.clustermanager.SamzaContainerLaunchException;
+import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.YarnConfig;
@@ -149,7 +150,14 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
     this.service = new SamzaYarnAppMasterService(config, samzaAppState, 
this.state, registry, hConfig);
 
     log.info("ContainerID str {}, Nodehost  {} , Nodeport  {} , NodeHttpport 
{}", new Object [] {containerIdStr, nodeHostString, nodePort, nodeHttpPort});
-    this.lifecycle = new 
SamzaYarnAppMasterLifecycle(yarnConfig.getContainerMaxMemoryMb(), 
yarnConfig.getContainerMaxCpuCores(), samzaAppState, state, amClient );
+    ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
+    this.lifecycle = new SamzaYarnAppMasterLifecycle(
+        clusterManagerConfig.getContainerMemoryMb(),
+        clusterManagerConfig.getNumCores(),
+        samzaAppState,
+        state,
+        amClient
+    );
 
     yarnContainerRunner = new YarnContainerRunner(config, hConfig);
   }

Reply via email to