(TWILL-225) Allow configurations overridable per TwillPreprer - Also increased the vmen-pmen ration in TwillTester to avoid test failure due to reduced container size - There is no way to disable vmen-pmen ratio check in Hadoop 2.0.
This closes #39 on Github. Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/twill/repo Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/e154bfed Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/e154bfed Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/e154bfed Branch: refs/heads/site Commit: e154bfedb93ec8055def1bf55a4d2d8bc8ccf833 Parents: c4cceef Author: Terence Yim <[email protected]> Authored: Sat Mar 18 18:31:57 2017 -0700 Committer: Terence Yim <[email protected]> Committed: Tue Mar 21 13:32:12 2017 -0700 ---------------------------------------------------------------------- .../org/apache/twill/api/TwillPreparer.java | 8 ++ .../apache/twill/yarn/YarnTwillPreparer.java | 103 +++++++++++++------ .../twill/yarn/YarnTwillRunnerService.java | 9 +- .../apache/twill/yarn/ContainerSizeTestRun.java | 10 ++ .../java/org/apache/twill/yarn/TwillTester.java | 2 +- 5 files changed, 96 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/twill/blob/e154bfed/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java index fd568b3..43b751b 100644 --- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java +++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java @@ -30,6 +30,14 @@ import java.util.concurrent.TimeUnit; public interface TwillPreparer { /** + * Overrides the default configuration with the given set of configurations. + * + * @param config set of configurations to override + * @return This {@link TwillPreparer} + */ + TwillPreparer withConfiguration(Map<String, String> config); + + /** * Adds a {@link LogHandler} for receiving an application log. * @param handler The {@link LogHandler}. * @return This {@link TwillPreparer}. http://git-wip-us.apache.org/repos/asf/twill/blob/e154bfed/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java index de03a7a..2d1edd0 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java @@ -40,6 +40,7 @@ import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import joptsimple.OptionSpec; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -80,6 +81,7 @@ import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter; import org.apache.twill.internal.utils.Dependencies; import org.apache.twill.internal.utils.Paths; import org.apache.twill.internal.utils.Resources; +import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory; import org.apache.twill.internal.yarn.YarnAppClient; import org.apache.twill.internal.yarn.YarnApplicationReport; import org.apache.twill.internal.yarn.YarnUtils; @@ -126,9 +128,8 @@ final class YarnTwillPreparer implements TwillPreparer { } }; - private final YarnConfiguration yarnConfig; + private final Configuration config; private final TwillSpecification twillSpec; - private final YarnAppClient yarnAppClient; private final String zkConnectString; private final Location appLocation; private final YarnTwillControllerFactory controllerFactory; @@ -143,9 +144,6 @@ final class YarnTwillPreparer implements TwillPreparer { private final Map<String, Map<String, String>> environments = Maps.newHashMap(); private final List<String> applicationClassPaths = Lists.newArrayList(); private final Credentials credentials; - private final int reservedMemory; - private final double minHeapRatio; - private final File localStagingDir; private final Map<String, Map<String, String>> logLevels = Maps.newHashMap(); private final LocationCache locationCache; private final Set<URL> twillClassPaths; @@ -155,25 +153,16 @@ final class YarnTwillPreparer implements TwillPreparer { private ClassAcceptor classAcceptor; private final Map<String, Integer> maxRetries = Maps.newHashMap(); - YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec, RunId runId, - YarnAppClient yarnAppClient, String zkConnectString, Location appLocation, Set<URL> twillClassPaths, + YarnTwillPreparer(Configuration config, TwillSpecification twillSpec, RunId runId, + String zkConnectString, Location appLocation, Set<URL> twillClassPaths, String extraOptions, LocationCache locationCache, YarnTwillControllerFactory controllerFactory) { - this.yarnConfig = yarnConfig; + this.config = config; this.twillSpec = twillSpec; this.runId = runId; - this.yarnAppClient = yarnAppClient; this.zkConnectString = zkConnectString; this.appLocation = appLocation; this.controllerFactory = controllerFactory; this.credentials = createCredentials(); - this.reservedMemory = yarnConfig.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB, - Configs.Defaults.JAVA_RESERVED_MEMORY_MB); - // doing this way to support hadoop-2.0 profile - String minHeapRatioStr = yarnConfig.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO); - this.minHeapRatio = (minHeapRatioStr == null) ? - Configs.Defaults.HEAP_RESERVED_MIN_RATIO : Double.parseDouble(minHeapRatioStr); - this.localStagingDir = new File(yarnConfig.get(Configs.Keys.LOCAL_STAGING_DIRECTORY, - Configs.Defaults.LOCAL_STAGING_DIRECTORY)); this.extraOptions = extraOptions; this.classAcceptor = new ClassAcceptor(); this.locationCache = locationCache; @@ -187,6 +176,14 @@ final class YarnTwillPreparer implements TwillPreparer { } @Override + public TwillPreparer withConfiguration(Map<String, String> config) { + for (Map.Entry<String, String> entry : config.entrySet()) { + this.config.set(entry.getKey(), entry.getValue()); + } + return this; + } + + @Override public TwillPreparer addLogHandler(LogHandler handler) { logHandlers.add(handler); return this; @@ -362,6 +359,7 @@ final class YarnTwillPreparer implements TwillPreparer { @Override public TwillController start(long timeout, TimeUnit timeoutUnit) { try { + final YarnAppClient yarnAppClient = new VersionDetectYarnAppClientFactory().create(config); final ProcessLauncher<ApplicationMasterInfo> launcher = yarnAppClient.createLauncher(twillSpec, schedulerQueue); final ApplicationMasterInfo appMasterInfo = launcher.getContainerInfo(); Callable<ProcessController<YarnApplicationReport>> submitTask = @@ -373,11 +371,11 @@ final class YarnTwillPreparer implements TwillPreparer { Map<String, LocalFile> localFiles = Maps.newHashMap(); createLauncherJar(localFiles); - createTwillJar(createBundler(classAcceptor), localFiles); + createTwillJar(createBundler(classAcceptor), yarnAppClient, localFiles); createApplicationJar(createApplicationJarBundler(classAcceptor), localFiles); createResourcesJar(createBundler(classAcceptor), localFiles); - Path runtimeConfigDir = Files.createTempDirectory(localStagingDir.toPath(), + Path runtimeConfigDir = Files.createTempDirectory(getLocalStagingDir().toPath(), Constants.Files.RUNTIME_CONFIG_JAR); try { saveSpecification(twillSpec, runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC)); @@ -401,12 +399,11 @@ final class YarnTwillPreparer implements TwillPreparer { // org.apache.twill.internal.appmaster.ApplicationMasterMain // false - int reservedMemoryMB = yarnConfig.getInt(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB, - Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB); - int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(), - reservedMemoryMB, - minHeapRatio); - return launcher.prepareLaunch(ImmutableMap.<String, String>of(), localFiles.values(), credentials) + int reservedMemoryMB = config.getInt(Configs.Keys.YARN_AM_RESERVED_MEMORY_MB, + Configs.Defaults.YARN_AM_RESERVED_MEMORY_MB); + int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(), reservedMemoryMB, getMinHeapRatio()); + return launcher.prepareLaunch(ImmutableMap.<String, String>of(), localFiles.values(), + createSubmissionCredentials()) .addCommand( "$JAVA_HOME/bin/java", "-Djava.io.tmpdir=tmp", @@ -431,6 +428,29 @@ final class YarnTwillPreparer implements TwillPreparer { } } + /** + * Returns the minimum heap ratio based on the configuration. + */ + private double getMinHeapRatio() { + // doing this way to support hadoop-2.0 profile + String minHeapRatioStr = config.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO); + return (minHeapRatioStr == null) ? Configs.Defaults.HEAP_RESERVED_MIN_RATIO : Double.parseDouble(minHeapRatioStr); + } + + /** + * Returns the reserved memory size in MB based on the configuration. + */ + private int getReservedMemory() { + return config.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB, Configs.Defaults.JAVA_RESERVED_MEMORY_MB); + } + + /** + * Returns the local staging directory based on the configuration. + */ + private File getLocalStagingDir() { + return new File(config.get(Configs.Keys.LOCAL_STAGING_DIRECTORY, Configs.Defaults.LOCAL_STAGING_DIRECTORY)); + } + private void setEnv(String runnableName, Map<String, String> env, boolean overwrite) { Map<String, String> environment = environments.get(runnableName); if (environment == null) { @@ -455,21 +475,40 @@ final class YarnTwillPreparer implements TwillPreparer { this.logLevels.put(runnableName, newLevels); } + /** + * Creates an {@link Credentials} by copying the {@link Credentials} of the current user. + */ private Credentials createCredentials() { Credentials credentials = new Credentials(); try { credentials.addAll(UserGroupInformation.getCurrentUser().getCredentials()); + } catch (IOException e) { + LOG.warn("Failed to get current user UGI. Current user credentials not added.", e); + } + return credentials; + } - List<Token<?>> tokens = YarnUtils.addDelegationTokens(yarnConfig, appLocation.getLocationFactory(), credentials); + /** + * Creates a {@link Credentials} for the application submission. + */ + private Credentials createSubmissionCredentials() { + Credentials credentials = new Credentials(); + try { + // Acquires delegation token for the location + List<Token<?>> tokens = YarnUtils.addDelegationTokens(config, appLocation.getLocationFactory(), credentials); if (LOG.isDebugEnabled()) { for (Token<?> token : tokens) { LOG.debug("Delegation token acquired for {}, {}", appLocation, token); } } } catch (IOException e) { - LOG.warn("Failed to check for secure login type. Not gathering any delegation token.", e); + LOG.warn("Failed to acquire delegation token for location {}", appLocation); } + + // Copy the user provided credentials. + // It will override the location delegation tokens acquired above if user supplies it. + credentials.addAll(this.credentials); return credentials; } @@ -481,7 +520,9 @@ final class YarnTwillPreparer implements TwillPreparer { return new DefaultLocalFile(name, location.toURI(), location.lastModified(), location.length(), archive, null); } - private void createTwillJar(final ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException { + private void createTwillJar(final ApplicationBundler bundler, + final YarnAppClient yarnAppClient, + Map<String, LocalFile> localFiles) throws IOException { LOG.debug("Create and copy {}", Constants.Files.TWILL_JAR); Location location = locationCache.get(Constants.Files.TWILL_JAR, new LocationCache.Loader() { @Override @@ -633,8 +674,8 @@ final class YarnTwillPreparer implements TwillPreparer { TwillRuntimeSpecificationAdapter.create().toJson( new TwillRuntimeSpecification(newTwillSpec, appLocation.getLocationFactory().getHomeLocation().getName(), appLocation.toURI(), zkConnectString, runId, twillSpec.getName(), - reservedMemory, yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS), - logLevels, maxRetries, minHeapRatio), writer); + getReservedMemory(), config.get(YarnConfiguration.RM_SCHEDULER_ADDRESS), + logLevels, maxRetries, getMinHeapRatio()), writer); } LOG.debug("Done {}", targetFile); } @@ -787,7 +828,7 @@ final class YarnTwillPreparer implements TwillPreparer { } private ApplicationBundler createBundler(ClassAcceptor classAcceptor) { - return new ApplicationBundler(classAcceptor).setTempDir(localStagingDir); + return new ApplicationBundler(classAcceptor).setTempDir(getLocalStagingDir()); } /** http://git-wip-us.apache.org/repos/asf/twill/blob/e154bfed/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java index 405eb24..d8e48de 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java @@ -126,7 +126,6 @@ public final class YarnTwillRunnerService implements TwillRunnerService { }; private final YarnConfiguration yarnConfig; - private final YarnAppClient yarnAppClient; private final ZKClientService zkClientService; private final LocationFactory locationFactory; private final Table<String, RunId, YarnTwillController> controllers; @@ -162,7 +161,6 @@ public final class YarnTwillRunnerService implements TwillRunnerService { */ public YarnTwillRunnerService(YarnConfiguration config, String zkConnect, LocationFactory locationFactory) { this.yarnConfig = config; - this.yarnAppClient = new VersionDetectYarnAppClientFactory().create(config); this.locationFactory = locationFactory; this.zkClientService = getZKClientService(zkConnect); this.controllers = HashBasedTable.create(); @@ -288,8 +286,9 @@ public final class YarnTwillRunnerService implements TwillRunnerService { locationCache = new NoCachingLocationCache(appLocation); } - return new YarnTwillPreparer(yarnConfig, twillSpec, runId, yarnAppClient, - zkClientService.getConnectString(), appLocation, twillClassPaths, jvmOptions, + Configuration config = new Configuration(yarnConfig); + return new YarnTwillPreparer(config, twillSpec, runId, zkClientService.getConnectString(), + appLocation, twillClassPaths, jvmOptions, locationCache, new YarnTwillControllerFactory() { @Override public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers, @@ -596,6 +595,8 @@ public final class YarnTwillRunnerService implements TwillRunnerService { synchronized (YarnTwillRunnerService.this) { if (!controllers.contains(appName, runId)) { ZKClient zkClient = ZKClients.namespace(zkClientService, "/" + appName); + YarnAppClient yarnAppClient = new VersionDetectYarnAppClientFactory().create(new Configuration(yarnConfig)); + YarnTwillController controller = listenController( new YarnTwillController(appName, runId, zkClient, amLiveNodeData, yarnAppClient)); controllers.put(appName, runId, controller); http://git-wip-us.apache.org/repos/asf/twill/blob/e154bfed/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java index c6f7b9a..f5143ce 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java @@ -20,6 +20,8 @@ package org.apache.twill.yarn; import com.google.common.collect.ImmutableMap; import org.apache.twill.api.AbstractTwillRunnable; +import org.apache.twill.api.Configs; +import org.apache.twill.api.ResourceReport; import org.apache.twill.api.ResourceSpecification; import org.apache.twill.api.TwillApplication; import org.apache.twill.api.TwillController; @@ -33,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.PrintWriter; +import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -65,12 +68,19 @@ public class ContainerSizeTestRun extends BaseYarnTest { public void testMaxHeapSize() throws InterruptedException, TimeoutException, ExecutionException { TwillRunner runner = getTwillRunner(); TwillController controller = runner.prepare(new MaxHeapApp()) + // Alter the AM container size + .withConfiguration(Collections.singletonMap(Configs.Keys.YARN_AM_MEMORY_MB, "256")) .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) .start(); try { ServiceDiscovered discovered = controller.discoverService("sleep"); Assert.assertTrue(waitForSize(discovered, 1, 120)); + + // Verify the AM container size + ResourceReport resourceReport = controller.getResourceReport(); + Assert.assertNotNull(resourceReport); + Assert.assertEquals(256, resourceReport.getAppMasterResources().getMemoryMB()); } finally { controller.terminate().get(120, TimeUnit.SECONDS); } http://git-wip-us.apache.org/repos/asf/twill/blob/e154bfed/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java index 9daf06c..a141176 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java @@ -138,7 +138,7 @@ public class TwillTester extends ExternalResource { "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"); conf.setBoolean("yarn.scheduler.include-port-in-node-name", true); } - conf.set("yarn.nodemanager.vmem-pmem-ratio", "20.1"); + conf.set("yarn.nodemanager.vmem-pmem-ratio", "100.1"); conf.set("yarn.nodemanager.vmem-check-enabled", "false"); conf.set("yarn.scheduler.minimum-allocation-mb", "128"); conf.set("yarn.nodemanager.delete.debug-delay-sec", "3600");
