Repository: reef Updated Branches: refs/heads/master e6c23d53f -> c5322db90
[REEF-1776] Create YARN proxy user to register REEF Driver in Unmanaged AM mode Summary of changes: * Create `YarnProxyUser` class to encapsulate the credentials copying and execution in the proxy context * Implement generic API for such user as `UserCredentials` interface to minimize the exposure at the REEF client API side * Add `YarnProxyUser` implementation to proper configuration modules * Pass `UserCredentials` from REEF into the Driver context in `ReefOnReefDriver` * Minor refactoring and logging improvements JIRA: [REEF-1776](https://issues.apache.org/jira/browse/REEF-1776) Pull request: This closes #1290 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/c5322db9 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/c5322db9 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/c5322db9 Branch: refs/heads/master Commit: c5322db90db7430455c70623b6cfe063a2ae6603 Parents: e6c23d5 Author: Sergiy Matusevych <[email protected]> Authored: Tue Apr 4 19:46:50 2017 -0700 Committer: Julia Wang <[email protected]> Committed: Mon Apr 17 14:53:04 2017 -0700 ---------------------------------------------------------------------- .../bridge/client/YarnJobSubmissionClient.java | 7 +- .../org/apache/reef/client/DriverLauncher.java | 9 +- .../reef/runtime/common/REEFEnvironment.java | 27 +++- .../reef/runtime/common/UserCredentials.java | 64 +++++++++ .../client/defaults/DefaultUserCredentials.java | 75 ++++++++++ .../examples/reefonreef/ReefOnReefDriver.java | 6 +- .../yarn/client/YarnClientConfiguration.java | 7 +- .../yarn/client/YarnJobSubmissionHandler.java | 8 +- .../yarn/client/YarnSubmissionHelper.java | 9 +- .../UnmanagedAmYarnClientConfiguration.java | 4 +- .../UnmanagedAmYarnDriverConfiguration.java | 2 + .../UnmanagedAmYarnJobSubmissionHandler.java | 3 +- .../UnmanagedAmYarnSubmissionHelper.java | 8 +- .../yarn/client/unmanaged/YarnProxyUser.java | 139 +++++++++++++++++++ 
.../yarn/driver/YarnContainerManager.java | 35 +++-- 15 files changed, 381 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java index fde89cb..6455f75 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java @@ -37,6 +37,7 @@ import org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefi import org.apache.reef.runtime.yarn.YarnClasspathProvider; import org.apache.reef.runtime.yarn.client.SecurityTokenProvider; import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper; +import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser; import org.apache.reef.runtime.yarn.client.uploader.JobFolder; import org.apache.reef.runtime.yarn.client.uploader.JobUploader; import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix; @@ -71,6 +72,7 @@ public final class YarnJobSubmissionClient { private final REEFFileNames fileNames; private final YarnConfiguration yarnConfiguration; private final ClasspathProvider classpath; + private final YarnProxyUser yarnProxyUser; private final SecurityTokenProvider tokenProvider; private final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator; @@ -81,6 +83,7 @@ public final class YarnJobSubmissionClient { final YarnConfiguration yarnConfiguration, final REEFFileNames fileNames, final ClasspathProvider classpath, + 
final YarnProxyUser yarnProxyUser, final SecurityTokenProvider tokenProvider, final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator) { @@ -90,6 +93,7 @@ public final class YarnJobSubmissionClient { this.fileNames = fileNames; this.yarnConfiguration = yarnConfiguration; this.classpath = classpath; + this.yarnProxyUser = yarnProxyUser; this.tokenProvider = tokenProvider; this.jobSubmissionParametersGenerator = jobSubmissionParametersGenerator; } @@ -115,7 +119,8 @@ public final class YarnJobSubmissionClient { // ------------------------------------------------------------------------ // Get an application ID try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper( - yarnConfiguration, fileNames, classpath, tokenProvider, isUnmanaged, commandPrefixList)) { + this.yarnConfiguration, this.fileNames, this.classpath, this.yarnProxyUser, + this.tokenProvider, this.isUnmanaged, this.commandPrefixList)) { // ------------------------------------------------------------------------ // Prepare the JAR http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java index 78ca13a..bc8d95f 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverLauncher.java @@ -21,6 +21,7 @@ package org.apache.reef.client; import org.apache.reef.annotations.Provided; import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Public; +import org.apache.reef.runtime.common.UserCredentials; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.Tang; import 
org.apache.reef.tang.annotations.Unit; @@ -59,6 +60,7 @@ public final class DriverLauncher implements AutoCloseable { .build(); private final REEF reef; + private final UserCredentials user; private LauncherStatus status = LauncherStatus.INIT; @@ -66,8 +68,13 @@ public final class DriverLauncher implements AutoCloseable { private RunningJob theJob; @Inject - private DriverLauncher(final REEF reef) { + private DriverLauncher(final REEF reef, final UserCredentials user) { this.reef = reef; + this.user = user; + } + + public UserCredentials getUser() { + return this.user; } /** http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java index 3d3ae67..3a1429a 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java @@ -33,6 +33,7 @@ import org.apache.reef.wake.profiler.WakeProfiler; import org.apache.reef.wake.profiler.ProfilerState; import org.apache.reef.wake.time.Clock; +import java.io.IOException; import java.util.logging.Level; import java.util.logging.Logger; @@ -64,8 +65,22 @@ public final class REEFEnvironment implements Runnable, AutoCloseable { * Main part of the configuration is usually read from config file by REEFLauncher. * @throws InjectionException Thrown on configuration error. */ - @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler public static REEFEnvironment fromConfiguration(final Configuration... 
configurations) throws InjectionException { + return fromConfiguration(null, configurations); + } + + /** + * Create a new REEF environment. + * @param hostUser User credentials to use when registering REEF app with the Resource Manager. + * This parameter may be required for Unmanaged AM mode. Can be null. + * @param configurations REEF component (Driver or Evaluator) configuration. + * If multiple configurations are provided, they will be merged before use. + * Main part of the configuration is usually read from config file by REEFLauncher. + * @throws InjectionException Thrown on configuration error. + */ + @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler + public static REEFEnvironment fromConfiguration( + final UserCredentials hostUser, final Configuration... configurations) throws InjectionException { final Configuration config = Configurations.merge(configurations); @@ -86,6 +101,16 @@ public final class REEFEnvironment implements Runnable, AutoCloseable { final REEFErrorHandler errorHandler = injector.getInstance(REEFErrorHandler.class); final JobStatusHandler jobStatusHandler = injector.getInstance(JobStatusHandler.class); + if (hostUser != null) { + try { + injector.getInstance(UserCredentials.class).set("reef-proxy", hostUser); + } catch (final IOException ex) { + final String msg = "Cannot copy user credentials: " + hostUser; + LOG.log(Level.SEVERE, msg, ex); + throw new RuntimeException(msg, ex); + } + } + try { final Clock clock = injector.getInstance(Clock.class); http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/UserCredentials.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/UserCredentials.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/UserCredentials.java new file mode 100644 index 0000000..6bac1ef 
--- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/UserCredentials.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common; + +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Public; +import org.apache.reef.runtime.common.client.defaults.DefaultUserCredentials; +import org.apache.reef.tang.annotations.DefaultImplementation; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +/** + * A holder object for REEF user credentials. + * Implementations of this interface are used e.g. for Unmanaged AM applications on YARN. + */ +@Public +@ClientSide +@DriverSide +@DefaultImplementation(DefaultUserCredentials.class) +public interface UserCredentials { + + /** + * Copy credentials from another existing user. + * This method can be called only once per instance. + * @param name name of the new user. + * @param other Credentials of another user. + * @throws IOException if unable to copy. 
+ */ + void set(final String name, final UserCredentials other) throws IOException; + + /** + * Check if the user credentials had been set. + * @return true if set() method had been called successfully before, false otherwise. + */ + boolean isSet(); + + /** + * Execute the privileged action as a given user. + * If user credentials are not set, execute the action outside the user context. + * @param action an action to run. + * @param <T> action return type. + * @return result of an action. + * @throws Exception whatever the action can throw. + */ + <T> T doAs(final PrivilegedExceptionAction<T> action) throws Exception; +} http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultUserCredentials.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultUserCredentials.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultUserCredentials.java new file mode 100644 index 0000000..216ce97 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/defaults/DefaultUserCredentials.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.client.defaults; + +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.common.UserCredentials; + +import javax.inject.Inject; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +/** + * A holder object for REEF user credentials. + */ +@Private +@ClientSide +@DriverSide +public final class DefaultUserCredentials implements UserCredentials { + + @Inject + private DefaultUserCredentials() { } + + /** + * Copy credentials from another existing user. + * This method can be called only once per instance. + * This default implementation should never be called. + * @param name Name of the new user. + * @param other Credentials of another user. + * @throws RuntimeException always throws. + */ + @Override + public void set(final String name, final UserCredentials other) throws IOException { + throw new RuntimeException("Not implemented! Attempt to set user " + name + " from: " + other); + } + + /** + * Check if the user credentials had been set. Always returns false. + * @return always false. + */ + @Override + public boolean isSet() { + return false; + } + + /** + * Execute the privileged action as a given user. + * This implementation always executes the action outside the user context, simply by calling action.run(). + * @param action an action to run. + * @param <T> action return type. + * @return result of an action. 
+ * @throws Exception whatever the action can throw. + */ + @Override + public <T> T doAs(final PrivilegedExceptionAction<T> action) throws Exception { + return action.run(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-examples/src/main/java/org/apache/reef/examples/reefonreef/ReefOnReefDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/reefonreef/ReefOnReefDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/reefonreef/ReefOnReefDriver.java index ae04436..a509399 100644 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/reefonreef/ReefOnReefDriver.java +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/reefonreef/ReefOnReefDriver.java @@ -85,9 +85,13 @@ final class ReefOnReefDriver implements EventHandler<StartTime> { .set(UnmanagedAmYarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, DRIVER_ROOT_PATH) .build(); - try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(yarnAmConfig, DRIVER_CONFIG)) { + try (final REEFEnvironment reef = + REEFEnvironment.fromConfiguration(client.getUser(), yarnAmConfig, DRIVER_CONFIG)) { + reef.run(); + final ReefServiceProtos.JobStatusProto status = reef.getLastStatus(); + LOG.log(Level.INFO, "REEF-on-REEF inner job {0} completed: state {1}", new Object[] {innerApplicationId, status.getState()}); } http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java index 0e6733b..b1fe4f9 100644 --- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java @@ -22,6 +22,7 @@ import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Public; import org.apache.reef.client.parameters.DriverConfigurationProviders; import org.apache.reef.driver.parameters.DriverIsUnmanaged; +import org.apache.reef.runtime.common.UserCredentials; import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration; import org.apache.reef.runtime.common.client.DriverConfigurationProvider; import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; @@ -30,6 +31,7 @@ import org.apache.reef.runtime.common.parameters.JVMHeapSlack; import org.apache.reef.runtime.yarn.YarnClasspathProvider; import org.apache.reef.runtime.yarn.client.parameters.JobPriority; import org.apache.reef.runtime.yarn.client.parameters.JobQueue; +import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser; import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor; import org.apache.reef.tang.ConfigurationProvider; import org.apache.reef.tang.formats.*; @@ -58,15 +60,16 @@ public class YarnClientConfiguration extends ConfigurationModuleBuilder { public static final ConfigurationModule CONF = new YarnClientConfiguration() .merge(CommonRuntimeConfiguration.CONF) - // Bind YARN + // Bind YARN-specific classes .bindImplementation(JobSubmissionHandler.class, YarnJobSubmissionHandler.class) .bindImplementation(DriverConfigurationProvider.class, YarnDriverConfigurationProviderImpl.class) + .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class) + .bindImplementation(UserCredentials.class, YarnProxyUser.class) // Bind the parameters given by the user .bindNamedParameter(JobQueue.class, YARN_QUEUE_NAME) .bindNamedParameter(JobPriority.class, YARN_PRIORITY) 
.bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK) .bindNamedParameter(DriverIsUnmanaged.class, UNMANAGED_DRIVER) - .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class) // Bind external constructors. Taken from YarnExternalConstructors.registerClientConstructors .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class) .bindSetEntry(DriverConfigurationProviders.class, DRIVER_CONFIGURATION_PROVIDERS) http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java index 9457f90..c79c668 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java @@ -34,6 +34,7 @@ import org.apache.reef.runtime.common.files.ClasspathProvider; import org.apache.reef.runtime.common.files.JobJarMaker; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.yarn.client.parameters.JobQueue; +import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser; import org.apache.reef.runtime.yarn.client.uploader.JobFolder; import org.apache.reef.runtime.yarn.client.uploader.JobUploader; import org.apache.reef.tang.Configuration; @@ -61,6 +62,7 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { private final REEFFileNames fileNames; private final ClasspathProvider classpath; private final JobUploader uploader; + private final YarnProxyUser yarnProxyUser; private final SecurityTokenProvider 
tokenProvider; private final DriverConfigurationProvider driverConfigurationProvider; @@ -75,6 +77,7 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { final REEFFileNames fileNames, final ClasspathProvider classpath, final JobUploader uploader, + final YarnProxyUser yarnProxyUser, final SecurityTokenProvider tokenProvider, final DriverConfigurationProvider driverConfigurationProvider) throws IOException { @@ -85,6 +88,7 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { this.fileNames = fileNames; this.classpath = classpath; this.uploader = uploader; + this.yarnProxyUser = yarnProxyUser; this.tokenProvider = tokenProvider; this.driverConfigurationProvider = driverConfigurationProvider; } @@ -100,8 +104,8 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { LOG.log(Level.FINEST, "Submitting{0} job: {1}", new Object[] {this.isUnmanaged ? " UNMANAGED AM" : "", jobSubmissionEvent}); - try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper( - this.yarnConfiguration, this.fileNames, this.classpath, this.tokenProvider, this.isUnmanaged)) { + try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper(this.yarnConfiguration, + this.fileNames, this.classpath, this.yarnProxyUser, this.tokenProvider, this.isUnmanaged)) { LOG.log(Level.FINE, "Assembling submission JAR for the Driver."); final Optional<String> userBoundJobSubmissionDirectory = http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java index 6df3ffc..019114d 100644 --- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java @@ -19,6 +19,7 @@ package org.apache.reef.runtime.yarn.client; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; @@ -32,6 +33,7 @@ import org.apache.reef.runtime.common.REEFLauncher; import org.apache.reef.runtime.common.files.ClasspathProvider; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder; +import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser; import org.apache.reef.runtime.yarn.util.YarnTypes; import java.io.IOException; @@ -52,6 +54,7 @@ public final class YarnSubmissionHelper implements AutoCloseable { private final ApplicationId applicationId; private final Map<String, LocalResource> resources = new HashMap<>(); private final ClasspathProvider classpath; + private final YarnProxyUser yarnProxyUser; private final SecurityTokenProvider tokenProvider; private final boolean isUnmanaged; private final List<String> commandPrefixList; @@ -64,11 +67,13 @@ public final class YarnSubmissionHelper implements AutoCloseable { public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration, final REEFFileNames fileNames, final ClasspathProvider classpath, + final YarnProxyUser yarnProxyUser, final SecurityTokenProvider tokenProvider, final boolean isUnmanaged, final List<String> commandPrefixList) throws IOException, YarnException { this.classpath = classpath; + this.yarnProxyUser = yarnProxyUser; this.isUnmanaged = isUnmanaged; this.driverStdoutFilePath = @@ -98,9 +103,10 @@ public final class YarnSubmissionHelper implements 
AutoCloseable { public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration, final REEFFileNames fileNames, final ClasspathProvider classpath, + final YarnProxyUser yarnProxyUser, final SecurityTokenProvider tokenProvider, final boolean isUnmanaged) throws IOException, YarnException { - this(yarnConfiguration, fileNames, classpath, tokenProvider, isUnmanaged, null); + this(yarnConfiguration, fileNames, classpath, yarnProxyUser, tokenProvider, isUnmanaged, null); } /** @@ -287,6 +293,7 @@ public final class YarnSubmissionHelper implements AutoCloseable { // For Unmanaged AM mode, add a new app token to the // current process so it can talk to the RM as an AM. final Token<AMRMTokenIdentifier> token = this.yarnClient.getAMRMToken(this.applicationId); + this.yarnProxyUser.set("reef-proxy", UserGroupInformation.getCurrentUser(), token); this.tokenProvider.addTokens(UserCredentialSecurityTokenProvider.serializeToken(token)); } } http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java index 7862ad9..bb32120 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java @@ -22,6 +22,7 @@ import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Public; import org.apache.reef.client.parameters.DriverConfigurationProviders; import 
org.apache.reef.driver.parameters.DriverIsUnmanaged; +import org.apache.reef.runtime.common.UserCredentials; import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration; import org.apache.reef.runtime.common.client.DriverConfigurationProvider; import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; @@ -60,11 +61,12 @@ public class UnmanagedAmYarnClientConfiguration extends ConfigurationModuleBuild // Bind YARN .bindImplementation(JobSubmissionHandler.class, UnmanagedAmYarnJobSubmissionHandler.class) .bindImplementation(DriverConfigurationProvider.class, YarnDriverConfigurationProviderImpl.class) + .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class) + .bindImplementation(UserCredentials.class, YarnProxyUser.class) // Bind the parameters given by the user .bindNamedParameter(JobQueue.class, YARN_QUEUE_NAME) .bindNamedParameter(JobPriority.class, YARN_PRIORITY) .bindNamedParameter(RootFolder.class, ROOT_FOLDER) - .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class) // Bind external constructors. 
Taken from YarnExternalConstructors.registerClientConstructors .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class) .bindSetEntry(DriverConfigurationProviders.class, DRIVER_CONFIGURATION_PROVIDERS) http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java index 2a57a97..c497d17 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java @@ -24,6 +24,7 @@ import org.apache.reef.annotations.audience.Public; import org.apache.reef.driver.parameters.DriverIsUnmanaged; import org.apache.reef.io.TempFileCreator; import org.apache.reef.io.WorkingDirectoryTempFileCreator; +import org.apache.reef.runtime.common.UserCredentials; import org.apache.reef.runtime.common.driver.api.*; import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes; @@ -85,6 +86,7 @@ public final class UnmanagedAmYarnDriverConfiguration extends ConfigurationModul .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK) .bindImplementation(RackNameFormatter.class, RACK_NAME_FORMATTER) .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class) + .bindImplementation(UserCredentials.class, YarnProxyUser.class) .bindNamedParameter(DefinedRuntimes.class, 
RuntimeIdentifier.RUNTIME_NAME) .build(); http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java index d75a2b5..73e2cd1 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java @@ -52,13 +52,14 @@ final class UnmanagedAmYarnJobSubmissionHandler implements JobSubmissionHandler @Parameter(JobQueue.class) final String defaultQueueName, final UnmanagedDriverFiles driverFiles, final YarnConfiguration yarnConfiguration, + final YarnProxyUser yarnProxyUser, final SecurityTokenProvider tokenProvider) throws IOException { this.defaultQueueName = defaultQueueName; this.driverFiles = driverFiles; try { - this.submissionHelper = new UnmanagedAmYarnSubmissionHelper(yarnConfiguration, tokenProvider); + this.submissionHelper = new UnmanagedAmYarnSubmissionHelper(yarnConfiguration, yarnProxyUser, tokenProvider); } catch (final IOException | YarnException ex) { LOG.log(Level.SEVERE, "Cannot create YARN client", ex); throw new RuntimeException("Cannot create YARN client", ex); http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java index d25dbad..9142194 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java @@ -18,6 +18,7 @@ */ package org.apache.reef.runtime.yarn.client.unmanaged; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -42,14 +43,18 @@ final class UnmanagedAmYarnSubmissionHelper implements AutoCloseable { private static final Logger LOG = Logger.getLogger(UnmanagedAmYarnSubmissionHelper.class.getName()); private final SecurityTokenProvider tokenProvider; + private final YarnProxyUser yarnProxyUser; private final YarnClient yarnClient; private final ApplicationSubmissionContext applicationSubmissionContext; private final ApplicationId applicationId; - UnmanagedAmYarnSubmissionHelper(final YarnConfiguration yarnConfiguration, + UnmanagedAmYarnSubmissionHelper( + final YarnConfiguration yarnConfiguration, + final YarnProxyUser yarnProxyUser, final SecurityTokenProvider tokenProvider) throws IOException, YarnException { this.tokenProvider = tokenProvider; + this.yarnProxyUser = yarnProxyUser; LOG.log(Level.FINE, "Initializing YARN Client"); this.yarnClient = YarnClient.createYarnClient(); @@ -116,6 +121,7 @@ final class UnmanagedAmYarnSubmissionHelper implements AutoCloseable { this.yarnClient.submitApplication(this.applicationSubmissionContext); final Token<AMRMTokenIdentifier> token = this.yarnClient.getAMRMToken(this.applicationId); + 
this.yarnProxyUser.set("reef-uam-proxy", UserGroupInformation.getCurrentUser(), token); this.tokenProvider.addTokens(UserCredentialSecurityTokenProvider.serializeToken(token)); } http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/YarnProxyUser.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/YarnProxyUser.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/YarnProxyUser.java new file mode 100644 index 0000000..1f653e2 --- /dev/null +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/YarnProxyUser.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.runtime.yarn.client.unmanaged; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.common.UserCredentials; + +import javax.inject.Inject; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Collection; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A holder class for the proxy UserGroupInformation object + * required for Unmanaged YARN Application Master in REEF-on-REEF or REEF-on-Spark mode. + */ +@Private +@ClientSide +@DriverSide +public final class YarnProxyUser implements UserCredentials { + + private static final Logger LOG = Logger.getLogger(YarnProxyUser.class.getName()); + + private UserGroupInformation proxyUGI = null; + + @Inject + private YarnProxyUser() { } + + /** + * Get the YARN proxy user information. If not set, return the (global) current user. + * @return Proxy user group information, if set; otherwise, return current YARN user. + * @throws IOException if proxy user is not set AND unable to obtain current YARN user information. + */ + public UserGroupInformation get() throws IOException { + + final UserGroupInformation effectiveUGI = + this.proxyUGI == null ? UserGroupInformation.getCurrentUser() : this.proxyUGI; + + if (LOG.isLoggable(Level.FINEST)) { + LOG.log(Level.FINEST, "UGI: get: {0}", ugiToString("EFFECTIVE", effectiveUGI)); + } + + return effectiveUGI; + } + + /** + * Check if the proxy user is set. + * @return true if proxy user set, false otherwise. + */ + @Override + public boolean isSet() { + return this.proxyUGI != null; + } + + /** + * Set YARN user. This method can be called only once per class instance. 
+ * @param name Name of the new proxy user. + * @param hostUser User credentials to copy. Must be an instance of YarnProxyUser. + */ + @Override + public void set(final String name, final UserCredentials hostUser) throws IOException { + + assert this.proxyUGI == null; + assert hostUser instanceof YarnProxyUser; + + LOG.log(Level.FINE, "UGI: user {0} copy from: {1}", new Object[] {name, hostUser}); + + final UserGroupInformation hostUGI = ((YarnProxyUser) hostUser).get(); + final Collection<Token<? extends TokenIdentifier>> tokens = hostUGI.getCredentials().getAllTokens(); + + this.set(name, hostUGI, tokens.toArray(new Token[tokens.size()])); + } + + /** + * Create YARN proxy user and add security tokens to its credentials. + * This method can be called only once per class instance. + * @param proxyName Name of the new proxy user. + * @param hostUGI YARN user to impersonate the proxy. + * @param tokens Security tokens to add to the new proxy user's credentials. + */ + @SafeVarargs + public final void set(final String proxyName, + final UserGroupInformation hostUGI, final Token<? extends TokenIdentifier>... tokens) { + + assert this.proxyUGI == null; + this.proxyUGI = UserGroupInformation.createProxyUser(proxyName, hostUGI); + + for (final Token<? extends TokenIdentifier> token : tokens) { + this.proxyUGI.addToken(token); + } + + LOG.log(Level.FINE, "UGI: user {0} set to: {1}", new Object[] {proxyName, this}); + } + + /** + * Execute the privileged action as a given user. + * If user credentials are not set, execute the action outside the user context. + * @param action an action to run. + * @param <T> action return type. + * @return result of an action. + * @throws Exception whatever the action can throw. + */ + public <T> T doAs(final PrivilegedExceptionAction<T> action) throws Exception { + LOG.log(Level.FINE, "{0} execute {1}", new Object[] {this, action}); + return this.proxyUGI == null ? 
action.run() : this.proxyUGI.doAs(action); + } + + @Override + public String toString() { + return this.proxyUGI == null ? "UGI: { CURRENT user: null }" : ugiToString("PROXY", this.proxyUGI); + } + + private static String ugiToString(final String prefix, final UserGroupInformation ugi) { + return String.format("UGI: { %s user: %s tokens: %s }", prefix, ugi, ugi.getCredentials().getAllTokens()); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/c5322db9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java index f323018..58df83a 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java @@ -21,6 +21,7 @@ package org.apache.reef.runtime.yarn.driver; import com.google.protobuf.ByteString; import org.apache.commons.collections.ListUtils; import org.apache.commons.lang3.exception.ExceptionUtils; + import org.apache.hadoop.fs.*; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.*; @@ -29,7 +30,9 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.NMClientAsync; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.ProgressProvider; import org.apache.reef.proto.ReefServiceProtos; import 
org.apache.reef.runtime.common.driver.DriverStatusManager; @@ -39,6 +42,7 @@ import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl; import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.runtime.yarn.client.unmanaged.YarnProxyUser; import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory; import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod; import org.apache.reef.tang.InjectionFuture; @@ -49,14 +53,16 @@ import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; import javax.inject.Inject; import java.io.*; import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Level; import java.util.logging.Logger; -final class YarnContainerManager - implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler { +@Private +@DriverSide +final class YarnContainerManager implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler { private static final Logger LOG = Logger.getLogger(YarnContainerManager.class.getName()); @@ -74,6 +80,7 @@ final class YarnContainerManager private final YarnConfiguration yarnConf; private final AMRMClientAsync<AMRMClient.ContainerRequest> resourceManager; + private final YarnProxyUser yarnProxyUser; private final NMClientAsync nodeManager; private final REEFEventHandlers reefEventHandlers; private final Containers containers; @@ -92,6 +99,7 @@ final class YarnContainerManager @Parameter(YarnHeartbeatPeriod.class) final int yarnRMHeartbeatPeriod, @Parameter(JobSubmissionDirectory.class) final String jobSubmissionDirectory, final YarnConfiguration yarnConf, + final YarnProxyUser yarnProxyUser, final 
REEFEventHandlers reefEventHandlers, final Containers containers, final ApplicationMasterRegistration registration, @@ -109,6 +117,7 @@ final class YarnContainerManager this.registration = registration; this.containerRequestCounter = containerRequestCounter; this.yarnConf = yarnConf; + this.yarnProxyUser = yarnProxyUser; this.rackNameFormatter = rackNameFormatter; this.trackingUrl = trackingURLProvider.getTrackingUrl(); @@ -119,7 +128,8 @@ final class YarnContainerManager this.reefFileNames = reefFileNames; this.progressProvider = progressProvider; - LOG.log(Level.FINEST, "Instantiated YarnContainerManager: {0}", this.registration); + LOG.log(Level.FINEST, "Instantiated YarnContainerManager: {0} {1}", + new Object[] {this.registration, this.yarnProxyUser}); } /** @@ -308,16 +318,21 @@ final class YarnContainerManager LOG.log(Level.FINEST, "YARN registration: begin"); - this.resourceManager.init(this.yarnConf); - this.resourceManager.start(); - this.nodeManager.init(this.yarnConf); this.nodeManager.start(); - LOG.log(Level.FINEST, "YARN registration: registered with RM and NM"); - try { + this.yarnProxyUser.doAs( + new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + resourceManager.init(yarnConf); + resourceManager.start(); + return null; + } + }); + LOG.log(Level.FINE, "YARN registration: register AM at \"{0}:{1}\" tracking URL \"{2}\"", new Object[] {AM_REGISTRATION_HOST, AM_REGISTRATION_PORT, this.trackingUrl}); @@ -333,7 +348,7 @@ final class YarnContainerManager out.writeBytes(this.trackingUrl + '\n'); } - } catch (final YarnException | IOException e) { + } catch (final Exception e) { LOG.log(Level.WARNING, "Unable to register application master.", e); onRuntimeError(e); }
