Repository: flink Updated Branches: refs/heads/master 303f6fee9 -> 68709b087
[FLINK-3929] additional fixes for keytab security - load flink-jaas.conf from classpath - avoid using undocumented flink base dir config entry - enable test cases to run on MacOS - unify suffix of secure test cases - fix error logging and reporting This closes #2275 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68709b08 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68709b08 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68709b08 Branch: refs/heads/master Commit: 68709b087570402cacb7bc3088e0eb35d83c8d32 Parents: 285b6f7 Author: Maximilian Michels <m...@apache.org> Authored: Tue Sep 20 15:41:35 2016 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Tue Sep 20 22:03:29 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 3 - .../src/main/flink-bin/conf/flink-jaas.conf | 26 -------- .../flink/runtime/security/SecurityContext.java | 67 ++++++++------------ .../src/main/resources/flink-jaas.conf | 26 ++++++++ .../flink/runtime/taskmanager/TaskManager.scala | 2 - .../runtime/security/SecurityContextTest.java | 4 +- .../connectors/fs/RollingSinkSecuredITCase.java | 1 - .../kafka/Kafka09SecureRunITCase.java | 62 ------------------ .../kafka/Kafka09SecuredRunITCase.java | 62 ++++++++++++++++++ .../flink/test/util/SecureTestEnvironment.java | 23 +++---- .../org/apache/flink/yarn/YarnTestBase.java | 3 +- .../flink/yarn/YarnApplicationMasterRunner.java | 1 - .../flink/yarn/YarnTaskManagerRunner.java | 5 +- .../flink/yarn/cli/FlinkYarnSessionCli.java | 32 +++------- 14 files changed, 138 insertions(+), 179 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 575ffad..0711758 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -161,9 +161,6 @@ public class CliFrontend { "filesystem scheme from configuration.", e); } - this.config.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDirectory.getAbsolutePath() - + ".." + File.separator); - this.clientTimeout = AkkaUtils.getClientTimeout(config); } http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-dist/src/main/flink-bin/conf/flink-jaas.conf ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/conf/flink-jaas.conf b/flink-dist/src/main/flink-bin/conf/flink-jaas.conf deleted file mode 100644 index d476e24..0000000 --- a/flink-dist/src/main/flink-bin/conf/flink-jaas.conf +++ /dev/null @@ -1,26 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -# We are using this file as an workaround for the Kafka and ZK SASL implementation -# since they explicitly look for java.security.auth.login.config property -# The file itself is not used by the application since the internal implementation -# uses a process-wide in-memory java security configuration object. -# Please do not edit/delete this file - See FLINK-3929 -sample { - useKeyTab=false - useTicketCache=true; -}; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java index 4b8b69b..be6611f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java @@ -22,7 +22,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.util.Preconditions; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -33,7 +34,12 @@ import org.slf4j.LoggerFactory; import javax.security.auth.Subject; import java.io.File; +import java.io.IOException; +import java.io.InputStream; import java.lang.reflect.Method; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.security.PrivilegedExceptionAction; import java.util.Collection; @@ -170,15 +176,12 @@ public class SecurityContext { * Kafka current code behavior. */ private static void populateSystemSecurityProperties(Configuration configuration) { + Preconditions.checkNotNull(configuration, "The supplied configuation was null"); //required to be empty for Kafka but we will override the property //with pseudo JAAS configuration file if SASL auth is enabled for ZK System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, ""); - if(configuration == null) { - return; - } - boolean disableSaslClient = configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE); if(disableSaslClient) { @@ -188,46 +191,26 @@ public class SecurityContext { return; } - String baseDir = configuration.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null); - if(baseDir == null) { - String message = "SASL auth is enabled for ZK but unable to locate pseudo Jaas config " + - "since " + ConfigConstants.FLINK_BASE_DIR_PATH_KEY + " is not provided"; - LOG.error(message); - throw new IllegalConfigurationException(message); - } - - File f = new File(baseDir); - if(!f.exists() || !f.isDirectory()) { - LOG.error("Invalid flink base directory {} configuration provided", baseDir); - throw new IllegalConfigurationException("Invalid flink base directory configuration provided"); - } - - File jaasConfigFile = new File(f, JAAS_CONF_FILENAME); - - if (!jaasConfigFile.exists() || !jaasConfigFile.isFile()) { - - //check if there is a conf directory - File confDir = new File(f, "conf"); - if(!confDir.exists() || !confDir.isDirectory()) { - LOG.error("Could not locate " + JAAS_CONF_FILENAME); - throw new IllegalConfigurationException("Could not locate " + JAAS_CONF_FILENAME); - } - - jaasConfigFile = new File(confDir, JAAS_CONF_FILENAME); - - if (!jaasConfigFile.exists() || !jaasConfigFile.isFile()) { - LOG.error("Could not locate " + JAAS_CONF_FILENAME); - throw new IllegalConfigurationException("Could not locate " + JAAS_CONF_FILENAME); - } + // load Jaas config file to initialize SASL + final File jaasConfFile; + try { + Path jaasConfPath = Files.createTempFile(JAAS_CONF_FILENAME, ""); + InputStream jaasConfStream = SecurityContext.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME); + Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING); + jaasConfFile = jaasConfPath.toFile(); + jaasConfFile.deleteOnExit(); + } catch (IOException e) { + throw new RuntimeException("SASL auth is enabled for ZK but unable to " + + "locate pseudo Jaas config provided with Flink", e); } LOG.info("Enabling {} property with pseudo JAAS config file: {}", - JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfigFile); + JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath()); //ZK client module lookup the configuration to handle SASL. //https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900 - System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfigFile.getAbsolutePath()); - System.setProperty(ZOOKEEPER_SASL_CLIENT,"true"); + System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath()); + System.setProperty(ZOOKEEPER_SASL_CLIENT, "true"); String zkSaslServiceName = configuration.getString(ConfigConstants.ZOOKEEPER_SASL_SERVICE_NAME, null); if(!StringUtils.isBlank(zkSaslServiceName)) { @@ -250,6 +233,10 @@ public class SecurityContext { String principal; + public SecurityConfiguration() { + this.flinkConf = GlobalConfiguration.loadConfiguration(); + } + public String getKeytab() { return keytab; } @@ -310,4 +297,4 @@ public class SecurityContext { T run() throws Exception; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/main/resources/flink-jaas.conf ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/resources/flink-jaas.conf b/flink-runtime/src/main/resources/flink-jaas.conf new file mode 100644 index 0000000..7f0f06b --- /dev/null +++ b/flink-runtime/src/main/resources/flink-jaas.conf @@ -0,0 +1,26 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +# We are using this file as an workaround for the Kafka and ZK SASL implementation +# since they explicitly look for java.security.auth.login.config property +# The file itself is not used by the application since the internal implementation +# uses a process-wide in-memory java security configuration object. +# Please do not edit/delete this file - See FLINK-3929 +sample { + useKeyTab=false + useTicketCache=true; +} http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 8534ee1..9e2feb5 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1583,8 +1583,6 @@ object TaskManager { } } - conf.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, cliConfig.getConfigDir() + "/..") - conf } http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java index 5f3d76a..3c48e8f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java @@ -35,7 +35,7 @@ public class SecurityContextTest { SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration(); try { SecurityContext.install(sc); - assertEquals(UserGroupInformation.getLoginUser().getUserName(),getOSUserName()); + assertEquals(UserGroupInformation.getLoginUser().getUserName(), getOSUserName()); } catch (Exception e) { fail(e.getMessage()); } @@ -59,7 +59,7 @@ public class SecurityContextTest { if( osName.contains( "windows" ) ){ className = "com.sun.security.auth.module.NTSystem"; } - else if( osName.contains( "linux" ) ){ + else if( osName.contains( "linux" ) || osName.contains( "mac" ) ){ className = "com.sun.security.auth.module.UnixSystem"; } else if( osName.contains( "solaris" ) || osName.contains( "sunos" ) ){ http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java index 930ddd2..051175a 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java @@ -227,7 +227,6 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase { TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM); } catch (Exception e) { - LOG.error("Exception occured while creating MiniFlink cluster. Reason: {}", e); throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java deleted file mode 100644 index d12ec65..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.test.util.SecureTestEnvironment; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/* - * Kafka Secure Connection (kerberos) IT test case - */ -public class Kafka09SecureRunITCase extends KafkaConsumerTestBase { - - protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecureRunITCase.class); - - @BeforeClass - public static void prepare() throws IOException, ClassNotFoundException { - LOG.info("-------------------------------------------------------------------------"); - LOG.info(" Starting Kafka09SecureRunITCase "); - LOG.info("-------------------------------------------------------------------------"); - - SecureTestEnvironment.prepare(tempFolder); - SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration()); - - startClusters(true); - } - - @AfterClass - public static void shutDownServices() { - shutdownClusters(); - SecureTestEnvironment.cleanup(); - } - - - //timeout interval is large since in Travis, ZK connection timeout occurs frequently - //The timeout for the test case is 2 times timeout of ZK connection - @Test(timeout = 600000) - public void testMultipleTopics() throws Exception { - runProduceConsumeMultipleTopics(); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java new file mode 100644 index 0000000..e748537 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.test.util.SecureTestEnvironment; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/* + * Kafka Secure Connection (kerberos) IT test case + */ +public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase { + + protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecuredRunITCase.class); + + @BeforeClass + public static void prepare() throws IOException, ClassNotFoundException { + LOG.info("-------------------------------------------------------------------------"); + LOG.info(" Starting Kafka09SecuredRunITCase "); + LOG.info("-------------------------------------------------------------------------"); + + SecureTestEnvironment.prepare(tempFolder); + SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration()); + + startClusters(true); + } + + @AfterClass + public static void shutDownServices() { + shutdownClusters(); + SecureTestEnvironment.cleanup(); + } + + + //timeout interval is large since in Travis, ZK connection timeout occurs frequently + //The timeout for the test case is 2 times timeout of ZK connection + @Test(timeout = 600000) + public void testMultipleTopics() throws Exception { + runProduceConsumeMultipleTopics(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java index 00b19f1..b5e622b 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java @@ -20,6 +20,7 @@ package org.apache.flink.test.util; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.security.SecurityContext; import org.apache.hadoop.minikdc.MiniKdc; import org.junit.rules.TemporaryFolder; @@ -60,12 +61,10 @@ public class SecureTestEnvironment { private static String hadoopServicePrincipal = null; - private static File baseDirForSecureRun = null; - public static void prepare(TemporaryFolder tempFolder) { try { - baseDirForSecureRun = tempFolder.newFolder(); + File baseDirForSecureRun = tempFolder.newFolder(); LOG.info("Base Directory for Secure Environment: {}", baseDirForSecureRun); String hostName = "localhost"; @@ -113,19 +112,17 @@ public class SecureTestEnvironment { //See Yarn test case module for reference createJaasConfig(baseDirForSecureRun); SecurityContext.SecurityConfiguration ctx = new SecurityContext.SecurityConfiguration(); - Configuration flinkConfig = new Configuration(); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(); flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab); flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal); flinkConfig.setBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, false); - flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirForSecureRun.getAbsolutePath()); ctx.setFlinkConfiguration(flinkConfig); TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap()); - populateSystemEnvVariables(); + populateJavaPropertyVariables(); } catch(Exception e) { - LOG.error("Exception occured while preparing secure environment. Reason: {}", e); - throw new RuntimeException(e); + throw new RuntimeException("Exception occured while preparing secure environment.", e); } } @@ -145,14 +142,12 @@ public class SecureTestEnvironment { testPrincipal = null; testZkServerPrincipal = null; hadoopServicePrincipal = null; - baseDirForSecureRun = null; } - private static void populateSystemEnvVariables() { + private static void populateJavaPropertyVariables() { if(LOG.isDebugEnabled()) { - System.setProperty("FLINK_JAAS_DEBUG", "true"); System.setProperty("sun.security.krb5.debug", "true"); } @@ -165,7 +160,6 @@ public class SecureTestEnvironment { private static void resetSystemEnvVariables() { System.clearProperty("java.security.krb5.conf"); - System.clearProperty("FLINK_JAAS_DEBUG"); System.clearProperty("sun.security.krb5.debug"); System.clearProperty("zookeeper.authProvider.1"); @@ -227,7 +221,7 @@ public class SecureTestEnvironment { } /* - * Helper method to create a temporary JAAS configuration file to ger around the Kafka and ZK SASL + * Helper method to create a temporary JAAS configuration file to get around the Kafka and ZK SASL * implementation lookup java.security.auth.login.config */ private static void createJaasConfig(File baseDirForSecureRun) { @@ -241,8 +235,7 @@ public class SecureTestEnvironment { out.println("useTicketCache=true;"); out.println("};"); } catch (IOException e) { - LOG.error("Exception occured while trying to create JAAS config. Reason: {}", e.getMessage()); - throw new RuntimeException(e); + throw new RuntimeException("Exception occured while trying to create JAAS config.", e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 605aa44..afdd400 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -427,8 +427,7 @@ public abstract class YarnTestBase extends TestLogger { out.println(ConfigConstants.SECURITY_PRINCIPAL_KEY + ": " + principal); out.println(""); } catch (IOException e) { - LOG.error("Exception occured while trying to append the security configurations. Reason: {}", e.getMessage()); - throw new RuntimeException(e); + throw new RuntimeException("Exception occured while trying to append the security configurations.", e); } String configDir = tempConfPathForSecureRun.getAbsolutePath(); http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index efb658a..b27876b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -176,7 +176,6 @@ public class YarnApplicationMasterRunner { flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath); flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal); } - flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir); SecurityContext.install(sc.setFlinkConfiguration(flinkConfig)); http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java index c70a30b..21ed52e 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java @@ -127,7 +127,6 @@ public class YarnTaskManagerRunner { configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath); configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal); } - configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir); SecurityContext.install(sc.setFlinkConfiguration(configuration)); @@ -145,9 +144,9 @@ public class YarnTaskManagerRunner { } }); } catch(Exception e) { - LOG.error("Exception occurred while launching Task Manager. Reason: {}", e); + LOG.error("Exception occurred while launching Task Manager", e); throw new RuntimeException(e); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/68709b08/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index b5364f0..d09340c 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -24,7 +24,6 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.commons.lang3.StringUtils; -import org.apache.flink.client.CliFrontend; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.configuration.ConfigConstants; @@ -463,27 +462,17 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> } } - public static void main(final String[] args) { - final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", true); // no prefix for the YARN session - - String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv(); - GlobalConfiguration.loadConfiguration(confDirPath); + public static void main(final String[] args) throws Exception { + final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(); - flinkConfiguration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, confDirPath); - try { - SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration)); - int retCode = SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() { - @Override - public Integer run() { - return cli.run(args); - } - }); - System.exit(retCode); - } catch(Exception e) { - e.printStackTrace(); - LOG.error("Exception Occured. Reason: {}", e); - return; - } + SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration)); + int retCode = SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() { + @Override + public Integer run() { + return cli.run(args); + } + }); + System.exit(retCode); } @Override @@ -544,7 +533,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> try { return yarnClusterDescriptor.deploy(); } catch (Exception e) { - LOG.error("Error while deploying YARN cluster: "+e.getMessage(), e); throw new RuntimeException("Error deploying the YARN cluster", e); }