GitHub user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4636#discussion_r137213174

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java ---
    @@ -112,29 +114,29 @@ public static void runYarnTaskManager(String[] args, final Class<? extends YarnT
             try {
    -            org.apache.hadoop.conf.Configuration hadoopConfiguration = null;
    +            SecurityUtils.SecurityConfiguration sc;
                 //To support Yarn Secure Integration Test Scenario
                 File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
                 if (krb5Conf.exists() && krb5Conf.canRead()) {
                     String krb5Path = krb5Conf.getAbsolutePath();
                     LOG.info("KRB5 Conf: {}", krb5Path);
    -                hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
    +                org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
                     hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
                     hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
    -            }
    -            // set keytab principal and replace path with the local path of the shipped keytab file in NodeManager
    -            if (localKeytabPath != null && remoteKeytabPrincipal != null) {
    -                configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, localKeytabPath);
    -                configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
    -            }
    +                // set keytab principal and replace path with the local path of the shipped keytab file in NodeManager
    +                if (localKeytabPath != null && remoteKeytabPrincipal != null) {
    +                    configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, localKeytabPath);
    +                    configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
    +                }
    +
    +                sc = new SecurityUtils.SecurityConfiguration(configuration,
    +                    Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, hadoopConfiguration)));
    -            SecurityUtils.SecurityConfiguration sc;
    -            if (hadoopConfiguration != null) {
    -                sc = new SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration);
                 } else {
                     sc = new SecurityUtils.SecurityConfiguration(configuration);
    +
    --- End diff --

    fixing
---