Repository: twill
Updated Branches:
  refs/heads/master dbbc2a349 -> 4356c283e


(TWILL-241) Added support for per Runnable configuration

Signed-off-by: Terence Yim <cht...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/2910b180
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/2910b180
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/2910b180

Branch: refs/heads/master
Commit: 2910b1803aa1f43c6fd57fdafa51612baf360e5d
Parents: dbbc2a3
Author: Terence Yim <cht...@apache.org>
Authored: Fri Aug 4 15:29:05 2017 -0700
Committer: Terence Yim <cht...@apache.org>
Committed: Mon Aug 7 23:29:40 2017 -0700

----------------------------------------------------------------------
 .../org/apache/twill/api/TwillPreparer.java     | 11 +++
 .../org/apache/twill/api/TwillRunResources.java |  5 ++
 .../internal/DefaultTwillRunResources.java      | 13 +++-
 .../twill/internal/TwillContainerLauncher.java  | 19 ++++-
 .../internal/TwillRuntimeSpecification.java     | 77 ++++++++++++++++----
 .../internal/json/TwillRunResourcesCodec.java   | 20 +++--
 .../json/TwillRuntimeSpecificationCodec.java    | 29 +++++---
 .../appmaster/ApplicationMasterService.java     | 17 +++--
 .../internal/appmaster/RunningContainers.java   |  6 +-
 .../apache/twill/yarn/YarnTwillPreparer.java    | 57 +++++++--------
 .../apache/twill/yarn/ContainerSizeTestRun.java | 27 ++++++-
 11 files changed, 199 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java 
b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
index 1f50972..35930d2 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
@@ -39,6 +39,17 @@ public interface TwillPreparer {
   TwillPreparer withConfiguration(Map<String, String> config);
 
   /**
+   * Overrides the default configuration with the given set of configurations 
for the given runnable only.
+   * This is useful to override configurations that affects runnables, such as
+   * {@link Configs.Keys#JAVA_RESERVED_MEMORY_MB} and {@link 
Configs.Keys#HEAP_RESERVED_MIN_RATIO}.
+   *
+   * @param runnableName Name of the {@link TwillRunnable}.
+   * @param config set of configurations to override
+   * @return This {@link TwillPreparer}
+   */
+  TwillPreparer withConfiguration(String runnableName, Map<String, String> 
config);
+
+  /**
    * Adds a {@link LogHandler} for receiving an application log.
    * @param handler The {@link LogHandler}.
    * @return This {@link TwillPreparer}.

http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
----------------------------------------------------------------------
diff --git 
a/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java 
b/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
index f721f47..287b901 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java
@@ -44,6 +44,11 @@ public interface TwillRunResources {
   int getMemoryMB();
 
   /**
+   * @return the maximum amount of memory in MB of Java process heap memory.
+   */
+  int getMaxHeapMemoryMB();
+
+  /**
    * @return the host the runnable is running on.
    */
   String getHost();

http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
----------------------------------------------------------------------
diff --git 
a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
 
b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
index 83b973a..f05074e 100644
--- 
a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
+++ 
b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java
@@ -35,6 +35,7 @@ public class DefaultTwillRunResources implements 
TwillRunResources {
   private final int instanceId;
   private final int virtualCores;
   private final int memoryMB;
+  private final int maxHeapMemoryMB;
   private final String host;
   private final Integer debugPort;
   private final Map<String, LogEntry.Level> logLevels;
@@ -42,17 +43,18 @@ public class DefaultTwillRunResources implements 
TwillRunResources {
   /**
    * Constructor to create an instance of {@link DefaultTwillRunResources} 
with empty log levels.
    */
-  public DefaultTwillRunResources(int instanceId, String containerId, int 
cores, int memoryMB,
+  public DefaultTwillRunResources(int instanceId, String containerId, int 
cores, int memoryMB, int maxHeapMemoryMB,
                                   String host, Integer debugPort) {
-    this(instanceId, containerId, cores, memoryMB, host, debugPort, 
Collections.<String, Level>emptyMap());
+    this(instanceId, containerId, cores, memoryMB, maxHeapMemoryMB, host, 
debugPort, Collections.<String, Level>emptyMap());
   }
 
-  public DefaultTwillRunResources(int instanceId, String containerId, int 
cores, int memoryMB,
+  public DefaultTwillRunResources(int instanceId, String containerId, int 
cores, int memoryMB, int maxHeapMemoryMB,
                                   String host, Integer debugPort, Map<String, 
LogEntry.Level> logLevels) {
     this.instanceId = instanceId;
     this.containerId = containerId;
     this.virtualCores = cores;
     this.memoryMB = memoryMB;
+    this.maxHeapMemoryMB = maxHeapMemoryMB;
     this.host = host;
     this.debugPort = debugPort;
     this.logLevels = new HashMap<>(logLevels);
@@ -91,6 +93,11 @@ public class DefaultTwillRunResources implements 
TwillRunResources {
     return memoryMB;
   }
 
+  @Override
+  public int getMaxHeapMemoryMB() {
+    return maxHeapMemoryMB;
+  }
+
   /**
    * @return the host the runnable is running on.
    */

http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
 
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 0f8674b..700c0f1 100644
--- 
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -61,6 +61,7 @@ public final class TwillContainerLauncher {
   private final int reservedMemory;
   private final double minHeapRatio;
   private final Location secureStoreLocation;
+  private int maxHeapSizeMB;
 
   public TwillContainerLauncher(RuntimeSpecification runtimeSpec, 
ContainerInfo containerInfo,
                                 ProcessLauncher.PrepareLaunchContext 
launchContext,
@@ -144,13 +145,12 @@ public final class TwillContainerLauncher {
       firstCommand = "$JAVA_HOME/bin/java";
     }
 
-    int memory = Resources.computeMaxHeapSize(containerInfo.getMemoryMB(), 
reservedMemory,
-            minHeapRatio);
+    maxHeapSizeMB = Resources.computeMaxHeapSize(containerInfo.getMemoryMB(), 
reservedMemory, minHeapRatio);
     commandBuilder.add("-Djava.io.tmpdir=tmp",
                        "-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID,
                        "-Dtwill.runnable=$" + Constants.TWILL_APP_NAME + ".$" 
+ EnvKeys.TWILL_RUNNABLE_NAME,
                        "-cp", Constants.Files.LAUNCHER_JAR + ":" + classPath,
-                       "-Xmx" + memory + "m");
+                       "-Xmx" + maxHeapSizeMB + "m");
     if (jvmOpts.getExtraOptions() != null) {
       commandBuilder.add(jvmOpts.getExtraOptions());
     }
@@ -169,6 +169,19 @@ public final class TwillContainerLauncher {
     return controller;
   }
 
+  /**
+   * Returns the maximum heap memory size in MB of the Java process launched 
in the container.
+   * This method can only be called after the {@link #start(RunId, int, Class, 
String, Location)} method.
+   *
+   * @throws IllegalStateException if the {@link #start(RunId, int, Class, 
String, Location)} was not called yet.
+   */
+  public int getMaxHeapMemoryMB() {
+    if (maxHeapSizeMB <= 0) {
+      throw new IllegalStateException("Unknown maximum heap memory size. 
Please make sure the container is started");
+    }
+    return maxHeapSizeMB;
+  }
+
   private static final class TwillContainerControllerImpl extends 
AbstractZKServiceController
                                                           implements 
TwillContainerController {
 

http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
 
b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
index 831c831..636d94d 100644
--- 
a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
@@ -17,6 +17,7 @@
  */
 package org.apache.twill.internal;
 
+import org.apache.twill.api.Configs;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.TwillApplication;
 import org.apache.twill.api.TwillSpecification;
@@ -37,30 +38,28 @@ public class TwillRuntimeSpecification {
   private final String zkConnectStr;
   private final RunId twillRunId;
   private final String twillAppName;
-  private final int reservedMemory;
   private final String rmSchedulerAddr;
   private final Map<String, Map<String, String>> logLevels;
   private final Map<String, Integer> maxRetries;
-  private final double minHeapRatio;
-  private final boolean logCollectionEnabled;
+  private final Map<String, String> config;
+  private final Map<String, Map<String, String>> runnableConfigs;
 
   public TwillRuntimeSpecification(TwillSpecification twillSpecification, 
String fsUser, URI twillAppDir,
                                    String zkConnectStr, RunId twillRunId, 
String twillAppName,
-                                   int reservedMemory, @Nullable String 
rmSchedulerAddr,
-                                   Map<String, Map<String, String>> logLevels, 
Map<String, Integer> maxRetries,
-                                   double minHeapRatio, boolean 
logCollectionEnabled) {
+                                   @Nullable String rmSchedulerAddr, 
Map<String, Map<String, String>> logLevels,
+                                   Map<String, Integer> maxRetries, 
Map<String, String> config,
+                                   Map<String, Map<String, String>> 
runnableConfigs) {
     this.twillSpecification = twillSpecification;
     this.fsUser = fsUser;
     this.twillAppDir = twillAppDir;
     this.zkConnectStr = zkConnectStr;
     this.twillRunId = twillRunId;
     this.twillAppName = twillAppName;
-    this.reservedMemory = reservedMemory;
     this.rmSchedulerAddr = rmSchedulerAddr;
     this.logLevels = logLevels;
     this.maxRetries = maxRetries;
-    this.minHeapRatio = minHeapRatio;
-    this.logCollectionEnabled = logCollectionEnabled;
+    this.config = config;
+    this.runnableConfigs = runnableConfigs;
   }
 
   public TwillSpecification getTwillSpecification() {
@@ -87,19 +86,46 @@ public class TwillRuntimeSpecification {
     return twillAppName;
   }
 
-  public int getReservedMemory() {
-    return reservedMemory;
+  /**
+   * Returns the minimum heap ratio for the application master.
+   */
+  public double getAMMinHeapRatio() {
+    return getMinHeapRatio(config);
+  }
+
+  /**
+   * Returns the minimum heap ratio for the given runnable.
+   */
+  public double getMinHeapRatio(String runnableName) {
+    return getMinHeapRatio(runnableConfigs.containsKey(runnableName) ? 
runnableConfigs.get(runnableName) : config);
   }
 
-  public double getMinHeapRatio() {
-    return minHeapRatio;
+  /**
+   * Returns the reserved non-heap memory size in MB for the application 
master.
+   */
+  public int getAMReservedMemory() {
+    return config.containsKey(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB) ?
+      Integer.parseInt(config.get(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB)) :
+      Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB;
+  }
+
+  /**
+   * Returns the reserved non-heap memory size in MB for the given runnable.
+   */
+  public int getReservedMemory(String runnableName) {
+    Map<String, String> conf = runnableConfigs.containsKey(runnableName) ? 
runnableConfigs.get(runnableName) : config;
+    return conf.containsKey(Configs.Keys.JAVA_RESERVED_MEMORY_MB) ?
+      Integer.parseInt(conf.get(Configs.Keys.JAVA_RESERVED_MEMORY_MB)) :
+      Configs.Defaults.JAVA_RESERVED_MEMORY_MB;
   }
 
   /**
    * Returns whether log collection is enabled.
    */
   public boolean isLogCollectionEnabled() {
-    return logCollectionEnabled;
+    return config.containsKey(Configs.Keys.LOG_COLLECTION_ENABLED) ?
+      Boolean.parseBoolean(config.get(Configs.Keys.LOG_COLLECTION_ENABLED)) :
+      Configs.Defaults.LOG_COLLECTION_ENABLED;
   }
 
   @Nullable
@@ -116,6 +142,20 @@ public class TwillRuntimeSpecification {
   }
 
   /**
+   * Returns the configuration for the application.
+   */
+  public Map<String, String> getConfig() {
+    return config;
+  }
+
+  /**
+   * Returns the configurations for each runnable.
+   */
+  public Map<String, Map<String, String>> getRunnableConfigs() {
+    return runnableConfigs;
+  }
+
+  /**
    * Returns the ZK connection string for the Kafka used for log collections,
    * or {@code null} if log collection is disabled.
    */
@@ -127,4 +167,13 @@ public class TwillRuntimeSpecification {
     // When addressing TWILL-147, a field can be introduced to carry this 
value.
     return String.format("%s/%s/%s/kafka", getZkConnectStr(), 
getTwillAppName(), getTwillAppRunId());
   }
+
+  /**
+   * Returns the minimum heap ratio ({@link 
Configs.Keys#HEAP_RESERVED_MIN_RATIO}) based on the given configuration.
+   */
+  private double getMinHeapRatio(Map<String, String> config) {
+    return config.containsKey(Configs.Keys.HEAP_RESERVED_MIN_RATIO) ?
+      Double.parseDouble(config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO)) :
+      Configs.Defaults.HEAP_RESERVED_MIN_RATIO;
+  }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
 
b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
index bb4d435..c9196c4 100644
--- 
a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
@@ -41,6 +41,7 @@ public final class TwillRunResourcesCodec implements 
JsonSerializer<TwillRunReso
   private static final String INSTANCE_ID = "instanceId";
   private static final String HOST = "host";
   private static final String MEMORY_MB = "memoryMB";
+  private static final String MAX_HEAP_MEMORY_MB = "maxHeapMemoryMB";
   private static final String VIRTUAL_CORES = "virtualCores";
   private static final String DEBUG_PORT = "debugPort";
   private static final String LOG_LEVELS = "logLevels";
@@ -53,6 +54,7 @@ public final class TwillRunResourcesCodec implements 
JsonSerializer<TwillRunReso
     json.addProperty(INSTANCE_ID, src.getInstanceId());
     json.addProperty(HOST, src.getHost());
     json.addProperty(MEMORY_MB, src.getMemoryMB());
+    json.addProperty(MAX_HEAP_MEMORY_MB, src.getMaxHeapMemoryMB());
     json.addProperty(VIRTUAL_CORES, src.getVirtualCores());
     if (src.getDebugPort() != null) {
       json.addProperty(DEBUG_PORT, src.getDebugPort());
@@ -69,12 +71,16 @@ public final class TwillRunResourcesCodec implements 
JsonSerializer<TwillRunReso
     JsonObject jsonObj = json.getAsJsonObject();
     Map<String, LogEntry.Level> logLevels =
       context.deserialize(jsonObj.get(LOG_LEVELS), new TypeToken<Map<String, 
LogEntry.Level>>() { }.getType());
-    return new DefaultTwillRunResources(jsonObj.get(INSTANCE_ID).getAsInt(),
-                                        
jsonObj.get(CONTAINER_ID).getAsString(),
-                                        jsonObj.get(VIRTUAL_CORES).getAsInt(),
-                                        jsonObj.get(MEMORY_MB).getAsInt(),
-                                        jsonObj.get(HOST).getAsString(),
-                                        jsonObj.has(DEBUG_PORT) ? 
jsonObj.get(DEBUG_PORT).getAsInt() : null,
-                                        logLevels);
+    int memoryMB = jsonObj.get(MEMORY_MB).getAsInt();
+    return new DefaultTwillRunResources(
+      jsonObj.get(INSTANCE_ID).getAsInt(),
+      jsonObj.get(CONTAINER_ID).getAsString(),
+      jsonObj.get(VIRTUAL_CORES).getAsInt(),
+      memoryMB,
+      // For backward compatibility when a newer Twill client re-attached to 
running app started with older version.
+      jsonObj.has(MAX_HEAP_MEMORY_MB) ? 
jsonObj.get(MAX_HEAP_MEMORY_MB).getAsInt() : memoryMB,
+      jsonObj.get(HOST).getAsString(),
+      jsonObj.has(DEBUG_PORT) ? jsonObj.get(DEBUG_PORT).getAsInt() : null,
+      logLevels);
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
 
b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
index 5ff05e8..710a9f7 100644
--- 
a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
@@ -39,18 +39,20 @@ import java.util.Map;
 final class TwillRuntimeSpecificationCodec implements 
JsonSerializer<TwillRuntimeSpecification>,
                                                          
JsonDeserializer<TwillRuntimeSpecification> {
 
+  private static final Type MAP_STRING_MAP_STRING_STRING_TYPE =
+    new TypeToken<Map<String, Map<String, String>>>() { }.getType();
+
   private static final String FS_USER = "fsUser";
   private static final String TWILL_APP_DIR = "twillAppDir";
   private static final String ZK_CONNECT_STR = "zkConnectStr";
   private static final String TWILL_RUNID = "twillRunId";
   private static final String TWILL_APP_NAME = "twillAppName";
-  private static final String RESERVED_MEMORY = "reservedMemory";
-  private static final String HEAP_RESERVED_MIN_RATIO = "minHeapRatio";
   private static final String RM_SCHEDULER_ADDR = "rmSchedulerAddr";
   private static final String TWILL_SPEC = "twillSpecification";
   private static final String LOG_LEVELS = "logLevels";
   private static final String MAX_RETRIES = "maxRetries";
-  private static final String LOG_COLLECTION_ENABLED = "logCollectionEnabled";
+  private static final String CONFIG = "config";
+  private static final String RUNNABLE_CONFIGS = "runnableConfigs";
 
   @Override
   public JsonElement serialize(TwillRuntimeSpecification src, Type typeOfSrc, 
JsonSerializationContext context) {
@@ -60,19 +62,19 @@ final class TwillRuntimeSpecificationCodec implements 
JsonSerializer<TwillRuntim
     json.addProperty(ZK_CONNECT_STR, src.getZkConnectStr());
     json.addProperty(TWILL_RUNID, src.getTwillAppRunId().getId());
     json.addProperty(TWILL_APP_NAME, src.getTwillAppName());
-    json.addProperty(RESERVED_MEMORY, src.getReservedMemory());
-    json.addProperty(HEAP_RESERVED_MIN_RATIO, src.getMinHeapRatio());
     if (src.getRmSchedulerAddr() != null) {
       json.addProperty(RM_SCHEDULER_ADDR, src.getRmSchedulerAddr());
     }
     json.add(TWILL_SPEC,
              context.serialize(src.getTwillSpecification(), new 
TypeToken<TwillSpecification>() { }.getType()));
     json.add(LOG_LEVELS,
-             context.serialize(src.getLogLevels(), new TypeToken<Map<String, 
Map<String, String>>>() { }.getType()));
+             context.serialize(src.getLogLevels(), 
MAP_STRING_MAP_STRING_STRING_TYPE));
     json.add(MAX_RETRIES,
              context.serialize(src.getMaxRetries(), new TypeToken<Map<String, 
Integer>>() { }.getType()));
-    json.addProperty(LOG_COLLECTION_ENABLED, src.isLogCollectionEnabled());
-
+    json.add(CONFIG,
+             context.serialize(src.getConfig(), new TypeToken<Map<String, 
String>>() { }.getType()));
+    json.add(RUNNABLE_CONFIGS,
+             context.serialize(src.getRunnableConfigs(), 
MAP_STRING_MAP_STRING_STRING_TYPE));
     return json;
   }
 
@@ -84,9 +86,13 @@ final class TwillRuntimeSpecificationCodec implements 
JsonSerializer<TwillRuntim
     TwillSpecification twillSpecification = context.deserialize(
       jsonObj.get(TWILL_SPEC), new TypeToken<TwillSpecification>() { 
}.getType());
     Map<String, Map<String, String>> logLevels =
-      context.deserialize(jsonObj.get(LOG_LEVELS), new TypeToken<Map<String, 
Map<String, String>>>() { }.getType());
+      context.deserialize(jsonObj.get(LOG_LEVELS), 
MAP_STRING_MAP_STRING_STRING_TYPE);
     Map<String, Integer> maxRetries = 
       context.deserialize(jsonObj.get(MAX_RETRIES), new TypeToken<Map<String, 
Integer>>() { }.getType());
+    Map<String, String> config =
+      context.deserialize(jsonObj.get(CONFIG), new TypeToken<Map<String, 
String>>() { }.getType());
+    Map<String, Map<String, String>> runnableConfigs =
+      context.deserialize(jsonObj.get(RUNNABLE_CONFIGS), 
MAP_STRING_MAP_STRING_STRING_TYPE);
     
     return new TwillRuntimeSpecification(twillSpecification,
                                          jsonObj.get(FS_USER).getAsString(),
@@ -94,12 +100,11 @@ final class TwillRuntimeSpecificationCodec implements 
JsonSerializer<TwillRuntim
                                          
jsonObj.get(ZK_CONNECT_STR).getAsString(),
                                          
RunIds.fromString(jsonObj.get(TWILL_RUNID).getAsString()),
                                          
jsonObj.get(TWILL_APP_NAME).getAsString(),
-                                         
jsonObj.get(RESERVED_MEMORY).getAsInt(),
                                          jsonObj.has(RM_SCHEDULER_ADDR) ?
                                          
jsonObj.get(RM_SCHEDULER_ADDR).getAsString() : null,
                                          logLevels,
                                          maxRetries,
-                                         
jsonObj.get(HEAP_RESERVED_MIN_RATIO).getAsDouble(),
-                                         
jsonObj.get(LOG_COLLECTION_ENABLED).getAsBoolean());
+                                         config,
+                                         runnableConfigs);
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 523ffce..a2ebf7b 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -71,6 +71,7 @@ import 
org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
 import org.apache.twill.internal.state.Message;
 import org.apache.twill.internal.state.SystemMessages;
 import org.apache.twill.internal.utils.Instances;
+import org.apache.twill.internal.utils.Resources;
 import org.apache.twill.internal.yarn.AbstractYarnTwillService;
 import org.apache.twill.internal.yarn.YarnAMClient;
 import org.apache.twill.internal.yarn.YarnContainerInfo;
@@ -127,8 +128,6 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
   private final ExpectedContainers expectedContainers;
   private final YarnAMClient amClient;
   private final JvmOptions jvmOpts;
-  private final int reservedMemory;
-  private final double minHeapRatio;
   private final EventHandler eventHandler;
   private final Location applicationLocation;
   private final PlacementPolicyManager placementPolicyManager;
@@ -151,8 +150,6 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
     this.amClient = amClient;
     this.credentials = createCredentials();
     this.jvmOpts = loadJvmOptions();
-    this.reservedMemory = twillRuntimeSpec.getReservedMemory();
-    this.minHeapRatio = twillRuntimeSpec.getMinHeapRatio();
     this.twillSpec = twillRuntimeSpec.getTwillSpecification();
     this.placementPolicyManager = new 
PlacementPolicyManager(twillSpec.getPlacementPolicies());
     this.environments = getEnvironments();
@@ -198,11 +195,18 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
 
   private RunningContainers createRunningContainers(ContainerId 
appMasterContainerId,
                                                     String appMasterHost) 
throws Exception {
+    int containerMemoryMB = 
Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB));
+
+    // We can't get the -Xmx easily, so just recompute the -Xmx in the same 
way that the client does
+    int maxHeapMemoryMB = Resources.computeMaxHeapSize(containerMemoryMB,
+                                                       
twillRuntimeSpec.getAMReservedMemory(),
+                                                       
twillRuntimeSpec.getAMMinHeapRatio());
     TwillRunResources appMasterResources = new DefaultTwillRunResources(
       0,
       appMasterContainerId.toString(),
       Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES)),
-      Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB)),
+      containerMemoryMB,
+      maxHeapMemoryMB,
       appMasterHost, null);
     String appId = 
appMasterContainerId.getApplicationAttemptId().getApplicationId().toString();
     return new RunningContainers(appId, appMasterResources, zkClient, 
applicationLocation,
@@ -667,7 +671,8 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
       TwillContainerLauncher launcher = new TwillContainerLauncher(
         twillSpec.getRunnables().get(runnableName), 
processLauncher.getContainerInfo(), launchContext,
         ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
-        containerCount, jvmOpts, reservedMemory, getSecureStoreLocation(), 
minHeapRatio);
+        containerCount, jvmOpts, 
twillRuntimeSpec.getReservedMemory(runnableName), getSecureStoreLocation(),
+        twillRuntimeSpec.getMinHeapRatio(runnableName));
 
       runningContainers.start(runnableName, 
processLauncher.getContainerInfo(), launcher);
 

http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index a950c46..c85c372 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -163,9 +163,9 @@ final class RunningContainers {
                                                                  
containerInfo.getId(),
                                                                  
containerInfo.getVirtualCores(),
                                                                  
containerInfo.getMemoryMB(),
+                                                                 
launcher.getMaxHeapMemoryMB(),
                                                                  
containerInfo.getHost().getHostName(),
                                                                  controller);
-
       resourceReport.addRunResources(runnableName, resources);
       containerStats.put(runnableName, containerInfo);
 
@@ -734,9 +734,9 @@ final class RunningContainers {
     private Integer dynamicDebugPort = null;
 
     private DynamicTwillRunResources(int instanceId, String containerId,
-                                     int cores, int memoryMB, String host,
+                                     int cores, int memoryMB, int 
maxHeapMemoryMB, String host,
                                      TwillContainerController controller) {
-      super(instanceId, containerId, cores, memoryMB, host, null);
+      super(instanceId, containerId, cores, memoryMB, maxHeapMemoryMB, host, 
null);
       this.controller = controller;
     }
 

http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java 
b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 52e18eb..5442fa0 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -147,6 +147,7 @@ final class YarnTwillPreparer implements TwillPreparer {
   private final Map<String, Map<String, String>> logLevels = Maps.newHashMap();
   private final LocationCache locationCache;
   private final Map<String, Integer> maxRetries = Maps.newHashMap();
+  private final Map<String, Map<String, String>> runnableConfigs = 
Maps.newHashMap();
   private String schedulerQueue;
   private String extraOptions;
   private JvmOptions.DebugOptions debugOptions = 
JvmOptions.DebugOptions.NO_DEBUG;
@@ -183,6 +184,13 @@ final class YarnTwillPreparer implements TwillPreparer {
   }
 
   @Override
+  public TwillPreparer withConfiguration(String runnableName, Map<String, 
String> config) {
+    confirmRunnableName(runnableName);
+    runnableConfigs.put(runnableName, Maps.newHashMap(config));
+    return this;
+  }
+
+  @Override
   public TwillPreparer addLogHandler(LogHandler handler) {
     logHandlers.add(handler);
     return this;
@@ -382,10 +390,11 @@ final class YarnTwillPreparer implements TwillPreparer {
             createApplicationJar(createBundler(classAcceptor), localFiles);
             createResourcesJar(createBundler(classAcceptor), localFiles);
 
+            TwillRuntimeSpecification twillRuntimeSpec;
             Path runtimeConfigDir = 
Files.createTempDirectory(getLocalStagingDir().toPath(),
                                                               
Constants.Files.RUNTIME_CONFIG_JAR);
             try {
-              saveSpecification(twillSpec, 
runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC));
+              twillRuntimeSpec = saveSpecification(twillSpec, 
runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC));
               
saveLogback(runtimeConfigDir.resolve(Constants.Files.LOGBACK_TEMPLATE));
               saveClassPaths(runtimeConfigDir);
               saveJvmOptions(extraOptions, debugOptions, 
runtimeConfigDir.resolve(Constants.Files.JVM_OPTIONS));
@@ -405,10 +414,9 @@ final class YarnTwillPreparer implements TwillPreparer {
             //     appMaster.jar
             //     org.apache.twill.internal.appmaster.ApplicationMasterMain
             //     false
-
-            int reservedMemoryMB = 
config.getInt(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB,
-                                                 
Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB);
-            int memory = 
Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(), reservedMemoryMB, 
getMinHeapRatio());
+            int memory = 
Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(),
+                                                      
twillRuntimeSpec.getAMReservedMemory(),
+                                                      
twillRuntimeSpec.getAMMinHeapRatio());
             return launcher.prepareLaunch(ImmutableMap.<String, String>of(), 
localFiles.values(),
                                           createSubmissionCredentials())
               .addCommand(
@@ -439,22 +447,6 @@ final class YarnTwillPreparer implements TwillPreparer {
   }
 
   /**
-   * Returns the minimum heap ratio based on the configuration.
-   */
-  private double getMinHeapRatio() {
-    // doing this way to support hadoop-2.0 profile
-    String minHeapRatioStr = config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO);
-    return (minHeapRatioStr == null) ? 
Configs.Defaults.HEAP_RESERVED_MIN_RATIO : Double.parseDouble(minHeapRatioStr);
-  }
-
-  /**
-   * Returns the reserved memory size in MB based on the configuration.
-   */
-  private int getReservedMemory() {
-    return config.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB, 
Configs.Defaults.JAVA_RESERVED_MEMORY_MB);
-  }
-
-  /**
    * Returns the local staging directory based on the configuration.
    */
   private File getLocalStagingDir() {
@@ -670,7 +662,7 @@ final class YarnTwillPreparer implements TwillPreparer {
     return localFiles;
   }
 
-  private void saveSpecification(TwillSpecification spec, Path targetFile) 
throws IOException {
+  private TwillRuntimeSpecification saveSpecification(TwillSpecification spec, 
Path targetFile) throws IOException {
     final Multimap<String, LocalFile> runnableLocalFiles = 
populateRunnableLocalFiles(spec);
 
     // Rewrite LocalFiles inside twillSpec
@@ -692,15 +684,20 @@ final class YarnTwillPreparer implements TwillPreparer {
       }
       TwillSpecification newTwillSpec = new 
DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(),
                                                                       
spec.getPlacementPolicies(), eventHandler);
-      boolean logCollectionEnabled = 
config.getBoolean(Configs.Keys.LOG_COLLECTION_ENABLED,
-                                                       
Configs.Defaults.LOG_COLLECTION_ENABLED);
-      TwillRuntimeSpecificationAdapter.create().toJson(
-        new TwillRuntimeSpecification(newTwillSpec, 
appLocation.getLocationFactory().getHomeLocation().getName(),
-                                      appLocation.toURI(), zkConnectString, 
runId, twillSpec.getName(),
-                                      getReservedMemory(), 
config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
-                                      logLevels, maxRetries, 
getMinHeapRatio(), logCollectionEnabled), writer);
+      Map<String, String> configMap = Maps.newHashMap();
+      for (Map.Entry<String, String> entry : config) {
+        configMap.put(entry.getKey(), entry.getValue());
+      }
+
+      TwillRuntimeSpecification twillRuntimeSpec = new 
TwillRuntimeSpecification(
+        newTwillSpec, 
appLocation.getLocationFactory().getHomeLocation().getName(),
+        appLocation.toURI(), zkConnectString, runId, twillSpec.getName(),
+        config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
+        logLevels, maxRetries, configMap, runnableConfigs);
+      TwillRuntimeSpecificationAdapter.create().toJson(twillRuntimeSpec, 
writer);
+      LOG.debug("Done {}", targetFile);
+      return twillRuntimeSpec;
     }
-    LOG.debug("Done {}", targetFile);
   }
 
   private void saveLogback(Path targetFile) throws IOException {

http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java 
b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
index f5143ce..73f1476 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
@@ -25,17 +25,19 @@ import org.apache.twill.api.ResourceReport;
 import org.apache.twill.api.ResourceSpecification;
 import org.apache.twill.api.TwillApplication;
 import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunResources;
 import org.apache.twill.api.TwillRunner;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.api.logging.PrinterLogHandler;
 import org.apache.twill.discovery.ServiceDiscovered;
+import org.apache.twill.internal.utils.Resources;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.PrintWriter;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -67,9 +69,16 @@ public class ContainerSizeTestRun extends BaseYarnTest {
   @Test
   public void testMaxHeapSize() throws InterruptedException, TimeoutException, 
ExecutionException {
     TwillRunner runner = getTwillRunner();
+    String runnableName = "sleep";
+
     TwillController controller = runner.prepare(new MaxHeapApp())
-      // Alter the AM container size
-      
.withConfiguration(Collections.singletonMap(Configs.Keys.YARN_AM_MEMORY_MB, 
"256"))
+      // Alter the AM container size and heap ratio
+      .withConfiguration(ImmutableMap.of(Configs.Keys.YARN_AM_MEMORY_MB, "256",
+                                         Configs.Keys.HEAP_RESERVED_MIN_RATIO, 
"0.65"))
+      // Use a different heap ratio and reserved memory size for the runnable
+      .withConfiguration(runnableName,
+                         ImmutableMap.of(Configs.Keys.HEAP_RESERVED_MIN_RATIO, 
"0.8",
+                                         Configs.Keys.JAVA_RESERVED_MEMORY_MB, 
"1024"))
       .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
       .start();
 
@@ -77,10 +86,20 @@ public class ContainerSizeTestRun extends BaseYarnTest {
       ServiceDiscovered discovered = controller.discoverService("sleep");
       Assert.assertTrue(waitForSize(discovered, 1, 120));
 
-      // Verify the AM container size
+      // Verify the AM container size and heap size
       ResourceReport resourceReport = controller.getResourceReport();
       Assert.assertNotNull(resourceReport);
       Assert.assertEquals(256, 
resourceReport.getAppMasterResources().getMemoryMB());
+      Assert.assertEquals(Resources.computeMaxHeapSize(256, 
Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB, 0.65d),
+                          
resourceReport.getAppMasterResources().getMaxHeapMemoryMB());
+
+      // Verify the runnable container heap size
+      Collection<TwillRunResources> runnableResources = 
resourceReport.getRunnableResources(runnableName);
+      Assert.assertFalse(runnableResources.isEmpty());
+      TwillRunResources resources = runnableResources.iterator().next();
+      
Assert.assertEquals(Resources.computeMaxHeapSize(resources.getMemoryMB(), 1024, 
0.8d),
+                          resources.getMaxHeapMemoryMB());
+
     } finally {
       controller.terminate().get(120, TimeUnit.SECONDS);
     }

Reply via email to