Repository: twill Updated Branches: refs/heads/master dbbc2a349 -> 4356c283e
(TWILL-241) Added support for per Runnable configuration Signed-off-by: Terence Yim <cht...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/twill/repo Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/2910b180 Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/2910b180 Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/2910b180 Branch: refs/heads/master Commit: 2910b1803aa1f43c6fd57fdafa51612baf360e5d Parents: dbbc2a3 Author: Terence Yim <cht...@apache.org> Authored: Fri Aug 4 15:29:05 2017 -0700 Committer: Terence Yim <cht...@apache.org> Committed: Mon Aug 7 23:29:40 2017 -0700 ---------------------------------------------------------------------- .../org/apache/twill/api/TwillPreparer.java | 11 +++ .../org/apache/twill/api/TwillRunResources.java | 5 ++ .../internal/DefaultTwillRunResources.java | 13 +++- .../twill/internal/TwillContainerLauncher.java | 19 ++++- .../internal/TwillRuntimeSpecification.java | 77 ++++++++++++++++---- .../internal/json/TwillRunResourcesCodec.java | 20 +++-- .../json/TwillRuntimeSpecificationCodec.java | 29 +++++--- .../appmaster/ApplicationMasterService.java | 17 +++-- .../internal/appmaster/RunningContainers.java | 6 +- .../apache/twill/yarn/YarnTwillPreparer.java | 57 +++++++-------- .../apache/twill/yarn/ContainerSizeTestRun.java | 27 ++++++- 11 files changed, 199 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java index 1f50972..35930d2 100644 --- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java +++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java @@ -39,6 +39,17 @@ public interface TwillPreparer { TwillPreparer withConfiguration(Map<String, String> config); /** + * Overrides the default configuration with the given set of configurations for the given runnable only. + * This is useful to override configurations that affects runnables, such as + * {@link Configs.Keys#JAVA_RESERVED_MEMORY_MB} and {@link Configs.Keys#HEAP_RESERVED_MIN_RATIO}. + * + * @param runnableName Name of the {@link TwillRunnable}. + * @param config set of configurations to override + * @return This {@link TwillPreparer} + */ + TwillPreparer withConfiguration(String runnableName, Map<String, String> config); + + /** * Adds a {@link LogHandler} for receiving an application log. * @param handler The {@link LogHandler}. * @return This {@link TwillPreparer}. http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java ---------------------------------------------------------------------- diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java b/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java index f721f47..287b901 100644 --- a/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java +++ b/twill-api/src/main/java/org/apache/twill/api/TwillRunResources.java @@ -44,6 +44,11 @@ public interface TwillRunResources { int getMemoryMB(); /** + * @return the maximum amount of memory in MB of Java process heap memory. + */ + int getMaxHeapMemoryMB(); + + /** * @return the host the runnable is running on. */ String getHost(); http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java ---------------------------------------------------------------------- diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java index 83b973a..f05074e 100644 --- a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java +++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java @@ -35,6 +35,7 @@ public class DefaultTwillRunResources implements TwillRunResources { private final int instanceId; private final int virtualCores; private final int memoryMB; + private final int maxHeapMemoryMB; private final String host; private final Integer debugPort; private final Map<String, LogEntry.Level> logLevels; @@ -42,17 +43,18 @@ public class DefaultTwillRunResources implements TwillRunResources { /** * Constructor to create an instance of {@link DefaultTwillRunResources} with empty log levels. */ - public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, + public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, int maxHeapMemoryMB, String host, Integer debugPort) { - this(instanceId, containerId, cores, memoryMB, host, debugPort, Collections.<String, Level>emptyMap()); + this(instanceId, containerId, cores, memoryMB, maxHeapMemoryMB, host, debugPort, Collections.<String, Level>emptyMap()); } - public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, + public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, int maxHeapMemoryMB, String host, Integer debugPort, Map<String, LogEntry.Level> logLevels) { this.instanceId = instanceId; this.containerId = containerId; this.virtualCores = cores; this.memoryMB = memoryMB; + this.maxHeapMemoryMB = maxHeapMemoryMB; this.host = host; this.debugPort = debugPort; this.logLevels = new HashMap<>(logLevels); @@ -91,6 +93,11 @@ public class DefaultTwillRunResources implements TwillRunResources { return memoryMB; } + @Override + public int getMaxHeapMemoryMB() { + return maxHeapMemoryMB; + } + /** * @return the host the runnable is running on. */ http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java index 0f8674b..700c0f1 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java +++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java @@ -61,6 +61,7 @@ public final class TwillContainerLauncher { private final int reservedMemory; private final double minHeapRatio; private final Location secureStoreLocation; + private int maxHeapSizeMB; public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ContainerInfo containerInfo, ProcessLauncher.PrepareLaunchContext launchContext, @@ -144,13 +145,12 @@ public final class TwillContainerLauncher { firstCommand = "$JAVA_HOME/bin/java"; } - int memory = Resources.computeMaxHeapSize(containerInfo.getMemoryMB(), reservedMemory, - minHeapRatio); + maxHeapSizeMB = Resources.computeMaxHeapSize(containerInfo.getMemoryMB(), reservedMemory, minHeapRatio); commandBuilder.add("-Djava.io.tmpdir=tmp", "-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID, "-Dtwill.runnable=$" + Constants.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME, "-cp", Constants.Files.LAUNCHER_JAR + ":" + classPath, - "-Xmx" + memory + "m"); + "-Xmx" + maxHeapSizeMB + "m"); if (jvmOpts.getExtraOptions() != null) { commandBuilder.add(jvmOpts.getExtraOptions()); } @@ -169,6 +169,19 @@ public final class TwillContainerLauncher { return controller; } + /** + * Returns the maximum heap memory size in MB of the Java process launched in the container. + * This method can only be called after the {@link #start(RunId, int, Class, String, Location)} method. + * + * @throws IllegalStateException if the {@link #start(RunId, int, Class, String, Location)} was not called yet. + */ + public int getMaxHeapMemoryMB() { + if (maxHeapSizeMB <= 0) { + throw new IllegalStateException("Unknown maximum heap memory size. Please make sure the container is started"); + } + return maxHeapSizeMB; + } + private static final class TwillContainerControllerImpl extends AbstractZKServiceController implements TwillContainerController { http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java index 831c831..636d94d 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java +++ b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java @@ -17,6 +17,7 @@ */ package org.apache.twill.internal; +import org.apache.twill.api.Configs; import org.apache.twill.api.RunId; import org.apache.twill.api.TwillApplication; import org.apache.twill.api.TwillSpecification; @@ -37,30 +38,28 @@ public class TwillRuntimeSpecification { private final String zkConnectStr; private final RunId twillRunId; private final String twillAppName; - private final int reservedMemory; private final String rmSchedulerAddr; private final Map<String, Map<String, String>> logLevels; private final Map<String, Integer> maxRetries; - private final double minHeapRatio; - private final boolean logCollectionEnabled; + private final Map<String, String> config; + private final Map<String, Map<String, String>> runnableConfigs; public TwillRuntimeSpecification(TwillSpecification twillSpecification, String fsUser, URI twillAppDir, String zkConnectStr, RunId twillRunId, String twillAppName, - int reservedMemory, @Nullable String rmSchedulerAddr, - Map<String, Map<String, String>> logLevels, Map<String, Integer> maxRetries, - double minHeapRatio, boolean logCollectionEnabled) { + @Nullable String rmSchedulerAddr, Map<String, Map<String, String>> logLevels, + Map<String, Integer> maxRetries, Map<String, String> config, + Map<String, Map<String, String>> runnableConfigs) { this.twillSpecification = twillSpecification; this.fsUser = fsUser; this.twillAppDir = twillAppDir; this.zkConnectStr = zkConnectStr; this.twillRunId = twillRunId; this.twillAppName = twillAppName; - this.reservedMemory = reservedMemory; this.rmSchedulerAddr = rmSchedulerAddr; this.logLevels = logLevels; this.maxRetries = maxRetries; - this.minHeapRatio = minHeapRatio; - this.logCollectionEnabled = logCollectionEnabled; + this.config = config; + this.runnableConfigs = runnableConfigs; } public TwillSpecification getTwillSpecification() { @@ -87,19 +86,46 @@ public class TwillRuntimeSpecification { return twillAppName; } - public int getReservedMemory() { - return reservedMemory; + /** + * Returns the minimum heap ratio for the application master. + */ + public double getAMMinHeapRatio() { + return getMinHeapRatio(config); + } + + /** + * Returns the minimum heap ratio for the given runnable. + */ + public double getMinHeapRatio(String runnableName) { + return getMinHeapRatio(runnableConfigs.containsKey(runnableName) ? runnableConfigs.get(runnableName) : config); } - public double getMinHeapRatio() { - return minHeapRatio; + /** + * Returns the reserved non-heap memory size in MB for the application master. + */ + public int getAMReservedMemory() { + return config.containsKey(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB) ? + Integer.parseInt(config.get(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB)) : + Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB; + } + + /** + * Returns the reserved non-heap memory size in MB for the given runnable. + */ + public int getReservedMemory(String runnableName) { + Map<String, String> conf = runnableConfigs.containsKey(runnableName) ? runnableConfigs.get(runnableName) : config; + return conf.containsKey(Configs.Keys.JAVA_RESERVED_MEMORY_MB) ? + Integer.parseInt(conf.get(Configs.Keys.JAVA_RESERVED_MEMORY_MB)) : + Configs.Defaults.JAVA_RESERVED_MEMORY_MB; } /** * Returns whether log collection is enabled. */ public boolean isLogCollectionEnabled() { - return logCollectionEnabled; + return config.containsKey(Configs.Keys.LOG_COLLECTION_ENABLED) ? + Boolean.parseBoolean(config.get(Configs.Keys.LOG_COLLECTION_ENABLED)) : + Configs.Defaults.LOG_COLLECTION_ENABLED; } @Nullable @@ -116,6 +142,20 @@ public class TwillRuntimeSpecification { } /** + * Returns the configuration for the application. + */ + public Map<String, String> getConfig() { + return config; + } + + /** + * Returns the configurations for each runnable. + */ + public Map<String, Map<String, String>> getRunnableConfigs() { + return runnableConfigs; + } + + /** * Returns the ZK connection string for the Kafka used for log collections, * or {@code null} if log collection is disabled. */ @@ -127,4 +167,13 @@ public class TwillRuntimeSpecification { // When addressing TWILL-147, a field can be introduced to carry this value. return String.format("%s/%s/%s/kafka", getZkConnectStr(), getTwillAppName(), getTwillAppRunId()); } + + /** + * Returns the minimum heap ratio ({@link Configs.Keys#HEAP_RESERVED_MIN_RATIO}) based on the given configuration. + */ + private double getMinHeapRatio(Map<String, String> config) { + return config.containsKey(Configs.Keys.HEAP_RESERVED_MIN_RATIO) ? + Double.parseDouble(config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO)) : + Configs.Defaults.HEAP_RESERVED_MIN_RATIO; + } } http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java index bb4d435..c9196c4 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java +++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java @@ -41,6 +41,7 @@ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunReso private static final String INSTANCE_ID = "instanceId"; private static final String HOST = "host"; private static final String MEMORY_MB = "memoryMB"; + private static final String MAX_HEAP_MEMORY_MB = "maxHeapMemoryMB"; private static final String VIRTUAL_CORES = "virtualCores"; private static final String DEBUG_PORT = "debugPort"; private static final String LOG_LEVELS = "logLevels"; @@ -53,6 +54,7 @@ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunReso json.addProperty(INSTANCE_ID, src.getInstanceId()); json.addProperty(HOST, src.getHost()); json.addProperty(MEMORY_MB, src.getMemoryMB()); + json.addProperty(MAX_HEAP_MEMORY_MB, src.getMaxHeapMemoryMB()); json.addProperty(VIRTUAL_CORES, src.getVirtualCores()); if (src.getDebugPort() != null) { json.addProperty(DEBUG_PORT, src.getDebugPort()); @@ -69,12 +71,16 @@ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunReso JsonObject jsonObj = json.getAsJsonObject(); Map<String, LogEntry.Level> logLevels = context.deserialize(jsonObj.get(LOG_LEVELS), new TypeToken<Map<String, LogEntry.Level>>() { }.getType()); - return new DefaultTwillRunResources(jsonObj.get(INSTANCE_ID).getAsInt(), - jsonObj.get(CONTAINER_ID).getAsString(), - jsonObj.get(VIRTUAL_CORES).getAsInt(), - jsonObj.get(MEMORY_MB).getAsInt(), - jsonObj.get(HOST).getAsString(), - jsonObj.has(DEBUG_PORT) ? jsonObj.get(DEBUG_PORT).getAsInt() : null, - logLevels); + int memoryMB = jsonObj.get(MEMORY_MB).getAsInt(); + return new DefaultTwillRunResources( + jsonObj.get(INSTANCE_ID).getAsInt(), + jsonObj.get(CONTAINER_ID).getAsString(), + jsonObj.get(VIRTUAL_CORES).getAsInt(), + memoryMB, + // For backward compatibility when a newer Twill client re-attached to running app started with older version. + jsonObj.has(MAX_HEAP_MEMORY_MB) ? jsonObj.get(MAX_HEAP_MEMORY_MB).getAsInt() : memoryMB, + jsonObj.get(HOST).getAsString(), + jsonObj.has(DEBUG_PORT) ? jsonObj.get(DEBUG_PORT).getAsInt() : null, + logLevels); } } http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java index 5ff05e8..710a9f7 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java +++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java @@ -39,18 +39,20 @@ import java.util.Map; final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntimeSpecification>, JsonDeserializer<TwillRuntimeSpecification> { + private static final Type MAP_STRING_MAP_STRING_STRING_TYPE = + new TypeToken<Map<String, Map<String, String>>>() { }.getType(); + private static final String FS_USER = "fsUser"; private static final String TWILL_APP_DIR = "twillAppDir"; private static final String ZK_CONNECT_STR = "zkConnectStr"; private static final String TWILL_RUNID = "twillRunId"; private static final String TWILL_APP_NAME = "twillAppName"; - private static final String RESERVED_MEMORY = "reservedMemory"; - private static final String HEAP_RESERVED_MIN_RATIO = "minHeapRatio"; private static final String RM_SCHEDULER_ADDR = "rmSchedulerAddr"; private static final String TWILL_SPEC = "twillSpecification"; private static final String LOG_LEVELS = "logLevels"; private static final String MAX_RETRIES = "maxRetries"; - private static final String LOG_COLLECTION_ENABLED = "logCollectionEnabled"; + private static final String CONFIG = "config"; + private static final String RUNNABLE_CONFIGS = "runnableConfigs"; @Override public JsonElement serialize(TwillRuntimeSpecification src, Type typeOfSrc, JsonSerializationContext context) { @@ -60,19 +62,19 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim json.addProperty(ZK_CONNECT_STR, src.getZkConnectStr()); json.addProperty(TWILL_RUNID, src.getTwillAppRunId().getId()); json.addProperty(TWILL_APP_NAME, src.getTwillAppName()); - json.addProperty(RESERVED_MEMORY, src.getReservedMemory()); - json.addProperty(HEAP_RESERVED_MIN_RATIO, src.getMinHeapRatio()); if (src.getRmSchedulerAddr() != null) { json.addProperty(RM_SCHEDULER_ADDR, src.getRmSchedulerAddr()); } json.add(TWILL_SPEC, context.serialize(src.getTwillSpecification(), new TypeToken<TwillSpecification>() { }.getType())); json.add(LOG_LEVELS, - context.serialize(src.getLogLevels(), new TypeToken<Map<String, Map<String, String>>>() { }.getType())); + context.serialize(src.getLogLevels(), MAP_STRING_MAP_STRING_STRING_TYPE)); json.add(MAX_RETRIES, context.serialize(src.getMaxRetries(), new TypeToken<Map<String, Integer>>() { }.getType())); - json.addProperty(LOG_COLLECTION_ENABLED, src.isLogCollectionEnabled()); - + json.add(CONFIG, + context.serialize(src.getConfig(), new TypeToken<Map<String, String>>() { }.getType())); + json.add(RUNNABLE_CONFIGS, + context.serialize(src.getRunnableConfigs(), MAP_STRING_MAP_STRING_STRING_TYPE)); return json; } @@ -84,9 +86,13 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim TwillSpecification twillSpecification = context.deserialize( jsonObj.get(TWILL_SPEC), new TypeToken<TwillSpecification>() { }.getType()); Map<String, Map<String, String>> logLevels = - context.deserialize(jsonObj.get(LOG_LEVELS), new TypeToken<Map<String, Map<String, String>>>() { }.getType()); + context.deserialize(jsonObj.get(LOG_LEVELS), MAP_STRING_MAP_STRING_STRING_TYPE); Map<String, Integer> maxRetries = context.deserialize(jsonObj.get(MAX_RETRIES), new TypeToken<Map<String, Integer>>() { }.getType()); + Map<String, String> config = + context.deserialize(jsonObj.get(CONFIG), new TypeToken<Map<String, String>>() { }.getType()); + Map<String, Map<String, String>> runnableConfigs = + context.deserialize(jsonObj.get(RUNNABLE_CONFIGS), MAP_STRING_MAP_STRING_STRING_TYPE); return new TwillRuntimeSpecification(twillSpecification, jsonObj.get(FS_USER).getAsString(), @@ -94,12 +100,11 @@ final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntim jsonObj.get(ZK_CONNECT_STR).getAsString(), RunIds.fromString(jsonObj.get(TWILL_RUNID).getAsString()), jsonObj.get(TWILL_APP_NAME).getAsString(), - jsonObj.get(RESERVED_MEMORY).getAsInt(), jsonObj.has(RM_SCHEDULER_ADDR) ? jsonObj.get(RM_SCHEDULER_ADDR).getAsString() : null, logLevels, maxRetries, - jsonObj.get(HEAP_RESERVED_MIN_RATIO).getAsDouble(), - jsonObj.get(LOG_COLLECTION_ENABLED).getAsBoolean()); + config, + runnableConfigs); } } http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java index 523ffce..a2ebf7b 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java @@ -71,6 +71,7 @@ import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter; import org.apache.twill.internal.state.Message; import org.apache.twill.internal.state.SystemMessages; import org.apache.twill.internal.utils.Instances; +import org.apache.twill.internal.utils.Resources; import org.apache.twill.internal.yarn.AbstractYarnTwillService; import org.apache.twill.internal.yarn.YarnAMClient; import org.apache.twill.internal.yarn.YarnContainerInfo; @@ -127,8 +128,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp private final ExpectedContainers expectedContainers; private final YarnAMClient amClient; private final JvmOptions jvmOpts; - private final int reservedMemory; - private final double minHeapRatio; private final EventHandler eventHandler; private final Location applicationLocation; private final PlacementPolicyManager placementPolicyManager; @@ -151,8 +150,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp this.amClient = amClient; this.credentials = createCredentials(); this.jvmOpts = loadJvmOptions(); - this.reservedMemory = twillRuntimeSpec.getReservedMemory(); - this.minHeapRatio = twillRuntimeSpec.getMinHeapRatio(); this.twillSpec = twillRuntimeSpec.getTwillSpecification(); this.placementPolicyManager = new PlacementPolicyManager(twillSpec.getPlacementPolicies()); this.environments = getEnvironments(); @@ -198,11 +195,18 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp private RunningContainers createRunningContainers(ContainerId appMasterContainerId, String appMasterHost) throws Exception { + int containerMemoryMB = Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB)); + + // We can't get the -Xmx easily, so just recompute the -Xmx in the same way that the client does + int maxHeapMemoryMB = Resources.computeMaxHeapSize(containerMemoryMB, + twillRuntimeSpec.getAMReservedMemory(), + twillRuntimeSpec.getAMMinHeapRatio()); TwillRunResources appMasterResources = new DefaultTwillRunResources( 0, appMasterContainerId.toString(), Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES)), - Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB)), + containerMemoryMB, + maxHeapMemoryMB, appMasterHost, null); String appId = appMasterContainerId.getApplicationAttemptId().getApplicationId().toString(); return new RunningContainers(appId, appMasterResources, zkClient, applicationLocation, @@ -667,7 +671,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp TwillContainerLauncher launcher = new TwillContainerLauncher( twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext, ZKClients.namespace(zkClient, getZKNamespace(runnableName)), - containerCount, jvmOpts, reservedMemory, getSecureStoreLocation(), minHeapRatio); + containerCount, jvmOpts, twillRuntimeSpec.getReservedMemory(runnableName), getSecureStoreLocation(), + twillRuntimeSpec.getMinHeapRatio(runnableName)); runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher); http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java index a950c46..c85c372 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java @@ -163,9 +163,9 @@ final class RunningContainers { containerInfo.getId(), containerInfo.getVirtualCores(), containerInfo.getMemoryMB(), + launcher.getMaxHeapMemoryMB(), containerInfo.getHost().getHostName(), controller); - resourceReport.addRunResources(runnableName, resources); containerStats.put(runnableName, containerInfo); @@ -734,9 +734,9 @@ final class RunningContainers { private Integer dynamicDebugPort = null; private DynamicTwillRunResources(int instanceId, String containerId, - int cores, int memoryMB, String host, + int cores, int memoryMB, int maxHeapMemoryMB, String host, TwillContainerController controller) { - super(instanceId, containerId, cores, memoryMB, host, null); + super(instanceId, containerId, cores, memoryMB, maxHeapMemoryMB, host, null); this.controller = controller; } http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java index 52e18eb..5442fa0 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java @@ -147,6 +147,7 @@ final class YarnTwillPreparer implements TwillPreparer { private final Map<String, Map<String, String>> logLevels = Maps.newHashMap(); private final LocationCache locationCache; private final Map<String, Integer> maxRetries = Maps.newHashMap(); + private final Map<String, Map<String, String>> runnableConfigs = Maps.newHashMap(); private String schedulerQueue; private String extraOptions; private JvmOptions.DebugOptions debugOptions = JvmOptions.DebugOptions.NO_DEBUG; @@ -183,6 +184,13 @@ final class YarnTwillPreparer implements TwillPreparer { } @Override + public TwillPreparer withConfiguration(String runnableName, Map<String, String> config) { + confirmRunnableName(runnableName); + runnableConfigs.put(runnableName, Maps.newHashMap(config)); + return this; + } + + @Override public TwillPreparer addLogHandler(LogHandler handler) { logHandlers.add(handler); return this; @@ -382,10 +390,11 @@ final class YarnTwillPreparer implements TwillPreparer { createApplicationJar(createBundler(classAcceptor), localFiles); createResourcesJar(createBundler(classAcceptor), localFiles); + TwillRuntimeSpecification twillRuntimeSpec; Path runtimeConfigDir = Files.createTempDirectory(getLocalStagingDir().toPath(), Constants.Files.RUNTIME_CONFIG_JAR); try { - saveSpecification(twillSpec, runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC)); + twillRuntimeSpec = saveSpecification(twillSpec, runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC)); saveLogback(runtimeConfigDir.resolve(Constants.Files.LOGBACK_TEMPLATE)); saveClassPaths(runtimeConfigDir); saveJvmOptions(extraOptions, debugOptions, runtimeConfigDir.resolve(Constants.Files.JVM_OPTIONS)); @@ -405,10 +414,9 @@ final class YarnTwillPreparer implements TwillPreparer { // appMaster.jar // org.apache.twill.internal.appmaster.ApplicationMasterMain // false - - int reservedMemoryMB = config.getInt(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB, - Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB); - int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(), reservedMemoryMB, getMinHeapRatio()); + int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(), + twillRuntimeSpec.getAMReservedMemory(), + twillRuntimeSpec.getAMMinHeapRatio()); return launcher.prepareLaunch(ImmutableMap.<String, String>of(), localFiles.values(), createSubmissionCredentials()) .addCommand( @@ -439,22 +447,6 @@ final class YarnTwillPreparer implements TwillPreparer { } /** - * Returns the minimum heap ratio based on the configuration. - */ - private double getMinHeapRatio() { - // doing this way to support hadoop-2.0 profile - String minHeapRatioStr = config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO); - return (minHeapRatioStr == null) ? Configs.Defaults.HEAP_RESERVED_MIN_RATIO : Double.parseDouble(minHeapRatioStr); - } - - /** - * Returns the reserved memory size in MB based on the configuration. - */ - private int getReservedMemory() { - return config.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB, Configs.Defaults.JAVA_RESERVED_MEMORY_MB); - } - - /** * Returns the local staging directory based on the configuration. */ private File getLocalStagingDir() { @@ -670,7 +662,7 @@ final class YarnTwillPreparer implements TwillPreparer { return localFiles; } - private void saveSpecification(TwillSpecification spec, Path targetFile) throws IOException { + private TwillRuntimeSpecification saveSpecification(TwillSpecification spec, Path targetFile) throws IOException { final Multimap<String, LocalFile> runnableLocalFiles = populateRunnableLocalFiles(spec); // Rewrite LocalFiles inside twillSpec @@ -692,15 +684,20 @@ final class YarnTwillPreparer implements TwillPreparer { } TwillSpecification newTwillSpec = new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(), spec.getPlacementPolicies(), eventHandler); - boolean logCollectionEnabled = config.getBoolean(Configs.Keys.LOG_COLLECTION_ENABLED, - Configs.Defaults.LOG_COLLECTION_ENABLED); - TwillRuntimeSpecificationAdapter.create().toJson( - new TwillRuntimeSpecification(newTwillSpec, appLocation.getLocationFactory().getHomeLocation().getName(), - appLocation.toURI(), zkConnectString, runId, twillSpec.getName(), - getReservedMemory(), config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS), - logLevels, maxRetries, getMinHeapRatio(), logCollectionEnabled), writer); + Map<String, String> configMap = Maps.newHashMap(); + for (Map.Entry<String, String> entry : config) { + configMap.put(entry.getKey(), entry.getValue()); + } + + TwillRuntimeSpecification twillRuntimeSpec = new TwillRuntimeSpecification( + newTwillSpec, appLocation.getLocationFactory().getHomeLocation().getName(), + appLocation.toURI(), zkConnectString, runId, twillSpec.getName(), + config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS), + logLevels, maxRetries, configMap, runnableConfigs); + TwillRuntimeSpecificationAdapter.create().toJson(twillRuntimeSpec, writer); + LOG.debug("Done {}", targetFile); + return twillRuntimeSpec; } - LOG.debug("Done {}", targetFile); } private void saveLogback(Path targetFile) throws IOException { http://git-wip-us.apache.org/repos/asf/twill/blob/2910b180/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java index f5143ce..73f1476 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java @@ -25,17 +25,19 @@ import org.apache.twill.api.ResourceReport; import org.apache.twill.api.ResourceSpecification; import org.apache.twill.api.TwillApplication; import org.apache.twill.api.TwillController; +import org.apache.twill.api.TwillRunResources; import org.apache.twill.api.TwillRunner; import org.apache.twill.api.TwillSpecification; import org.apache.twill.api.logging.PrinterLogHandler; import org.apache.twill.discovery.ServiceDiscovered; +import org.apache.twill.internal.utils.Resources; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.PrintWriter; -import java.util.Collections; +import java.util.Collection; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -67,9 +69,16 @@ public class ContainerSizeTestRun extends BaseYarnTest { @Test public void testMaxHeapSize() throws InterruptedException, TimeoutException, ExecutionException { TwillRunner runner = getTwillRunner(); + String runnableName = "sleep"; + TwillController controller = runner.prepare(new MaxHeapApp()) - // Alter the AM container size - .withConfiguration(Collections.singletonMap(Configs.Keys.YARN_AM_MEMORY_MB, "256")) + // Alter the AM container size and heap ratio + .withConfiguration(ImmutableMap.of(Configs.Keys.YARN_AM_MEMORY_MB, "256", + Configs.Keys.HEAP_RESERVED_MIN_RATIO, "0.65")) + // Use a different heap ratio and reserved memory size for the runnable + .withConfiguration(runnableName, + ImmutableMap.of(Configs.Keys.HEAP_RESERVED_MIN_RATIO, "0.8", + Configs.Keys.JAVA_RESERVED_MEMORY_MB, "1024")) .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) .start(); @@ -77,10 +86,20 @@ public class ContainerSizeTestRun extends BaseYarnTest { ServiceDiscovered discovered = controller.discoverService("sleep"); Assert.assertTrue(waitForSize(discovered, 1, 120)); - // Verify the AM container size + // Verify the AM container size and heap size ResourceReport resourceReport = controller.getResourceReport(); Assert.assertNotNull(resourceReport); Assert.assertEquals(256, resourceReport.getAppMasterResources().getMemoryMB()); + Assert.assertEquals(Resources.computeMaxHeapSize(256, Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB, 0.65d), + resourceReport.getAppMasterResources().getMaxHeapMemoryMB()); + + // Verify the runnable container heap size + Collection<TwillRunResources> runnableResources = resourceReport.getRunnableResources(runnableName); + Assert.assertFalse(runnableResources.isEmpty()); + TwillRunResources resources = runnableResources.iterator().next(); + Assert.assertEquals(Resources.computeMaxHeapSize(resources.getMemoryMB(), 1024, 0.8d), + resources.getMaxHeapMemoryMB()); + } finally { controller.terminate().get(120, TimeUnit.SECONDS); }