(TWILL-241) Added support for per runnable JVM options - Also removed JvmOptionsCodec since JvmOptions only uses simple types
This closes #59 on Github. Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/twill/repo Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/4356c283 Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/4356c283 Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/4356c283 Branch: refs/heads/master Commit: 4356c283e2d4bc78a173788b412701525d24d621 Parents: 2910b18 Author: Terence Yim <[email protected]> Authored: Fri Aug 4 16:19:32 2017 -0700 Committer: Terence Yim <[email protected]> Committed: Mon Aug 7 23:30:04 2017 -0700 ---------------------------------------------------------------------- .../org/apache/twill/api/TwillPreparer.java | 13 +++ .../internal/DefaultTwillRunResources.java | 3 +- .../org/apache/twill/internal/JvmOptions.java | 17 ++- .../twill/internal/TwillContainerLauncher.java | 10 +- .../internal/TwillRuntimeSpecification.java | 59 +++++++--- .../twill/internal/json/JvmOptionsCodec.java | 111 ------------------- .../internal/json/JvmOptionsCodecTest.java | 107 ------------------ .../appmaster/ApplicationMasterService.java | 18 +-- .../apache/twill/yarn/YarnTwillPreparer.java | 77 ++++++++----- .../apache/twill/yarn/ContainerSizeTestRun.java | 16 ++- .../twill/yarn/CustomClassLoaderRunnable.java | 3 +- .../twill/yarn/CustomClassLoaderTestRun.java | 3 +- .../apache/twill/yarn/JvmOptionsTestRun.java | 103 +++++++++++++++++ .../org/apache/twill/yarn/YarnTestSuite.java | 1 + 14 files changed, 258 insertions(+), 283 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/twill/blob/4356c283/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java index 35930d2..812a086 100644 --- 
a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java +++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java @@ -87,6 +87,19 @@ public interface TwillPreparer { TwillPreparer setJVMOptions(String options); /** + * This methods sets the extra JVM options that will be passed to the java command line for the given runnable + * of the application started through this {@link org.apache.twill.api.TwillPreparer} instance. + * The options set for the given runnable will be appended to any global options set through the + * {@link #setJVMOptions(String)} or {@link #addJVMOptions(String)} method. + * + * This is intended for advance usage. All options will be passed unchanged to the java command line. Invalid + * options could cause application not able to start. + * + * @param options extra JVM options. + */ + TwillPreparer setJVMOptions(String runnableName, String options); + + /** * This methods adds extra JVM options that will be passed to the java command line for every runnable * of the application started through this {@link org.apache.twill.api.TwillPreparer} instance. 
* http://git-wip-us.apache.org/repos/asf/twill/blob/4356c283/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java ---------------------------------------------------------------------- diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java index f05074e..6f8a052 100644 --- a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java +++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillRunResources.java @@ -45,7 +45,8 @@ public class DefaultTwillRunResources implements TwillRunResources { */ public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, int maxHeapMemoryMB, String host, Integer debugPort) { - this(instanceId, containerId, cores, memoryMB, maxHeapMemoryMB, host, debugPort, Collections.<String, Level>emptyMap()); + this(instanceId, containerId, cores, memoryMB, maxHeapMemoryMB, host, debugPort, + Collections.<String, Level>emptyMap()); } public DefaultTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, int maxHeapMemoryMB, http://git-wip-us.apache.org/repos/asf/twill/blob/4356c283/twill-core/src/main/java/org/apache/twill/internal/JvmOptions.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/JvmOptions.java b/twill-core/src/main/java/org/apache/twill/internal/JvmOptions.java index 945561b..6e35c6c 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/JvmOptions.java +++ b/twill-core/src/main/java/org/apache/twill/internal/JvmOptions.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.primitives.Booleans; +import java.util.Map; import java.util.Objects; import java.util.Set; @@ -30,17 +31,29 @@ import java.util.Set; public final class 
JvmOptions { private final String extraOptions; + private final Map<String, String> runnableExtraOptions; private final DebugOptions debugOptions; - public JvmOptions(String extraOptions, DebugOptions debugOptions) { + public JvmOptions(String extraOptions, Map<String, String> runnableExtraOptions, DebugOptions debugOptions) { this.extraOptions = extraOptions; + this.runnableExtraOptions = runnableExtraOptions; this.debugOptions = debugOptions; } - public String getExtraOptions() { + /** + * Returns the extra options for the application master. + */ + public String getAMExtraOptions() { return extraOptions; } + /** + * Returns the extra options for the given runnable. + */ + public String getRunnableExtraOptions(String runnableName) { + return runnableExtraOptions.containsKey(runnableName) ? runnableExtraOptions.get(runnableName) : extraOptions; + } + public DebugOptions getDebugOptions() { return debugOptions; } http://git-wip-us.apache.org/repos/asf/twill/blob/4356c283/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java index 700c0f1..0b98ba6 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java +++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java @@ -65,8 +65,9 @@ public final class TwillContainerLauncher { public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ContainerInfo containerInfo, ProcessLauncher.PrepareLaunchContext launchContext, - ZKClient zkClient, int instanceCount, JvmOptions jvmOpts, int reservedMemory, - Location secureStoreLocation, double minHeapRatio) { + ZKClient zkClient, int instanceCount, JvmOptions jvmOpts, + int reservedMemory, double minHeapRatio, + Location secureStoreLocation) { 
this.runtimeSpec = runtimeSpec; this.containerInfo = containerInfo; this.launchContext = launchContext; @@ -151,8 +152,9 @@ public final class TwillContainerLauncher { "-Dtwill.runnable=$" + Constants.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME, "-cp", Constants.Files.LAUNCHER_JAR + ":" + classPath, "-Xmx" + maxHeapSizeMB + "m"); - if (jvmOpts.getExtraOptions() != null) { - commandBuilder.add(jvmOpts.getExtraOptions()); + String extraOptions = jvmOpts.getRunnableExtraOptions(runtimeSpec.getName()); + if (!extraOptions.isEmpty()) { + commandBuilder.add(extraOptions); } commandBuilder.add(TwillLauncher.class.getName(), mainClass.getName(), http://git-wip-us.apache.org/repos/asf/twill/blob/4356c283/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java index 636d94d..6361bd6 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java +++ b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java @@ -90,33 +90,32 @@ public class TwillRuntimeSpecification { * Returns the minimum heap ratio for the application master. */ public double getAMMinHeapRatio() { - return getMinHeapRatio(config); + return getMinHeapRatio(config, Configs.Defaults.HEAP_RESERVED_MIN_RATIO); } /** * Returns the minimum heap ratio for the given runnable. */ public double getMinHeapRatio(String runnableName) { - return getMinHeapRatio(runnableConfigs.containsKey(runnableName) ? runnableConfigs.get(runnableName) : config); + double ratio = getMinHeapRatio(runnableConfigs.get(runnableName), 0d); + return ratio <= 0d ? 
getMinHeapRatio(config, Configs.Defaults.HEAP_RESERVED_MIN_RATIO) : ratio; } /** * Returns the reserved non-heap memory size in MB for the application master. */ public int getAMReservedMemory() { - return config.containsKey(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB) ? - Integer.parseInt(config.get(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB)) : - Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB; + return getReservedMemory(config, Configs.Keys.YARN_AM_RESERVED_MEMORY_MB, + Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB); } /** * Returns the reserved non-heap memory size in MB for the given runnable. */ public int getReservedMemory(String runnableName) { - Map<String, String> conf = runnableConfigs.containsKey(runnableName) ? runnableConfigs.get(runnableName) : config; - return conf.containsKey(Configs.Keys.JAVA_RESERVED_MEMORY_MB) ? - Integer.parseInt(conf.get(Configs.Keys.JAVA_RESERVED_MEMORY_MB)) : - Configs.Defaults.JAVA_RESERVED_MEMORY_MB; + int memory = getReservedMemory(runnableConfigs.get(runnableName), Configs.Keys.JAVA_RESERVED_MEMORY_MB, -1); + return memory < 0 ? getReservedMemory(config, Configs.Keys.JAVA_RESERVED_MEMORY_MB, + Configs.Defaults.JAVA_RESERVED_MEMORY_MB) : memory; } /** @@ -171,9 +170,43 @@ public class TwillRuntimeSpecification { /** * Returns the minimum heap ratio ({@link Configs.Keys#HEAP_RESERVED_MIN_RATIO}) based on the given configuration. */ - private double getMinHeapRatio(Map<String, String> config) { - return config.containsKey(Configs.Keys.HEAP_RESERVED_MIN_RATIO) ? 
- Double.parseDouble(config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO)) : - Configs.Defaults.HEAP_RESERVED_MIN_RATIO; + private double getMinHeapRatio(@Nullable Map<String, String> config, double defaultValue) { + if (config == null || !config.containsKey(Configs.Keys.HEAP_RESERVED_MIN_RATIO)) { + return defaultValue; + } + + try { + double ratio = Double.parseDouble(config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO)); + if (ratio <= 0d) { + throw new IllegalArgumentException("Minimum heap ratio configured with key '" + + Configs.Keys.HEAP_RESERVED_MIN_RATIO + + "' must be > 0. It is configured to " + ratio); + } + return ratio; + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to parse the minimum heap ratio from configuration with key '" + + Configs.Keys.HEAP_RESERVED_MIN_RATIO + "'", e); + } + } + + /** + * Returns the reserved memory size based on the given configuration. + */ + private int getReservedMemory(@Nullable Map<String, String> config, String key, int defaultValue) { + if (config == null || !config.containsKey(key)) { + return defaultValue; + } + + try { + int memory = Integer.parseInt(config.get(key)); + if (memory < 0) { + throw new IllegalArgumentException("Reserved memory size configured with key '" + key + + "' must be >= 0. 
It is configured to " + memory); + } + return memory; + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to parse the reserved memory size from configuration with key '" + + key + "'", e); + } } } http://git-wip-us.apache.org/repos/asf/twill/blob/4356c283/twill-core/src/main/java/org/apache/twill/internal/json/JvmOptionsCodec.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/JvmOptionsCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/JvmOptionsCodec.java deleted file mode 100644 index 807840f..0000000 --- a/twill-core/src/main/java/org/apache/twill/internal/json/JvmOptionsCodec.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.twill.internal.json; - -import com.google.common.collect.ImmutableSet; -import com.google.common.io.InputSupplier; -import com.google.common.io.OutputSupplier; -import com.google.common.reflect.TypeToken; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import com.google.gson.JsonSerializationContext; -import com.google.gson.JsonSerializer; -import org.apache.twill.internal.JvmOptions; - -import java.io.IOException; -import java.io.Reader; -import java.io.Writer; -import java.lang.reflect.Type; -import java.util.Set; - -/** - * Gson codec for {@link JvmOptions}. - */ -public class JvmOptionsCodec implements JsonSerializer<JvmOptions>, JsonDeserializer<JvmOptions> { - - private static final Gson GSON = new GsonBuilder().registerTypeAdapter(JvmOptions.class, new JvmOptionsCodec()) - .registerTypeAdapter(JvmOptions.DebugOptions.class, - new DebugOptionsCodec()) - .create(); - - public static void encode(JvmOptions jvmOptions, OutputSupplier<? extends Writer> writerSupplier) throws IOException { - try (Writer writer = writerSupplier.getOutput()) { - GSON.toJson(jvmOptions, writer); - } - } - - public static JvmOptions decode(InputSupplier<? 
extends Reader> readerSupplier) throws IOException { - try (Reader reader = readerSupplier.getInput()) { - return GSON.fromJson(reader, JvmOptions.class); - } - } - - @Override - public JvmOptions deserialize(JsonElement json, Type type, JsonDeserializationContext context) - throws JsonParseException { - JsonObject jsonObj = json.getAsJsonObject(); - String extraOptions = context.deserialize(jsonObj.get("extraOptions"), String.class); - JvmOptions.DebugOptions debugOptions = context.deserialize(jsonObj.get("debugOptions"), - JvmOptions.DebugOptions.class); - return new JvmOptions(extraOptions, debugOptions); - } - - @Override - public JsonElement serialize(JvmOptions jvmOptions, Type type, JsonSerializationContext context) { - JsonObject json = new JsonObject(); - json.add("extraOptions", context.serialize(jvmOptions.getExtraOptions())); - json.add("debugOptions", context.serialize(jvmOptions.getDebugOptions())); - return json; - } - - private static class DebugOptionsCodec - implements JsonSerializer<JvmOptions.DebugOptions>, JsonDeserializer<JvmOptions.DebugOptions> { - - @Override - public JvmOptions.DebugOptions deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) - throws JsonParseException { - JsonObject jsonObj = json.getAsJsonObject(); - Boolean doDebug = context.deserialize(jsonObj.get("doDebug"), Boolean.class); - if (!doDebug) { - return JvmOptions.DebugOptions.NO_DEBUG; - } - Boolean doSuspend = context.deserialize(jsonObj.get("doSuspend"), Boolean.class); - Set<String> runnables = context.deserialize(jsonObj.get("runnables"), - new TypeToken<Set<String>>() { }.getType()); - return new JvmOptions.DebugOptions(true, doSuspend, runnables == null ? 
null : ImmutableSet.copyOf(runnables)); - } - - @Override - public JsonElement serialize(JvmOptions.DebugOptions src, Type typeOfSrc, JsonSerializationContext context) { - JsonObject json = new JsonObject(); - json.add("doDebug", context.serialize(src.doDebug())); - json.add("doSuspend", context.serialize(src.doSuspend())); - if (src.getRunnables() != null) { - json.add("runnables", context.serialize(src.getRunnables())); - } - return json; - } - } -} - - http://git-wip-us.apache.org/repos/asf/twill/blob/4356c283/twill-core/src/test/java/org/apache/twill/internal/json/JvmOptionsCodecTest.java ---------------------------------------------------------------------- diff --git a/twill-core/src/test/java/org/apache/twill/internal/json/JvmOptionsCodecTest.java b/twill-core/src/test/java/org/apache/twill/internal/json/JvmOptionsCodecTest.java deleted file mode 100644 index 2791e72..0000000 --- a/twill-core/src/test/java/org/apache/twill/internal/json/JvmOptionsCodecTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.twill.internal.json; - -import com.google.common.collect.ImmutableSet; -import com.google.common.io.InputSupplier; -import com.google.common.io.OutputSupplier; -import org.apache.twill.internal.JvmOptions; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.io.Reader; -import java.io.StringReader; -import java.io.StringWriter; -import java.io.Writer; -import java.util.Collections; -import java.util.List; - -/** - * Tests the JvmOptions Codec. - */ -public class JvmOptionsCodecTest { - - @Test - public void testNoNulls() throws Exception { - JvmOptions options = new JvmOptions("-version", - new JvmOptions.DebugOptions(true, false, ImmutableSet.of("one", "two"))); - final StringWriter writer = new StringWriter(); - JvmOptionsCodec.encode(options, new OutputSupplier<Writer>() { - @Override - public Writer getOutput() throws IOException { - return writer; - } - }); - JvmOptions options1 = JvmOptionsCodec.decode(new InputSupplier<Reader>() { - @Override - public Reader getInput() throws IOException { - return new StringReader(writer.toString()); - } - }); - Assert.assertEquals(options.getExtraOptions(), options1.getExtraOptions()); - Assert.assertEquals(options.getDebugOptions().doDebug(), options1.getDebugOptions().doDebug()); - Assert.assertEquals(options.getDebugOptions().doSuspend(), options1.getDebugOptions().doSuspend()); - Assert.assertEquals(options.getDebugOptions().getRunnables(), options1.getDebugOptions().getRunnables()); - } - - @Test - public void testSomeNulls() throws Exception { - JvmOptions options = new JvmOptions(null, new JvmOptions.DebugOptions(false, false, null)); - final StringWriter writer = new StringWriter(); - JvmOptionsCodec.encode(options, new OutputSupplier<Writer>() { - @Override - public Writer getOutput() throws IOException { - return writer; - } - }); - JvmOptions options1 = JvmOptionsCodec.decode(new InputSupplier<Reader>() { - @Override - public Reader getInput() 
throws IOException { - return new StringReader(writer.toString()); - } - }); - Assert.assertEquals(options.getExtraOptions(), options1.getExtraOptions()); - Assert.assertEquals(options.getDebugOptions().doDebug(), options1.getDebugOptions().doDebug()); - Assert.assertEquals(options.getDebugOptions().doSuspend(), options1.getDebugOptions().doSuspend()); - Assert.assertEquals(options.getDebugOptions().getRunnables(), options1.getDebugOptions().getRunnables()); - } - - @Test - public void testNoRunnables() throws Exception { - List<String> noRunnables = Collections.emptyList(); - JvmOptions options = new JvmOptions(null, new JvmOptions.DebugOptions(true, false, noRunnables)); - final StringWriter writer = new StringWriter(); - JvmOptionsCodec.encode(options, new OutputSupplier<Writer>() { - @Override - public Writer getOutput() throws IOException { - return writer; - } - }); - JvmOptions options1 = JvmOptionsCodec.decode(new InputSupplier<Reader>() { - @Override - public Reader getInput() throws IOException { - return new StringReader(writer.toString()); - } - }); - Assert.assertEquals(options.getExtraOptions(), options1.getExtraOptions()); - Assert.assertEquals(options.getDebugOptions().doDebug(), options1.getDebugOptions().doDebug()); - Assert.assertEquals(options.getDebugOptions().doSuspend(), options1.getDebugOptions().doSuspend()); - Assert.assertEquals(options.getDebugOptions().getRunnables(), options1.getDebugOptions().getRunnables()); - } -} http://git-wip-us.apache.org/repos/asf/twill/blob/4356c283/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java index a2ebf7b..4917f4d 100644 --- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java @@ -30,7 +30,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Multiset; import com.google.common.collect.Ranges; import com.google.common.collect.Sets; -import com.google.common.io.InputSupplier; import com.google.common.reflect.TypeToken; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -65,7 +64,6 @@ import org.apache.twill.internal.JvmOptions; import org.apache.twill.internal.ProcessLauncher; import org.apache.twill.internal.TwillContainerLauncher; import org.apache.twill.internal.TwillRuntimeSpecification; -import org.apache.twill.internal.json.JvmOptionsCodec; import org.apache.twill.internal.json.LocalFileCodec; import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter; import org.apache.twill.internal.state.Message; @@ -84,7 +82,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.FileReader; import java.io.IOException; import java.io.Reader; import java.nio.charset.StandardCharsets; @@ -167,14 +164,11 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp private JvmOptions loadJvmOptions() throws IOException { final File jvmOptsFile = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.JVM_OPTIONS); if (!jvmOptsFile.exists()) { - return new JvmOptions(null, JvmOptions.DebugOptions.NO_DEBUG); + return new JvmOptions("", Collections.<String, String>emptyMap(), JvmOptions.DebugOptions.NO_DEBUG); + } + try (Reader reader = Files.newBufferedReader(jvmOptsFile.toPath(), StandardCharsets.UTF_8)) { + return GSON.fromJson(reader, JvmOptions.class); } - return JvmOptionsCodec.decode(new InputSupplier<Reader>() { - @Override - public Reader getInput() throws IOException { - return new 
FileReader(jvmOptsFile); - } - }); } @SuppressWarnings("unchecked") @@ -671,8 +665,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp TwillContainerLauncher launcher = new TwillContainerLauncher( twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext, ZKClients.namespace(zkClient, getZKNamespace(runnableName)), - containerCount, jvmOpts, twillRuntimeSpec.getReservedMemory(runnableName), getSecureStoreLocation(), - twillRuntimeSpec.getMinHeapRatio(runnableName)); + containerCount, jvmOpts, twillRuntimeSpec.getReservedMemory(runnableName), + twillRuntimeSpec.getMinHeapRatio(runnableName), getSecureStoreLocation()); runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher); http://git-wip-us.apache.org/repos/asf/twill/blob/4356c283/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java index 5442fa0..0eba62b 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java @@ -75,7 +75,6 @@ import org.apache.twill.internal.appmaster.ApplicationMasterMain; import org.apache.twill.internal.container.TwillContainerMain; import org.apache.twill.internal.io.LocationCache; import org.apache.twill.internal.json.ArgumentsCodec; -import org.apache.twill.internal.json.JvmOptionsCodec; import org.apache.twill.internal.json.LocalFileCodec; import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter; import org.apache.twill.internal.utils.Dependencies; @@ -114,6 +113,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.jar.JarEntry; import java.util.jar.JarOutputStream; +import 
javax.annotation.Nullable; /** * Implementation for {@link TwillPreparer} to prepare and launch distributed application on Hadoop YARN. @@ -148,14 +148,15 @@ final class YarnTwillPreparer implements TwillPreparer { private final LocationCache locationCache; private final Map<String, Integer> maxRetries = Maps.newHashMap(); private final Map<String, Map<String, String>> runnableConfigs = Maps.newHashMap(); - private String schedulerQueue; + private final Map<String, String> runnableExtraOptions = Maps.newHashMap(); private String extraOptions; private JvmOptions.DebugOptions debugOptions = JvmOptions.DebugOptions.NO_DEBUG; + private String schedulerQueue; private ClassAcceptor classAcceptor; private String classLoaderClassName; YarnTwillPreparer(Configuration config, TwillSpecification twillSpec, RunId runId, - String zkConnectString, Location appLocation, String extraOptions, + String zkConnectString, Location appLocation, @Nullable String extraOptions, LocationCache locationCache, YarnTwillControllerFactory controllerFactory) { this.config = config; this.twillSpec = twillSpec; @@ -164,7 +165,7 @@ final class YarnTwillPreparer implements TwillPreparer { this.appLocation = appLocation; this.controllerFactory = controllerFactory; this.credentials = createCredentials(); - this.extraOptions = extraOptions; + this.extraOptions = extraOptions == null ? 
"" : extraOptions; this.classAcceptor = new ClassAcceptor(); this.locationCache = locationCache; } @@ -209,13 +210,23 @@ final class YarnTwillPreparer implements TwillPreparer { @Override public TwillPreparer setJVMOptions(String options) { + Preconditions.checkArgument(options != null, "JVM options cannot be null."); this.extraOptions = options; return this; } @Override + public TwillPreparer setJVMOptions(String runnableName, String options) { + confirmRunnableName(runnableName); + Preconditions.checkArgument(options != null, "JVM options cannot be null."); + runnableExtraOptions.put(runnableName, options); + return this; + } + + @Override public TwillPreparer addJVMOptions(String options) { - this.extraOptions = extraOptions == null ? options : extraOptions + " " + options; + Preconditions.checkArgument(options != null, "JVM options cannot be null."); + this.extraOptions = extraOptions.isEmpty() ? options : extraOptions + " " + options; return this; } @@ -226,6 +237,9 @@ final class YarnTwillPreparer implements TwillPreparer { @Override public TwillPreparer enableDebugging(boolean doSuspend, String... 
runnables) { + for (String runnableName : runnables) { + confirmRunnableName(runnableName); + } this.debugOptions = new JvmOptions.DebugOptions(true, doSuspend, ImmutableSet.copyOf(runnables)); return this; } @@ -379,9 +393,6 @@ final class YarnTwillPreparer implements TwillPreparer { new Callable<ProcessController<YarnApplicationReport>>() { @Override public ProcessController<YarnApplicationReport> call() throws Exception { - - String extraOptions = getExtraOptions(); - // Local files needed by AM Map<String, LocalFile> localFiles = Maps.newHashMap(); @@ -391,13 +402,14 @@ final class YarnTwillPreparer implements TwillPreparer { createResourcesJar(createBundler(classAcceptor), localFiles); TwillRuntimeSpecification twillRuntimeSpec; + JvmOptions jvmOptions; Path runtimeConfigDir = Files.createTempDirectory(getLocalStagingDir().toPath(), Constants.Files.RUNTIME_CONFIG_JAR); try { twillRuntimeSpec = saveSpecification(twillSpec, runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC)); saveLogback(runtimeConfigDir.resolve(Constants.Files.LOGBACK_TEMPLATE)); saveClassPaths(runtimeConfigDir); - saveJvmOptions(extraOptions, debugOptions, runtimeConfigDir.resolve(Constants.Files.JVM_OPTIONS)); + jvmOptions = saveJvmOptions(runtimeConfigDir.resolve(Constants.Files.JVM_OPTIONS)); saveArguments(new Arguments(arguments, runnableArgs), runtimeConfigDir.resolve(Constants.Files.ARGUMENTS)); saveEnvironments(runtimeConfigDir.resolve(Constants.Files.ENVIRONMENTS)); @@ -426,7 +438,7 @@ final class YarnTwillPreparer implements TwillPreparer { "-Dtwill.app=$" + Constants.TWILL_APP_NAME, "-cp", Constants.Files.LAUNCHER_JAR + ":$HADOOP_CONF_DIR", "-Xmx" + memory + "m", - extraOptions, + jvmOptions.getAMExtraOptions(), TwillLauncher.class.getName(), ApplicationMasterMain.class.getName(), Boolean.FALSE.toString()) @@ -456,12 +468,12 @@ final class YarnTwillPreparer implements TwillPreparer { /** * Returns the extra options for the container JVM. 
*/ - private String getExtraOptions() { - String extraOptions = this.extraOptions == null ? "" : this.extraOptions; - if (classLoaderClassName != null) { - extraOptions += " -D" + Constants.TWILL_CONTAINER_CLASSLOADER + "=" + classLoaderClassName; + private String addClassLoaderClassName(String extraOptions) { + if (classLoaderClassName == null) { + return extraOptions; } - return extraOptions; + String classLoaderProperty = "-D" + Constants.TWILL_CONTAINER_CLASSLOADER + "=" + classLoaderClassName; + return extraOptions.isEmpty() ? classLoaderProperty : " " + classLoaderProperty; } private void setEnv(String runnableName, Map<String, String> env, boolean overwrite) { @@ -686,7 +698,9 @@ final class YarnTwillPreparer implements TwillPreparer { spec.getPlacementPolicies(), eventHandler); Map<String, String> configMap = Maps.newHashMap(); for (Map.Entry<String, String> entry : config) { - configMap.put(entry.getKey(), entry.getValue()); + if (entry.getKey().startsWith("twill.")) { + configMap.put(entry.getKey(), entry.getValue()); + } } TwillRuntimeSpecification twillRuntimeSpec = new TwillRuntimeSpecification( @@ -759,20 +773,31 @@ final class YarnTwillPreparer implements TwillPreparer { Joiner.on(':').join(classPaths).getBytes(StandardCharsets.UTF_8)); } - private void saveJvmOptions(String extraOptions, - JvmOptions.DebugOptions debugOptions, final Path targetPath) throws IOException { - if (extraOptions.isEmpty() && JvmOptions.DebugOptions.NO_DEBUG.equals(debugOptions)) { + private JvmOptions saveJvmOptions(final Path targetPath) throws IOException { + // Updates the extra options with the classloader name if necessary + final String globalOptions = addClassLoaderClassName(extraOptions); + // Append runnable specific extra options. + Map<String, String> runnableExtraOptions = Maps.newHashMap( + Maps.transformValues(this.runnableExtraOptions, new Function<String, String>() { + @Override + public String apply(String extraOptions) { + return globalOptions.isEmpty() ? 
extraOptions : globalOptions + " " + extraOptions; + } + })); + + JvmOptions jvmOptions = new JvmOptions(globalOptions, runnableExtraOptions, debugOptions); + if (globalOptions.isEmpty() && runnableExtraOptions.isEmpty() + && JvmOptions.DebugOptions.NO_DEBUG.equals(debugOptions)) { // If no vm options, no need to localize the file. - return; + return jvmOptions; } + LOG.debug("Creating {}", targetPath); - JvmOptionsCodec.encode(new JvmOptions(extraOptions, debugOptions), new OutputSupplier<Writer>() { - @Override - public Writer getOutput() throws IOException { - return Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8); - } - }); + try (Writer writer = Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8)) { + new Gson().toJson(new JvmOptions(globalOptions, runnableExtraOptions, debugOptions), writer); + } LOG.debug("Done {}", targetPath); + return jvmOptions; } private void saveArguments(Arguments arguments, final Path targetPath) throws IOException { http://git-wip-us.apache.org/repos/asf/twill/blob/4356c283/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java index 73f1476..ff06eee 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java @@ -69,14 +69,13 @@ public class ContainerSizeTestRun extends BaseYarnTest { @Test public void testMaxHeapSize() throws InterruptedException, TimeoutException, ExecutionException { TwillRunner runner = getTwillRunner(); - String runnableName = "sleep"; TwillController controller = runner.prepare(new MaxHeapApp()) // Alter the AM container size and heap ratio .withConfiguration(ImmutableMap.of(Configs.Keys.YARN_AM_MEMORY_MB, "256", 
Configs.Keys.HEAP_RESERVED_MIN_RATIO, "0.65")) // Use a different heap ratio and reserved memory size for the runnable - .withConfiguration(runnableName, + .withConfiguration("sleep", ImmutableMap.of(Configs.Keys.HEAP_RESERVED_MIN_RATIO, "0.8", Configs.Keys.JAVA_RESERVED_MEMORY_MB, "1024")) .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) @@ -84,7 +83,7 @@ public class ContainerSizeTestRun extends BaseYarnTest { try { ServiceDiscovered discovered = controller.discoverService("sleep"); - Assert.assertTrue(waitForSize(discovered, 1, 120)); + Assert.assertTrue(waitForSize(discovered, 2, 120)); // Verify the AM container size and heap size ResourceReport resourceReport = controller.getResourceReport(); @@ -94,12 +93,20 @@ public class ContainerSizeTestRun extends BaseYarnTest { resourceReport.getAppMasterResources().getMaxHeapMemoryMB()); // Verify the runnable container heap size - Collection<TwillRunResources> runnableResources = resourceReport.getRunnableResources(runnableName); + Collection<TwillRunResources> runnableResources = resourceReport.getRunnableResources("sleep"); Assert.assertFalse(runnableResources.isEmpty()); TwillRunResources resources = runnableResources.iterator().next(); Assert.assertEquals(Resources.computeMaxHeapSize(resources.getMemoryMB(), 1024, 0.8d), resources.getMaxHeapMemoryMB()); + // For the sleep2 runnable, we don't set any ratio and reserved memory. 
+ // The ratio should default to 0.65 (app) and reserved memory to 200 + runnableResources = resourceReport.getRunnableResources("sleep2"); + Assert.assertFalse(runnableResources.isEmpty()); + resources = runnableResources.iterator().next(); + Assert.assertEquals( + Resources.computeMaxHeapSize(resources.getMemoryMB(), Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB, 0.65d), + resources.getMaxHeapMemoryMB()); } finally { controller.terminate().get(120, TimeUnit.SECONDS); } @@ -181,6 +188,7 @@ public class ContainerSizeTestRun extends BaseYarnTest { .setName("MaxHeapApp") .withRunnable() .add("sleep", new MaxHeapRunnable(12345), res).noLocalFiles() + .add("sleep2", new MaxHeapRunnable(23456), res).noLocalFiles() .anyOrder() .build(); } http://git-wip-us.apache.org/repos/asf/twill/blob/4356c283/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java b/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java index 66bcd42..591f931 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderRunnable.java @@ -30,7 +30,6 @@ import java.util.concurrent.CountDownLatch; */ public final class CustomClassLoaderRunnable extends AbstractTwillRunnable { - static final String SERVICE_NAME = "custom.service"; static final String GENERATED_CLASS_NAME = "org.apache.twill.test.Generated"; private static final Logger LOG = LoggerFactory.getLogger(CustomClassLoaderRunnable.class); @@ -42,7 +41,7 @@ public final class CustomClassLoaderRunnable extends AbstractTwillRunnable { try { Class<?> cls = Class.forName(GENERATED_CLASS_NAME); java.lang.reflect.Method announce = cls.getMethod("announce", ServiceAnnouncer.class, String.class, int.class); - announce.invoke(cls.newInstance(), 
getContext(), SERVICE_NAME, 54321); + announce.invoke(cls.newInstance(), getContext(), System.getProperty("service.name"), 54321); Uninterruptibles.awaitUninterruptibly(stopLatch); } catch (Exception e) { LOG.error("Failed to call announce on " + GENERATED_CLASS_NAME, e); http://git-wip-us.apache.org/repos/asf/twill/blob/4356c283/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderTestRun.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderTestRun.java index 0ac43a6..f0a75b2 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/CustomClassLoaderTestRun.java @@ -34,9 +34,10 @@ public class CustomClassLoaderTestRun extends BaseYarnTest { TwillController controller = getTwillRunner().prepare(new CustomClassLoaderRunnable()) .setClassLoader(CustomClassLoader.class.getName()) .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) + .setJVMOptions(CustomClassLoaderRunnable.class.getSimpleName(), "-Dservice.name=custom") .start(); - Assert.assertTrue(waitForSize(controller.discoverService(CustomClassLoaderRunnable.SERVICE_NAME), 1, 120)); + Assert.assertTrue(waitForSize(controller.discoverService("custom"), 1, 120)); controller.terminate().get(); } } http://git-wip-us.apache.org/repos/asf/twill/blob/4356c283/twill-yarn/src/test/java/org/apache/twill/yarn/JvmOptionsTestRun.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/JvmOptionsTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/JvmOptionsTestRun.java new file mode 100644 index 0000000..3ec49e2 --- /dev/null +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/JvmOptionsTestRun.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.yarn; + +import org.apache.twill.api.AbstractTwillRunnable; +import org.apache.twill.api.TwillApplication; +import org.apache.twill.api.TwillController; +import org.apache.twill.api.TwillSpecification; +import org.apache.twill.api.logging.PrinterLogHandler; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; + +/** + * Unit test for testing extra JVM options setting for runnables. + */ +public class JvmOptionsTestRun extends BaseYarnTest { + + @Test + public void testExtraOptions() throws InterruptedException, ExecutionException { + // Start the testing app with jvm options at both global level as well as for the specific runnables. + TwillController controller = getTwillRunner() + .prepare(new JvmOptionsApplication()) + .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out))) + .setJVMOptions("-Dservice.name=default") + .setJVMOptions("r2", "-Dservice.name=r2") + .start(); + + // For r1 and r3 will be using "default" as the service name. 
+ waitForSize(controller.discoverService("default"), 2, 120); + // r2 will use "r2" as the service name. + waitForSize(controller.discoverService("r2"), 1, 120); + + controller.terminate().get(); + } + + /** + * Application for testing extra JVM options. + */ + public static final class JvmOptionsApplication implements TwillApplication { + + @Override + public TwillSpecification configure() { + return TwillSpecification.Builder.with() + .setName(JvmOptionsApplication.class.getSimpleName()) + .withRunnable() + .add("r1", new SimpleRunnable()).noLocalFiles() + .add("r2", new SimpleRunnable()).noLocalFiles() + .add("r3", new SimpleRunnable()).noLocalFiles() + .anyOrder() + .build(); + } + } + + /** + * A runnable that simply announces itself under a service name taken from the system property and waits for the stop signal. + */ + public static final class SimpleRunnable extends AbstractTwillRunnable { + + private static final Logger LOG = LoggerFactory.getLogger(SimpleRunnable.class); + + private final CountDownLatch stopLatch = new CountDownLatch(1); + + @Override + public void run() { + String runnableName = getContext().getSpecification().getName(); + String serviceName = System.getProperty("service.name"); + LOG.info("Announcing with name {} for runnable {}", serviceName, runnableName); + + // Compute a unique port number based on the runnable name (runnable names are r[0-9]+) + getContext().announce(serviceName, 12345 + Integer.parseInt(runnableName.substring(1))); + try { + stopLatch.await(); + } catch (InterruptedException e) { + LOG.warn("Run thread interrupted", e); + } + } + + @Override + public void stop() { + stopLatch.countDown(); + } + } +} http://git-wip-us.apache.org/repos/asf/twill/blob/4356c283/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java 
b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java index 0911a3d..0bb7fce 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java @@ -33,6 +33,7 @@ import org.junit.runners.Suite; EnvironmentTestRun.class, FailureRestartTestRun.class, InitializeFailTestRun.class, + JvmOptionsTestRun.class, LocalFileTestRun.class, LogHandlerTestRun.class, LogLevelChangeTestRun.class,
