[ https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461503#comment-16461503 ]
ASF GitHub Bot commented on FLINK-8286:
---------------------------------------
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5896#discussion_r185606093
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java ---
@@ -73,61 +105,68 @@ public static void main(String[] args) {
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
- run(args);
+ try {
+ SecurityUtils.getInstalledContext().runSecured(
+
YarnTaskExecutorRunnerFactory.create(System.getenv()));
+ } catch (Exception e) {
+ LOG.error("Exception occurred while launching Task Executor runner", e);
+ throw new RuntimeException(e);
+ }
}
/**
- * The instance entry point for the YARN task executor. Obtains user
group information and calls
- * the main work method {@link
TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a
- * privileged action.
+ * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
*
- * @param args The command line arguments.
+ * @param envs environment variables.
*/
- private static void run(String[] args) {
- try {
- LOG.debug("All environment variables: {}", ENV);
+ @VisibleForTesting
+ protected static Runner create(Map<String, String> envs) {
+ LOG.debug("All environment variables: {}", envs);
- final String yarnClientUsername =
ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
- final String localDirs =
ENV.get(Environment.LOCAL_DIRS.key());
- LOG.info("Current working/local Directory: {}",
localDirs);
+ final String yarnClientUsername =
envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+ final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+ LOG.info("Current working/local Directory: {}", localDirs);
- final String currDir = ENV.get(Environment.PWD.key());
- LOG.info("Current working Directory: {}", currDir);
+ final String currDir = envs.get(Environment.PWD.key());
+ LOG.info("Current working Directory: {}", currDir);
- final String remoteKeytabPath =
ENV.get(YarnConfigKeys.KEYTAB_PATH);
- LOG.info("TM: remote keytab path obtained {}",
remoteKeytabPath);
+ final String remoteKeytabPrincipal =
envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+ LOG.info("TM: remote keytab principal obtained {}",
remoteKeytabPrincipal);
- final String remoteKeytabPrincipal =
ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
- LOG.info("TM: remote keytab principal obtained {}",
remoteKeytabPrincipal);
-
- final Configuration configuration =
GlobalConfiguration.loadConfiguration(currDir);
+ final Configuration configuration;
+ try {
+ configuration =
GlobalConfiguration.loadConfiguration(currDir);
FileSystem.initialize(configuration);
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ return null;
+ }
- // configure local directory
- if (configuration.contains(CoreOptions.TMP_DIRS)) {
- LOG.info("Overriding YARN's temporary file
directories with those " +
- "specified in the Flink config: " +
configuration.getValue(CoreOptions.TMP_DIRS));
- }
- else {
- LOG.info("Setting directories for temporary
files to: {}", localDirs);
- configuration.setString(CoreOptions.TMP_DIRS,
localDirs);
- }
-
- // tell akka to die in case of an error
-
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
+ // configure local directory
+ if (configuration.contains(CoreOptions.TMP_DIRS)) {
+ LOG.info("Overriding YARN's temporary file directories with those " +
+ "specified in the Flink config: " +
configuration.getValue(CoreOptions.TMP_DIRS));
+ }
+ else {
+ LOG.info("Setting directories for temporary files to: {}", localDirs);
+ configuration.setString(CoreOptions.TMP_DIRS,
localDirs);
+ }
- String keytabPath = null;
- if (remoteKeytabPath != null) {
- File f = new File(currDir,
Utils.KEYTAB_FILE_NAME);
- keytabPath = f.getAbsolutePath();
- LOG.info("keytab path: {}", keytabPath);
- }
+ // tell akka to die in case of an error
+ configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR,
true);
+ try {
UserGroupInformation currentUser =
UserGroupInformation.getCurrentUser();
LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
currentUser.getShortUserName(),
yarnClientUsername);
+ File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
--- End diff --
@aljoscha done, please take another look. Thanks.
> Fix Flink-Yarn-Kerberos integration for FLIP-6
> ----------------------------------------------
>
> Key: FLINK-8286
> URL: https://issues.apache.org/jira/browse/FLINK-8286
> Project: Flink
> Issue Type: Bug
> Components: Security
> Reporter: Shuyi Chen
> Assignee: Shuyi Chen
> Priority: Blocker
> Labels: flip-6
> Fix For: 1.5.0
>
>
> The current Flink-Yarn-Kerberos integration in FLIP-6 is broken.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)