[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15503255#comment-15503255
 ] 

ASF GitHub Bot commented on FLINK-3929:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2275#discussion_r79355653
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java ---
    @@ -75,34 +85,66 @@ public static void runYarnTaskManager(String[] args, 
final Class<? extends YarnT
                                "specified in the Flink config: " + 
flinkTempDirs);
                }
     
    -           LOG.info("YARN daemon runs as '" + 
UserGroupInformation.getCurrentUser().getShortUserName() +
    -                   "' setting user to execute Flink TaskManager to '" + 
yarnClientUsername + "'");
    -
                // tell akka to die in case of an error
                
configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
     
    -           UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(yarnClientUsername);
    -           for (Token<? extends TokenIdentifier> toks : 
UserGroupInformation.getCurrentUser().getTokens()) {
    -                   ugi.addToken(toks);
    +           String keytabPath = null;
    +           if(remoteKeytabPath != null) {
    +                   File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
    +                   keytabPath = f.getAbsolutePath();
    +                   LOG.info("keytabPath: {}", keytabPath);
                }
     
    +           UserGroupInformation currentUser = 
UserGroupInformation.getCurrentUser();
    +
    +           LOG.info("YARN daemon is running as: {} Yarn client user 
obtainer: {}",
    +                           currentUser.getShortUserName(), 
yarnClientUsername );
    +
                // Infer the resource identifier from the environment variable
                String containerID = 
Preconditions.checkNotNull(envs.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID));
                final ResourceID resourceId = new ResourceID(containerID);
                LOG.info("ResourceID assigned for this container: {}", 
resourceId);
     
    -           ugi.doAs(new PrivilegedAction<Object>() {
    -                   @Override
    -                   public Object run() {
    -                           try {
    -                                   
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, 
taskManager);
    -                           }
    -                           catch (Throwable t) {
    -                                   LOG.error("Error while starting the 
TaskManager", t);
    -                                   
System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
    -                           }
    -                           return null;
    +           try {
    +
    +                   SecurityContext.SecurityConfiguration sc = new 
SecurityContext.SecurityConfiguration();
    +
    +                   //To support Yarn Secure Integration Test Scenario
    +                   File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
    +                   if(krb5Conf.exists() && krb5Conf.canRead()) {
    +                           String krb5Path = krb5Conf.getAbsolutePath();
    +                           LOG.info("KRB5 Conf: {}", krb5Path);
    +                           org.apache.hadoop.conf.Configuration conf = new 
org.apache.hadoop.conf.Configuration();
    +                           
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, 
"kerberos");
    +                           
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
    +                           sc.setHadoopConfiguration(conf);
    +                   }
    +
    +                   if(keytabPath != null && remoteKeytabPrincipal != null) 
{
    +                           
configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
    +                           
configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, 
remoteKeytabPrincipal);
                        }
    -           });
    +                   
configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
    +
    +                   
SecurityContext.install(sc.setFlinkConfiguration(configuration));
    +
    +                   SecurityContext.getInstalled().runSecured(new 
SecurityContext.FlinkSecuredRunner<Integer>() {
    +                           @Override
    +                           public Integer run() {
    +                                   try {
    +                                           
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, 
taskManager);
    +                                   }
    +                                   catch (Throwable t) {
    +                                           LOG.error("Error while starting 
the TaskManager", t);
    +                                           
System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
    +                                   }
    +                                   return null;
    +                           }
    +                   });
    +           } catch(Exception e) {
    +                   LOG.error("Exception occurred while launching Task 
Manager. Reason: {}", e);
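Review bot: The `if(keytabPath != null && remoteKeytabPrincipal != null)` guard silently skips the keytab settings when a keytab was shipped but no principal came with it, so the TaskManager would start without `SECURITY_KEYTAB_KEY`/`SECURITY_PRINCIPAL_KEY` and nothing in the log says why. That mismatch is a deployment error worth surfacing before `SecurityContext.install`, for example with `else if (keytabPath != null) { LOG.warn("Keytab present but no principal supplied; keytab login is not configured"); }`, or by failing startup outright if a half-configured secure cluster should not run. The keytab branch earlier in the hunk also takes `f.getAbsolutePath()` without the `exists() && canRead()` test that the krb5.conf file gets, so a failed localization presumably only shows up later inside the security context. `runYarnTaskManager` starts and ends outside the hunk, so how `remoteKeytabPath` and `remoteKeytabPrincipal` are read is not visible here.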
    --- End diff --
    
    The signature is error(String msg, Throwable t). You can remove the 
"Reason: {}".


> Support for Kerberos Authentication with Keytab Credential
> ----------------------------------------------------------
>
>                 Key: FLINK-3929
>                 URL: https://issues.apache.org/jira/browse/FLINK-3929
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Eron Wright 
>            Assignee: Vijay Srinivasaraghavan
>              Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  


