[FLINK-5364] [security] Rework JAAS configuration to support user-supplied entries
Fixes FLINK-5364, FLINK-5361, FLINK-5350, FLINK-5055 This closes #3057 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00193f7e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00193f7e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00193f7e Branch: refs/heads/release-1.2 Commit: 00193f7e238340cc18c57a44c7e6377432839373 Parents: 1750b0d Author: wrighe3 <[email protected]> Authored: Tue Dec 20 01:07:38 2016 -0800 Committer: Stephan Ewen <[email protected]> Committed: Wed Jan 11 19:06:10 2017 +0100 ---------------------------------------------------------------------- docs/internals/flink_security.md | 133 +++++--- docs/setup/config.md | 55 +++- .../connectors/fs/RollingSinkSecuredITCase.java | 8 +- .../flink/configuration/ConfigConstants.java | 14 - .../configuration/HighAvailabilityOptions.java | 8 - .../flink/configuration/SecurityOptions.java | 62 ++++ flink-dist/src/main/resources/flink-conf.yaml | 21 +- .../java/hadoop/mapred/utils/HadoopUtils.java | 21 ++ .../MesosApplicationMasterRunner.java | 2 - .../MesosTaskManagerRunner.java | 2 - .../overlays/KeytabOverlay.java | 14 +- .../runtime/security/DynamicConfiguration.java | 111 +++++++ .../runtime/security/JaasConfiguration.java | 160 --------- .../flink/runtime/security/KerberosUtils.java | 125 +++++++ .../flink/runtime/security/SecurityUtils.java | 322 ++++++++----------- .../runtime/security/modules/HadoopModule.java | 119 +++++++ .../runtime/security/modules/JaasModule.java | 146 +++++++++ .../security/modules/SecurityModule.java | 59 ++++ .../security/modules/ZooKeeperModule.java | 76 +++++ .../src/main/resources/flink-jaas.conf | 9 +- .../overlays/KeytabOverlayTest.java | 5 +- .../runtime/security/JaasConfigurationTest.java | 52 --- .../runtime/security/KerberosUtilsTest.java | 48 +++ .../runtime/security/SecurityUtilsTest.java | 105 +++--- .../flink/test/util/SecureTestEnvironment.java | 48 +-- 
.../test/util/TestingJaasConfiguration.java | 106 ------ .../flink/test/util/TestingSecurityContext.java | 38 +-- .../yarn/YARNSessionFIFOSecuredITCase.java | 10 +- .../org/apache/flink/yarn/YarnTestBase.java | 5 +- .../yarn/AbstractYarnClusterDescriptor.java | 26 +- .../flink/yarn/YarnApplicationMasterRunner.java | 25 +- .../flink/yarn/YarnTaskManagerRunner.java | 21 +- 32 files changed, 1182 insertions(+), 774 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/docs/internals/flink_security.md ---------------------------------------------------------------------- diff --git a/docs/internals/flink_security.md b/docs/internals/flink_security.md index 846273b..a83f3b9 100644 --- a/docs/internals/flink_security.md +++ b/docs/internals/flink_security.md @@ -24,64 +24,123 @@ specific language governing permissions and limitations under the License. --> -This document briefly describes how Flink security works in the context of various deployment mechanism (Standalone/Cluster vs YARN) -and the connectors that participates in Flink Job execution stage. This documentation can be helpful for both administrators and developers -who plans to run Flink on a secure environment. +This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos), +filesystems, connectors, and state backends. ## Objective +The primary goals of the Flink Kerberos security infrastructure are: +1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka) +2. to authenticate to ZooKeeper (if configured to use SASL) +3. to authenticate to Hadoop components (e.g. HDFS, HBase) -The primary goal of Flink security model is to enable secure data access for jobs within a cluster via connectors. 
In a production deployment scenario, -streaming jobs are understood to run for longer period of time (days/weeks/months) and the system must be able to authenticate against secure -data sources throughout the life of the job. The current implementation supports running Flink clusters (Job Manager/Task Manager/Jobs) under the -context of a Kerberos identity based on Keytab credential supplied during deployment time. Any jobs submitted will continue to run in the identity of the cluster. +In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure +data sources throughout the life of the job. Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token +or ticket cache entry. + +The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential +or with Hadoop delegation tokens. Keep in mind that all jobs share the credential configured for a given cluster. To use a different keytab +for a certain job, simply launch a separate Flink cluster with a different configuration. Numerous Flink clusters may run side-by-side in a YARN +or Mesos environment. ## How Flink Security works -Flink deployment includes running Job Manager/ZooKeeper, Task Manager(s), Web UI and Job(s). Jobs (user code) can be submitted through web UI and/or CLI. -A Job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.,) and each connector may have a specific security -requirements (Kerberos, database based, SSL/TLS, custom etc.,). While satisfying the security requirements for all the connectors evolves over a period -of time, at this time of writing, the following connectors/services are tested for Kerberos/Keytab based security. +In concept, a Flink program may use first- or third-party connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.) 
necessitating arbitrary authentication methods (Kerberos, SSL/TLS, username/password, etc.). While satisfying the security requirements for all connectors is an ongoing effort, +Flink provides first-class support for Kerberos authentication only. The following services and connectors are tested for Kerberos authentication: -- Kafka (0.9) +- Kafka (0.9+) - HDFS +- HBase - ZooKeeper -Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI is a static implementation that takes care of handling Kerberos authentication. The Flink bootstrap implementation -(JM/TM/CLI) takes care of instantiating UGI with the appropriate security credentials to establish the necessary security context. +Note that it is possible to enable the use of Kerberos independently for each service or connector. For example, the user may enable +Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa. The shared element is the configuration of +Kerberos credentials, which is then explicitly used by each component. + +The internal architecture is based on security modules (implementing `org.apache.flink.runtime.security.modules.SecurityModule`) which +are installed at startup. The next section describes each security module. + +### Hadoop Security Module +This module uses the Hadoop `UserGroupInformation` (UGI) class to establish a process-wide *login user* context. The login user is +then used for all interactions with Hadoop, including HDFS, HBase, and YARN. + +If Hadoop security is enabled (in `core-site.xml`), the login user will have whatever Kerberos credential is configured. Otherwise, +the login user conveys only the user identity of the OS account that launched the cluster. + +### JAAS Security Module +This module provides a dynamic JAAS configuration to the cluster, making available the configured Kerberos credential to ZooKeeper, +Kafka, and other such components that rely on JAAS. 
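Conceptually, the dynamic JAAS configuration installed by this module is a `javax.security.auth.login.Configuration` subclass that synthesizes a `Krb5LoginModule` entry for a requested login context. The following is a minimal, dependency-free sketch; the class name, keytab path, and option values are illustrative, not Flink's actual `DynamicConfiguration`:

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Illustrative only: serves a keytab-based Krb5LoginModule entry for any
// requested login context name (e.g. "Client", "KafkaClient").
class DynamicJaasSketch extends Configuration {

    private final String keytab;
    private final String principal;

    DynamicJaasSketch(String keytab, String principal) {
        this.keytab = keytab;
        this.principal = principal;
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");
        options.put("storeKey", "true");
        options.put("keyTab", keytab);
        options.put("principal", principal);
        return new AppConfigurationEntry[] {
            new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                options)
        };
    }

    public static void main(String[] args) {
        Configuration conf = new DynamicJaasSketch("/path/to/flink.keytab", "flink-user");
        // A component such as the ZooKeeper client looks up its context by name:
        AppConfigurationEntry entry = conf.getAppConfigurationEntry("Client")[0];
        System.out.println(entry.getLoginModuleName());
        // Installing it process-wide would be: Configuration.setConfiguration(conf);
    }
}
```

A static JAAS file named via `-Djava.security.auth.login.config` would still take precedence over entries served this way, which matches the override behavior described below.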
+ +Note that the user may also provide a static JAAS configuration file using the mechanisms described in the [Java SE Documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html). Static entries override any +dynamic entries provided by this module. + +### ZooKeeper Security Module +This module configures certain process-wide ZooKeeper security-related settings, namely the ZooKeeper service name (default: `zookeeper`) +and the JAAS login context name (default: `Client`). + +## Security Configuration + +### Flink Configuration +The user's Kerberos ticket cache (managed with `kinit`) is used automatically, based on the following configuration option: -Services like Kafka and ZooKeeper use SASL/JAAS based authentication mechanism to authenticate against a Kerberos server. It expects JAAS configuration with a platform-specific login -module *name* to be provided. Managing per-connector configuration files will be an overhead and to overcome this requirement, a process-wide JAAS configuration object is -instantiated which serves standard ApplicationConfigurationEntry for the connectors that authenticates using SASL/JAAS mechanism. +- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from the user's Kerberos ticket cache (default: `true`). -It is important to understand that the Flink processes (JM/TM/UI/Jobs) itself uses UGI's doAS() implementation to run under a specific user context, i.e. if Hadoop security is enabled -then the Flink processes will be running under a secure user account or else it will run as the OS login user account who starts the Flink cluster. +A Kerberos keytab can be supplied by adding below configuration elements to the Flink configuration file: -## Security Configurations +- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials. 
-Secure credentials can be supplied by adding below configuration elements to Flink configuration file: +- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab. -- `security.keytab`: Absolute path to Kerberos keytab file that contains the user credentials/secret. +These configuration options establish a cluster-wide credential to be used in a Hadoop and/or JAAS context. Whether the credential is used in a Hadoop context is based on the Hadoop configuration (see next section). To be used in a JAAS context, the configuration specifies which JAAS *login contexts* (or *applications*) are enabled with the following configuration option: -- `security.principal`: User principal name that the Flink cluster should run as. +- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client` to use the credentials for ZooKeeper authentication). -The delegation token mechanism (*kinit cache*) is still supported for backward compatibility but enabling security using *keytab* configuration is the preferred and recommended approach. +ZooKeeper-related configuration overrides: -## Standalone Mode: +- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`). Facilitates mutual-authentication between the client (Flink) and server. + +- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match +one of the values specified in `security.kerberos.login.contexts`. + +### Hadoop Configuration + +The Hadoop configuration is located via the `HADOOP_CONF_DIR` environment variable and by other means (see `org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils`). The Kerberos credential (configured above) is used automatically if Hadoop security is enabled. 
+ +Note that Kerberos credentials found in the ticket cache aren't transferable to other hosts. In this scenario, the Flink CLI acquires Hadoop +delegation tokens (for HDFS and for HBase). + +## Deployment Modes +Here is some information specific to each deployment mode. + +### Standalone Mode Steps to run a secure Flink cluster in standalone/cluster mode: -- Add security configurations to Flink configuration file (on all cluster nodes) -- Make sure the Keytab file exist in the path as indicated in *security.keytab* configuration on all cluster nodes -- Deploy Flink cluster using cluster start/stop scripts or CLI +1. Add security-related configuration options to the Flink configuration file (on all cluster nodes). +2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on all cluster nodes. +3. Deploy Flink cluster as normal. + +### YARN/Mesos Mode + +Steps to run a secure Flink cluster in YARN/Mesos mode: +1. Add security-related configuration options to the Flink configuration file on the client. +2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on the client node. +3. Deploy Flink cluster as normal. + +In YARN/Mesos mode, the keytab is automatically copied from the client to the Flink containers. -## Yarn Mode: +For more information, see <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">YARN security</a> documentation. 
-Steps to run secure Flink cluster in Yarn mode: -- Add security configurations to Flink configuration file (on the node from where cluster will be provisioned using Flink/Yarn CLI) -- Make sure the Keytab file exist in the path as indicated in *security.keytab* configuration -- Deploy Flink cluster using CLI +#### Using `kinit` (YARN only) -In Yarn mode, the user supplied keytab will be copied over to the Yarn containers (App Master/JM and TM) as the Yarn local resource file. -Security implementation details are based on <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">Yarn security</a> +In YARN mode, it is possible to deploy a secure Flink cluster without a keytab, using only the ticket cache (as managed by `kinit`). +This avoids the complexity of generating a keytab and avoids entrusting the cluster manager with it. The main drawback is +that the cluster is necessarily short-lived since the generated delegation tokens will expire (typically within a week). -## Token Renewal +Steps to run a secure Flink cluster using `kinit`: +1. Add security-related configuration options to the Flink configuration file on the client. +2. Login using the `kinit` command. +3. Deploy Flink cluster as normal. -UGI and Kafka/ZK login module implementations takes care of auto-renewing the tickets upon reaching expiry and no further action is needed on the part of Flink. \ No newline at end of file +## Further Details +### Ticket Renewal +Each component that uses Kerberos is independently responsible for renewing the Kerberos ticket-granting-ticket (TGT). +Hadoop, ZooKeeper, and Kafka all renew the TGT automatically when provided a keytab. In the delegation token scenario, +YARN itself renews the token (up to its maximum lifespan). 
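The module mechanism described in this document amounts to installing each configured `SecurityModule` in order at cluster startup. A rough sketch of that bootstrap follows; the interface shape mirrors the `SecurityModule` named above, but the two module bodies are placeholders, not Flink's real `HadoopModule`/`JaasModule` implementations:

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of module-based security bootstrap.
class SecurityBootstrapSketch {

    // Mirrors org.apache.flink.runtime.security.modules.SecurityModule in spirit.
    interface SecurityModule {
        void install() throws Exception;
        void uninstall() throws Exception;
    }

    static class HadoopModule implements SecurityModule {
        public void install() { System.out.println("Hadoop login user established"); }
        public void uninstall() { }
    }

    static class JaasModule implements SecurityModule {
        public void install() { System.out.println("dynamic JAAS configuration installed"); }
        public void uninstall() { }
    }

    public static void main(String[] args) throws Exception {
        // Modules are installed in order at startup; each one configures
        // its own slice of process-wide security state independently.
        List<SecurityModule> modules = Arrays.asList(new HadoopModule(), new JaasModule());
        for (SecurityModule module : modules) {
            module.install();
        }
    }
}
```

This is why Hadoop security and ZooKeeper/Kafka SASL can be enabled independently: each concern lives in its own module.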
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/docs/setup/config.md ---------------------------------------------------------------------- diff --git a/docs/setup/config.md b/docs/setup/config.md index 512c08a..3ef4ffd 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -96,37 +96,58 @@ These options are useful for debugging a Flink application for memory and garbag - `taskmanager.debug.memory.logIntervalMs`: The interval (in milliseconds) in which the TaskManagers log the memory and garbage collection statistics. Only has an effect, if `taskmanager.debug.memory.startLogThread` is set to true. -### Kerberos +### Kerberos-based Security -Flink supports Kerberos authentication for the following services +Flink supports Kerberos authentication for the following services: -+ Hadoop Components: such as HDFS, YARN, or HBase. ++ Hadoop Components (such as HDFS, YARN, or HBase) + Kafka Connectors (version 0.9+) -+ Zookeeper Server/Client ++ Zookeeper -Hadoop components relies on the UserGroupInformation (UGI) implementation to handle Kerberos authentication, whereas Kafka and Zookeeper services handles Kerberos authentication through SASL/JAAS implementation. - -**Kerberos is only properly supported in Hadoop version 2.6.1 and above. All +**Kerberos is supported only in Hadoop version 2.6.1 and above. All other versions have critical bugs which might fail the Flink job unexpectedly.** -**Ticket cache** and **Keytab** modes are supported for all above mentioned services. +Configuring Flink for Kerberos security involves three aspects: + +1. Providing the cluster with a Kerberos credential (i.e. a keytab or a ticket via `kinit`) +2. Making the Kerberos credential available to components and connectors as needed +3. 
Configuring the component and/or connector to use Kerberos authentication + +To provide the cluster with a Kerberos credential, either configure the login keytab using the below configuration options, +or login using `kinit` before starting the cluster. + +It is preferable to use keytabs for long-running jobs, to avoid ticket expiration issues. If you prefer to use the ticket cache, +talk to your administrator about increasing the Hadoop delegation token lifetime. + +- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from your Kerberos ticket cache (default: `true`). + +- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials. -> Ticket cache (Supported only to provide backward compatibility support. Keytab is the preferred approach for long running jobs) +- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab. -While Hadoop uses Kerberos tickets to authenticate users with services initially, the authentication process continues differently afterwards. Instead of saving the ticket to authenticate on a later access, Hadoop creates its own security tokens (DelegationToken) that it passes around. These are authenticated to Kerberos periodically but are independent of the token renewal time. The tokens have a maximum life span identical to the Kerberos ticket maximum life span. +If Hadoop security is enabled (in `core-site.xml`), Flink will automatically use the configured Kerberos credentials when connecting to HDFS, HBase, and other Hadoop components. -While using ticket cache mode, please make sure to set the maximum ticket life span high long running jobs. +Make the Kerberos credentials available to any connector or component that uses a JAAS configuration file by configuring JAAS login contexts. -If you are on YARN, then it is sufficient to authenticate the client with Kerberos. 
On a Flink standalone cluster you need to ensure that, initially, all nodes are authenticated with Kerberos using the `kinit` tool. +- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication). + +You may also provide a static JAAS configuration file, whose entries override those produced by the above configuration option. + +Be sure to configure the connector within your Flink program as necessary to use Kerberos authentication. For the Kafka connector, +use the following properties: + +``` +security.protocol=SASL_PLAINTEXT (or SASL_SSL) +sasl.kerberos.service.name=kafka +``` -> Keytab (security principal and keytab can be configured through Flink configuration file) -- `security.keytab`: Path to Keytab file -- `security.principal`: Principal associated with the keytab +Flink provides some additional options to configure ZooKeeper security: -Kerberos ticket renewal is abstracted and automatically handled by the Hadoop/Kafka/ZK login modules and ensures that tickets are renewed in time and you can be sure to be authenticated until the end of the ticket life time. +- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`). -For Kafka and ZK, process-wide JAAS config will be created using the provided security credentials and the Kerberos authentication will be handled by Kafka/ZK login handlers. +- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match +one of the values specified in `security.kerberos.login.contexts`. 
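In a Flink program, the connector settings shown above are plain Kafka client properties passed to the consumer or producer. For example (the bootstrap address is a placeholder; only the two security properties come from the snippet above):

```java
import java.util.Properties;

class KafkaKerberosProps {
    public static void main(String[] args) {
        // These properties would be handed to the Flink Kafka consumer/producer.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder address
        // Security settings matching the documented snippet:
        props.setProperty("security.protocol", "SASL_PLAINTEXT"); // or SASL_SSL
        props.setProperty("sasl.kerberos.service.name", "kafka");
        System.out.println(props.getProperty("security.protocol"));
    }
}
```

The Kerberos credential itself comes from the `KafkaClient` JAAS login context configured via `security.kerberos.login.contexts`, not from these properties.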
### Other http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java index eb12d07..fa46fc7 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.fs; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.test.util.SecureTestEnvironment; @@ -116,13 +117,12 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase { populateSecureConfigurations(); Configuration flinkConfig = new Configuration(); - flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, + flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, SecureTestEnvironment.getTestKeytab()); - flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, + flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, SecureTestEnvironment.getHadoopServicePrincipal()); - SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig); - ctx.setHadoopConfiguration(conf); + SecurityUtils.SecurityConfiguration 
ctx = new SecurityUtils.SecurityConfiguration(flinkConfig, conf); try { TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap()); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index eabb754..fc389e0 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -1395,20 +1395,6 @@ public final class ConfigConstants { /** The environment variable name which contains the Flink installation root directory */ public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME"; - // -------------------------------- Security ------------------------------- - - /** - * The config parameter defining security credentials required - * for securing Flink cluster. - */ - - /** Keytab file key name to be used in flink configuration file */ - public static final String SECURITY_KEYTAB_KEY = "security.keytab"; - - /** Kerberos security principal key name to be used in flink configuration file */ - public static final String SECURITY_PRINCIPAL_KEY = "security.principal"; - - /** * Not instantiable. 
*/ http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java index 1ee988a..4792eba 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java @@ -124,14 +124,6 @@ public class HighAvailabilityOptions { .defaultValue(3) .withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts"); - public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE = - key("zookeeper.sasl.disable") - .defaultValue(true); - - public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME = - key("zookeeper.sasl.service-name") - .noDefaultValue(); - // ------------------------------------------------------------------------ /** Not intended to be instantiated */ http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java new file mode 100644 index 0000000..67d101d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to security. + */ +public class SecurityOptions { + + // ------------------------------------------------------------------------ + // Kerberos Options + // ------------------------------------------------------------------------ + + public static final ConfigOption<String> KERBEROS_LOGIN_PRINCIPAL = + key("security.kerberos.login.principal") + .noDefaultValue() + .withDeprecatedKeys("security.principal"); + + public static final ConfigOption<String> KERBEROS_LOGIN_KEYTAB = + key("security.kerberos.login.keytab") + .noDefaultValue() + .withDeprecatedKeys("security.keytab"); + + public static final ConfigOption<Boolean> KERBEROS_LOGIN_USETICKETCACHE = + key("security.kerberos.login.use-ticket-cache") + .defaultValue(true); + + public static final ConfigOption<String> KERBEROS_LOGIN_CONTEXTS = + key("security.kerberos.login.contexts") + .noDefaultValue(); + + + // ------------------------------------------------------------------------ + // ZooKeeper Security Options + // ------------------------------------------------------------------------ + + public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME = + key("zookeeper.sasl.service-name") + .defaultValue("zookeeper"); + + public static final ConfigOption<String> 
ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME = + key("zookeeper.sasl.login-context-name") + .defaultValue("Client"); +} http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-dist/src/main/resources/flink-conf.yaml ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml index c650cfe..f759db6 100644 --- a/flink-dist/src/main/resources/flink-conf.yaml +++ b/flink-dist/src/main/resources/flink-conf.yaml @@ -162,21 +162,24 @@ jobmanager.web.port: 8081 # Flink Cluster Security Configuration (optional configuration) #============================================================================== -# Kerberos security for the connectors can be enabled by providing below configurations -# Security works in two modes - keytab/principal combination or using the Kerberos token cache -# If keytab and principal are not provided, token cache (manual kinit) will be used +# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors - +# may be enabled in four steps: +# 1. configure the local krb5.conf file +# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit) +# 3. make the credentials available to various JAAS login contexts +# 4. 
configure the connector to use JAAS/SASL -#security.keytab: /path/to/kerberos/keytab -#security.principal: flink-user +#security.kerberos.login.keytab: /path/to/kerberos/keytab +#security.kerberos.login.principal: flink-user +#security.kerberos.login.use-ticket-cache: true + +#security.kerberos.login.contexts: Client,KafkaClient #============================================================================== # ZK Security Configuration (optional configuration) #============================================================================== -# Below configurations are applicable if ZK quorum is configured for Kerberos security -# SASL authentication is disabled by default and can be enabled by changig the value to false -# -# zookeeper.sasl.disable: true +# Below configurations are applicable if ZK ensemble is configured for security # Override below configuration to provide custom ZK service name if configured # http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java index 7c41eaf..da8244f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java @@ -21,17 +21,22 @@ package org.apache.flink.api.java.hadoop.mapred.utils; import java.io.File; import java.lang.reflect.Constructor; +import java.util.Collection; import java.util.Map; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import 
org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +48,8 @@ public final class HadoopUtils { private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class); + private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN"); + /** * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration. */ @@ -163,6 +170,20 @@ public final class HadoopUtils { } /** + * Indicates whether the current user has an HDFS delegation token. + */ + public static boolean hasHDFSDelegationToken() throws Exception { + UserGroupInformation loginUser = UserGroupInformation.getCurrentUser(); + Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens(); + for (Token<? extends TokenIdentifier> token : usrTok) { + if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) { + return true; + } + } + return false; + } + + /** * Private constructor to prevent instantiation. 
*/ private HadoopUtils() { http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 4b9bd82..689c26a 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -27,7 +27,6 @@ import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.curator.framework.CuratorFramework; -import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -168,7 +167,6 @@ public class MesosApplicationMasterRunner { // configure security SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(config); - sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration()); SecurityUtils.install(sc); // run the actual work in the installed security context http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java index 75b5043..206c71b 100644 
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java @@ -26,7 +26,6 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; -import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -118,7 +117,6 @@ public class MesosTaskManagerRunner { // Run the TM in the security context SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration); - sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration()); SecurityUtils.install(sc); try { http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java index 7fe5b3e..271b32d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.clusterframework.overlays; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.slf4j.Logger; @@ -34,7 
+34,7 @@ import java.io.IOException; * Overlays cluster-level Kerberos credentials (i.e. keytab) into a container. * * The following Flink configuration entries are updated: - * - security.keytab + * - security.kerberos.login.keytab */ public class KeytabOverlay extends AbstractContainerOverlay { @@ -60,7 +60,7 @@ public class KeytabOverlay extends AbstractContainerOverlay { .setDest(TARGET_PATH) .setCachable(false) .build()); - container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_KEYTAB_KEY, TARGET_PATH.getPath()); + container.getDynamicConfiguration().setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, TARGET_PATH.getPath()); } } @@ -69,7 +69,7 @@ public class KeytabOverlay extends AbstractContainerOverlay { } /** - * A builder for the {@link HadoopUserOverlay}. + * A builder for the {@link KeytabOverlay}. */ public static class Builder { @@ -79,15 +79,15 @@ public class KeytabOverlay extends AbstractContainerOverlay { * Configures the overlay using the current environment (and global configuration).
* * The following Flink configuration settings are checked for a keytab: - * - security.keytab + * - security.kerberos.login.keytab */ public Builder fromEnvironment(Configuration globalConfiguration) { - String keytab = globalConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null); + String keytab = globalConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB); if(keytab != null) { keytabPath = new File(keytab); if(!keytabPath.exists()) { throw new IllegalStateException("Invalid configuration for " + - ConfigConstants.SECURITY_KEYTAB_KEY + + SecurityOptions.KERBEROS_LOGIN_KEYTAB + "; '" + keytab + "' not found."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java new file mode 100644 index 0000000..6af4f23 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.security; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + + +/** + * A dynamic JAAS configuration. + * + * Makes it possible to define Application Configuration Entries (ACEs) at runtime, building upon + * an (optional) underlying configuration. Entries from the underlying configuration take + * precedence over dynamic entries. + */ +public class DynamicConfiguration extends Configuration { + + protected static final Logger LOG = LoggerFactory.getLogger(DynamicConfiguration.class); + + private final Configuration delegate; + + private final Map<String,AppConfigurationEntry[]> dynamicEntries = new HashMap<>(); + + /** + * Create a dynamic configuration. + * @param delegate an underlying configuration to delegate to, or null. + */ + public DynamicConfiguration(@Nullable Configuration delegate) { + this.delegate = delegate; + } + + /** + * Add entries for the given application name. + */ + public void addAppConfigurationEntry(String name, AppConfigurationEntry... entry) { + final AppConfigurationEntry[] existing = dynamicEntries.get(name); + final AppConfigurationEntry[] updated; + if(existing == null) { + updated = Arrays.copyOf(entry, entry.length); + } + else { + updated = merge(existing, entry); + } + dynamicEntries.put(name, updated); + } + + /** + * Retrieve the AppConfigurationEntries for the specified <i>name</i> + * from this Configuration. + * + * <p> + * + * @param name the name used to index the Configuration. 
+ * + * @return an array of AppConfigurationEntries for the specified <i>name</i> + * from this Configuration, or null if there are no entries + * for the specified <i>name</i> + */ + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String name) { + AppConfigurationEntry[] entry = null; + if(delegate != null) { + entry = delegate.getAppConfigurationEntry(name); + } + final AppConfigurationEntry[] existing = dynamicEntries.get(name); + if(existing != null) { + if(entry != null) { + entry = merge(entry, existing); + } + else { + entry = Arrays.copyOf(existing, existing.length); + } + } + return entry; + } + + private static AppConfigurationEntry[] merge(AppConfigurationEntry[] a, AppConfigurationEntry[] b) { + AppConfigurationEntry[] merged = Arrays.copyOf(a, a.length + b.length); + System.arraycopy(b, 0, merged, a.length, b.length); + return merged; + } + + @Override + public void refresh() { + if(delegate != null) { + delegate.refresh(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java deleted file mode 100644 index c4527dd..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.security; - -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.annotation.Internal; -import org.apache.hadoop.security.authentication.util.KerberosUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.security.auth.login.AppConfigurationEntry; -import javax.security.auth.login.Configuration; -import java.io.File; -import java.util.HashMap; -import java.util.Map; - -/** - * - * JAAS configuration provider object that provides default LoginModule for various connectors that supports - * JAAS/SASL based Kerberos authentication. The implementation is inspired from Hadoop UGI class. - * - * Different connectors uses different login module name to implement JAAS based authentication support. - * For example, Kafka expects the login module name to be "kafkaClient" whereas ZooKeeper expect the - * name to be "client". This sets responsibility on the Flink cluster administrator to configure/provide right - * JAAS config entries. To simplify this requirement, we have introduced this abstraction that provides - * a standard lookup to get the login module entry for the JAAS based authentication to work. - * - * HDFS connector will not be impacted with this configuration since it uses UGI based mechanism to authenticate. 
- * - * <a href="https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html">Configuration</a> - * - */ - -@Internal -public class JaasConfiguration extends Configuration { - - private static final Logger LOG = LoggerFactory.getLogger(JaasConfiguration.class); - - public static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor"); - - public static final boolean IBM_JAVA; - - private static final Map<String, String> debugOptions = new HashMap<>(); - - private static final Map<String, String> kerberosCacheOptions = new HashMap<>(); - - private static final Map<String, String> keytabKerberosOptions = new HashMap<>(); - - private static final AppConfigurationEntry userKerberosAce; - - private AppConfigurationEntry keytabKerberosAce = null; - - static { - - IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM"); - - if(LOG.isDebugEnabled()) { - debugOptions.put("debug", "true"); - } - - if(IBM_JAVA) { - kerberosCacheOptions.put("useDefaultCcache", "true"); - } else { - kerberosCacheOptions.put("doNotPrompt", "true"); - kerberosCacheOptions.put("useTicketCache", "true"); - } - - String ticketCache = System.getenv("KRB5CCNAME"); - if(ticketCache != null) { - if(IBM_JAVA) { - System.setProperty("KRB5CCNAME", ticketCache); - } else { - kerberosCacheOptions.put("ticketCache", ticketCache); - } - } - - kerberosCacheOptions.put("renewTGT", "true"); - kerberosCacheOptions.putAll(debugOptions); - - userKerberosAce = new AppConfigurationEntry( - KerberosUtil.getKrb5LoginModuleName(), - AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL, - kerberosCacheOptions); - - } - - protected JaasConfiguration(String keytab, String principal) { - - LOG.info("Initializing JAAS configuration instance. 
Parameters: {}, {}", keytab, principal); - - if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) || - (!StringUtils.isBlank(keytab) && StringUtils.isBlank(principal))){ - throw new RuntimeException("Both keytab and principal are required and cannot be empty"); - } - - if(!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) { - - if(IBM_JAVA) { - keytabKerberosOptions.put("useKeytab", prependFileUri(keytab)); - keytabKerberosOptions.put("credsType", "both"); - } else { - keytabKerberosOptions.put("keyTab", keytab); - keytabKerberosOptions.put("doNotPrompt", "true"); - keytabKerberosOptions.put("useKeyTab", "true"); - keytabKerberosOptions.put("storeKey", "true"); - } - - keytabKerberosOptions.put("principal", principal); - keytabKerberosOptions.put("refreshKrb5Config", "true"); - keytabKerberosOptions.putAll(debugOptions); - - keytabKerberosAce = new AppConfigurationEntry( - KerberosUtil.getKrb5LoginModuleName(), - AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, - keytabKerberosOptions); - } - } - - public static Map<String, String> getKeytabKerberosOptions() { - return keytabKerberosOptions; - } - - private static String prependFileUri(String keytabPath) { - File f = new File(keytabPath); - return f.toURI().toString(); - } - - @Override - public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) { - - LOG.debug("JAAS configuration requested for the application entry: {}", applicationName); - - AppConfigurationEntry[] appConfigurationEntry; - - if(keytabKerberosAce != null) { - appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce, userKerberosAce}; - } else { - appConfigurationEntry = new AppConfigurationEntry[] {userKerberosAce}; - } - - return appConfigurationEntry; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java new file mode 100644 index 0000000..7ef9187 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.annotation.Internal; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.AppConfigurationEntry; +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * + * Provides vendor-specific Kerberos {@link AppConfigurationEntry} instances. + * + * The implementation is inspired from Hadoop UGI class. 
+ */ +@Internal +public class KerberosUtils { + + private static final Logger LOG = LoggerFactory.getLogger(KerberosUtils.class); + + private static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor"); + + private static final boolean IBM_JAVA; + + private static final Map<String, String> debugOptions = new HashMap<>(); + + private static final Map<String, String> kerberosCacheOptions = new HashMap<>(); + + private static final AppConfigurationEntry userKerberosAce; + + static { + + IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM"); + + if(LOG.isDebugEnabled()) { + debugOptions.put("debug", "true"); + } + + if(IBM_JAVA) { + kerberosCacheOptions.put("useDefaultCcache", "true"); + } else { + kerberosCacheOptions.put("doNotPrompt", "true"); + kerberosCacheOptions.put("useTicketCache", "true"); + } + + String ticketCache = System.getenv("KRB5CCNAME"); + if(ticketCache != null) { + if(IBM_JAVA) { + System.setProperty("KRB5CCNAME", ticketCache); + } else { + kerberosCacheOptions.put("ticketCache", ticketCache); + } + } + + kerberosCacheOptions.put("renewTGT", "true"); + kerberosCacheOptions.putAll(debugOptions); + + userKerberosAce = new AppConfigurationEntry( + KerberosUtil.getKrb5LoginModuleName(), + AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL, + kerberosCacheOptions); + + } + + public static AppConfigurationEntry ticketCacheEntry() { + return userKerberosAce; + } + + public static AppConfigurationEntry keytabEntry(String keytab, String principal) { + + checkNotNull(keytab, "keytab"); + checkNotNull(principal, "principal"); + + Map<String, String> keytabKerberosOptions = new HashMap<>(); + + if(IBM_JAVA) { + keytabKerberosOptions.put("useKeytab", prependFileUri(keytab)); + keytabKerberosOptions.put("credsType", "both"); + } else { + keytabKerberosOptions.put("keyTab", keytab); + keytabKerberosOptions.put("doNotPrompt", "true"); + keytabKerberosOptions.put("useKeyTab", "true"); + keytabKerberosOptions.put("storeKey", "true"); + } + + 
keytabKerberosOptions.put("principal", principal); + keytabKerberosOptions.put("refreshKrb5Config", "true"); + keytabKerberosOptions.putAll(debugOptions); + + AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry( + KerberosUtil.getKrb5LoginModuleName(), + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, + keytabKerberosOptions); + + return keytabKerberosAce; + } + + private static String prependFileUri(String keytabPath) { + File f = new File(keytabPath); + return f.toURI().toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java index d7fc6ff..d76e7a5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java @@ -18,216 +18,162 @@ package org.apache.flink.runtime.security; +import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.util.Preconditions; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.Credentials; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.security.modules.HadoopModule; +import org.apache.flink.runtime.security.modules.JaasModule; +import 
org.apache.flink.runtime.security.modules.SecurityModule; +import org.apache.flink.runtime.security.modules.ZooKeeperModule; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.auth.Subject; import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.lang.reflect.Method; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.util.Collection; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; /* - * Utils for configuring security. The following security mechanism are supported: + * Utils for configuring security. The following security subsystems are supported: * * 1. Java Authentication and Authorization Service (JAAS) * 2. Hadoop's User Group Information (UGI) + * 3. ZooKeeper's process-wide security settings. 
*/ public class SecurityUtils { private static final Logger LOG = LoggerFactory.getLogger(SecurityUtils.class); - public static final String JAAS_CONF_FILENAME = "flink-jaas.conf"; - - public static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config"; - - private static final String ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client"; - - private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username"; - private static SecurityContext installedContext = new NoOpSecurityContext(); + private static List<SecurityModule> installedModules = null; + public static SecurityContext getInstalledContext() { return installedContext; } + @VisibleForTesting + static List<SecurityModule> getInstalledModules() { + return installedModules; + } + /** - * Performs a static initialization of the JAAS and Hadoop UGI security mechanism. - * It creates the in-memory JAAS configuration object which will serve appropriate - * ApplicationConfigurationEntry for the connector login module implementation that - * authenticates Kerberos identity using SASL/JAAS based mechanism. + * Installs a process-wide security configuration. + * + * Applies the configuration using the available security modules (i.e. Hadoop, JAAS). */ public static void install(SecurityConfiguration config) throws Exception { - if (!config.securityIsEnabled()) { - // do not perform any initialization if no Kerberos crendetails are provided - return; + // install the security modules + List<SecurityModule> modules = new ArrayList<>(); + try { + for (Class<? 
extends SecurityModule> moduleClass : config.getSecurityModules()) { + SecurityModule module = moduleClass.newInstance(); + module.install(config); + modules.add(module); + } } + catch(Exception ex) { + throw new Exception("unable to establish the security context", ex); + } + installedModules = modules; - // establish the JAAS config - JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal); - javax.security.auth.login.Configuration.setConfiguration(jaasConfig); - - populateSystemSecurityProperties(config.flinkConf); - - // establish the UGI login user - UserGroupInformation.setConfiguration(config.hadoopConf); - - // only configure Hadoop security if we have security enabled - if (UserGroupInformation.isSecurityEnabled()) { - - final UserGroupInformation loginUser; - - if (config.keytab != null && !StringUtils.isBlank(config.principal)) { - String keytabPath = (new File(config.keytab)).getAbsolutePath(); - - UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath); - - loginUser = UserGroupInformation.getLoginUser(); - - // supplement with any available tokens - String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); - if (fileLocation != null) { - /* - * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are - * used in the context of reading the stored tokens from UGI. 
- * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf); - * loginUser.addCredentials(cred); - */ - try { - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", - File.class, org.apache.hadoop.conf.Configuration.class); - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), - config.hadoopConf); - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", - Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); - } catch (NoSuchMethodException e) { - LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); - } - } - } else { - // login with current user credentials (e.g. ticket cache) + // install a security context + // use the Hadoop login user as the subject of the installed security context + if (!(installedContext instanceof NoOpSecurityContext)) { + LOG.warn("overriding previous security context"); + } + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + installedContext = new HadoopSecurityContext(loginUser); + } + + static void uninstall() { + if(installedModules != null) { + for (SecurityModule module : Lists.reverse(installedModules)) { try { - //Use reflection API to get the login user object - //UserGroupInformation.loginUserFromSubject(null); - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class); - Subject subject = null; - loginUserFromSubjectMethod.invoke(null, subject); - } catch (NoSuchMethodException e) { - LOG.warn("Could not find method implementations in the shaded jar. 
Exception: {}", e); + module.uninstall(); } - - // note that the stored tokens are read automatically - loginUser = UserGroupInformation.getLoginUser(); - } - - LOG.info("Hadoop user set to {}", loginUser.toString()); - - boolean delegationToken = false; - final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN"); - Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens(); - for (Token<? extends TokenIdentifier> token : usrTok) { - final Text id = new Text(token.getIdentifier()); - LOG.debug("Found user token " + id + " with " + token); - if (token.getKind().equals(HDFS_DELEGATION_KIND)) { - delegationToken = true; + catch(UnsupportedOperationException ignored) { } - } - - if (!loginUser.hasKerberosCredentials()) { - //throw an error in non-yarn deployment if kerberos cache is not available - if (!delegationToken) { - LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials"); - throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials"); + catch(SecurityModule.SecurityInstallException e) { + LOG.warn("unable to uninstall a security module", e); } } - - if (!(installedContext instanceof NoOpSecurityContext)) { - LOG.warn("overriding previous security context"); - } - - installedContext = new HadoopSecurityContext(loginUser); + installedModules = null; } - } - static void clearContext() { installedContext = new NoOpSecurityContext(); } - /* - * This method configures some of the system properties that are require for ZK and Kafka SASL authentication - * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289 - * See: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900 - * In this method, setting java.security.auth.login.config configuration is configured only to support ZK and - * Kafka current code behavior. 
+ /** + * The global security configuration. + * + * See {@link SecurityOptions} for corresponding configuration options. */ - private static void populateSystemSecurityProperties(Configuration configuration) { - Preconditions.checkNotNull(configuration, "The supplied configuration was null"); + public static class SecurityConfiguration { - boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE); + private static final List<Class<? extends SecurityModule>> DEFAULT_MODULES = Collections.unmodifiableList( + Arrays.asList(HadoopModule.class, JaasModule.class, ZooKeeperModule.class)); - if (disableSaslClient) { - LOG.info("SASL client auth for ZK will be disabled"); - //SASL auth is disabled by default but will be enabled if specified in configuration - System.setProperty(ZOOKEEPER_SASL_CLIENT,"false"); - return; - } + private final List<Class<? extends SecurityModule>> securityModules; - // load Jaas config file to initialize SASL - final File jaasConfFile; - try { - Path jaasConfPath = Files.createTempFile(JAAS_CONF_FILENAME, ""); - InputStream jaasConfStream = SecurityUtils.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME); - Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING); - jaasConfFile = jaasConfPath.toFile(); - jaasConfFile.deleteOnExit(); - jaasConfStream.close(); - } catch (IOException e) { - throw new RuntimeException("SASL auth is enabled for ZK but unable to " + - "locate pseudo Jaas config provided with Flink", e); - } + private final org.apache.hadoop.conf.Configuration hadoopConf; - LOG.info("Enabling {} property with pseudo JAAS config file: {}", - JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath()); + private final boolean useTicketCache; - //ZK client module lookup the configuration to handle SASL. 
- //https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900 - System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath()); - System.setProperty(ZOOKEEPER_SASL_CLIENT, "true"); + private final String keytab; - String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME); - if (!StringUtils.isBlank(zkSaslServiceName)) { - LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName); - System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME, zkSaslServiceName); - } + private final String principal; - } + private final List<String> loginContextNames; - /** - * Inputs for establishing the security context. - */ - public static class SecurityConfiguration { + private final String zkServiceName; - private Configuration flinkConf; + private final String zkLoginContextName; - private org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); + /** + * Create a security configuration from the global configuration. + * @param flinkConf the Flink global configuration. + */ + public SecurityConfiguration(Configuration flinkConf) { + this(flinkConf, HadoopUtils.getHadoopConfiguration()); + } - private String keytab; + /** + * Create a security configuration from the global configuration. + * @param flinkConf the Flink global configuration. + * @param hadoopConf the Hadoop configuration. + */ + public SecurityConfiguration(Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf) { + this(flinkConf, hadoopConf, DEFAULT_MODULES); + } - private String principal; + /** + * Create a security configuration from the global configuration. + * @param flinkConf the Flink global configuration. + * @param hadoopConf the Hadoop configuration. + * @param securityModules the security modules to apply. 
+		 */
+		public SecurityConfiguration(Configuration flinkConf,
+				org.apache.hadoop.conf.Configuration hadoopConf,
+				List<? extends Class<? extends SecurityModule>> securityModules) {
+			this.hadoopConf = checkNotNull(hadoopConf);
+			this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
+			this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
+			this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
+			this.loginContextNames = parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
+			this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
+			this.zkLoginContextName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);
+			this.securityModules = Collections.unmodifiableList(securityModules);
+
+			validate();
+		}
 		public String getKeytab() {
 			return keytab;
@@ -237,48 +183,50 @@ public class SecurityUtils {
 			return principal;
 		}
-		public SecurityConfiguration(Configuration flinkConf) {
-			this.flinkConf = flinkConf;
+		public boolean useTicketCache() {
+			return useTicketCache;
+		}
-			String keytab = flinkConf.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
-			String principal = flinkConf.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
-			validate(keytab, principal);
+		public org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
+			return hadoopConf;
+		}
-			this.keytab = keytab;
-			this.principal = principal;
+		public List<Class<? extends SecurityModule>> getSecurityModules() {
+			return securityModules;
 		}
-		public SecurityConfiguration setHadoopConfiguration(org.apache.hadoop.conf.Configuration conf) {
-			this.hadoopConf = conf;
-			return this;
+		public List<String> getLoginContextNames() {
+			return loginContextNames;
 		}
-		private void validate(String keytab, String principal) {
-			LOG.debug("keytab {} and principal {} .", keytab, principal);
+		public String getZooKeeperServiceName() {
+			return zkServiceName;
+		}
-			if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
-				!StringUtils.isBlank(keytab) && StringUtils.isBlank(principal)) {
-				if(StringUtils.isBlank(keytab)) {
-					LOG.warn("Keytab is null or empty");
-				}
+		public String getZooKeeperLoginContextName() {
+			return zkLoginContextName;
+		}
+
+		private void validate() {
+			if(!StringUtils.isBlank(keytab)) {
+				// principal is required
 				if(StringUtils.isBlank(principal)) {
-					LOG.warn("Principal is null or empty");
+					throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab requires a principal.");
 				}
-				throw new RuntimeException("Requires both keytab and principal to be provided");
-			}
-			if(!StringUtils.isBlank(keytab)) {
+				// check the keytab is readable
 				File keytabFile = new File(keytab);
-				if(!keytabFile.exists() || !keytabFile.isFile()) {
-					LOG.warn("Not a valid keytab: {} file", keytab);
-					throw new RuntimeException("Invalid keytab file: " + keytab + " passed");
+				if(!keytabFile.exists() || !keytabFile.isFile() || !keytabFile.canRead()) {
+					throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab is unreadable");
 				}
 			}
-		}
-		public boolean securityIsEnabled() {
-			return keytab != null && principal != null;
+		}
+
+		private static List<String> parseList(String value) {
+			if(value == null) {
+				return Collections.emptyList();
+			}
+			return Arrays.asList(value.split(","));
 		}
 	}
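The rewritten `SecurityConfiguration` above parses `security.kerberos.login.contexts` with the private `parseList` helper. As a standalone sketch (plain JDK; the class name here is illustrative, not part of Flink), the splitting behavior looks like this:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LoginContextParsing {

	// mirrors SecurityConfiguration#parseList: a null value yields an empty
	// list; otherwise the value is split on commas (no whitespace trimming)
	static List<String> parseList(String value) {
		if (value == null) {
			return Collections.emptyList();
		}
		return Arrays.asList(value.split(","));
	}

	public static void main(String[] args) {
		System.out.println(parseList("Client,KafkaClient")); // [Client, KafkaClient]
		System.out.println(parseList(null));                 // []
	}
}
```

Note that the entries are not trimmed, so a value written as `Client, KafkaClient` would yield a context name with a leading space.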
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
new file mode 100644
index 0000000..9344faf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * Responsible for installing a Hadoop login user.
+ */
+public class HadoopModule implements SecurityModule {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopModule.class);
+
+	UserGroupInformation loginUser;
+
+	@Override
+	public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
+
+		UserGroupInformation.setConfiguration(securityConfig.getHadoopConfiguration());
+
+		try {
+			if (UserGroupInformation.isSecurityEnabled() &&
+				!StringUtils.isBlank(securityConfig.getKeytab()) && !StringUtils.isBlank(securityConfig.getPrincipal())) {
+				String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();
+
+				UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);
+
+				loginUser = UserGroupInformation.getLoginUser();
+
+				// supplement with any available tokens
+				String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+				if (fileLocation != null) {
+					/*
+					 * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
+					 * used in the context of reading the stored tokens from UGI.
+					 * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
+					 * loginUser.addCredentials(cred);
+					 */
+					try {
+						Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
+							File.class, org.apache.hadoop.conf.Configuration.class);
+						Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
+							securityConfig.getHadoopConfiguration());
+						Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
+							Credentials.class);
+						addCredentialsMethod.invoke(loginUser, cred);
+					} catch (NoSuchMethodException e) {
+						LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+					} catch (InvocationTargetException e) {
+						throw e.getTargetException();
+					}
+				}
+			} else {
+				// login with current user credentials (e.g. ticket cache, OS login)
+				// note that the stored tokens are read automatically
+				try {
+					//Use reflection API to get the login user object
+					//UserGroupInformation.loginUserFromSubject(null);
+					Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
+					loginUserFromSubjectMethod.invoke(null, (Subject) null);
+				} catch (NoSuchMethodException e) {
+					LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+				} catch (InvocationTargetException e) {
+					throw e.getTargetException();
+				}
+
+				loginUser = UserGroupInformation.getLoginUser();
+			}
+
+			if (UserGroupInformation.isSecurityEnabled()) {
+				// note: UGI::hasKerberosCredentials inaccurately reports false
+				// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
+				// so we check only in ticket cache scenario.
+				if (securityConfig.useTicketCache() && !loginUser.hasKerberosCredentials()) {
+					// a delegation token is an adequate substitute in most cases
+					if (!HadoopUtils.hasHDFSDelegationToken()) {
+						LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials");
+					}
+				}
+			}
+
+			LOG.info("Hadoop user set to {}", loginUser);
+
+		} catch (Throwable ex) {
+			throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
+		}
+	}
+
+	@Override
+	public void uninstall() throws SecurityInstallException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
new file mode 100644
index 0000000..f8b9bdf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.DynamicConfiguration;
+import org.apache.flink.runtime.security.KerberosUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+/**
+ * Responsible for installing a process-wide JAAS configuration.
+ * <p>
+ * The installed configuration combines login modules based on:
+ * - the user-supplied JAAS configuration file, if any
+ * - a Kerberos keytab, if configured
+ * - any cached Kerberos credentials from the current environment
+ * <p>
+ * The module also installs a default JAAS config file (if necessary) for
+ * compatibility with ZK and Kafka. Note that the JRE actually draws on numerous file locations.
+ * See: https://docs.oracle.com/javase/7/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html
+ * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
+ */
+@Internal
+public class JaasModule implements SecurityModule {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JaasModule.class);
+
+	static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
+
+	static final String JAAS_CONF_RESOURCE_NAME = "flink-jaas.conf";
+
+	private String priorConfigFile;
+	private javax.security.auth.login.Configuration priorConfig;
+
+	private DynamicConfiguration currentConfig;
+
+	@Override
+	public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
+
+		// ensure that a config file is always defined, for compatibility with
+		// ZK and Kafka which check for the system property and existence of the file
+		priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
+		if (priorConfigFile == null) {
+			File configFile = generateDefaultConfigFile();
+			System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
+		}
+
+		// read the JAAS configuration file
+		priorConfig = javax.security.auth.login.Configuration.getConfiguration();
+
+		// construct a dynamic JAAS configuration
+		currentConfig = new DynamicConfiguration(priorConfig);
+
+		// wire up the configured JAAS login contexts to use the krb5 entries
+		AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
+		if(krb5Entries != null) {
+			for (String app : securityConfig.getLoginContextNames()) {
+				currentConfig.addAppConfigurationEntry(app, krb5Entries);
+			}
+		}
+
+		javax.security.auth.login.Configuration.setConfiguration(currentConfig);
+	}
+
+	@Override
+	public void uninstall() throws SecurityInstallException {
+		if(priorConfigFile != null) {
+			System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, priorConfigFile);
+		} else {
+			System.clearProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
+		}
+		javax.security.auth.login.Configuration.setConfiguration(priorConfig);
+	}
+
+	public DynamicConfiguration getCurrentConfiguration() {
+		return currentConfig;
+	}
+
+	private static AppConfigurationEntry[] getAppConfigurationEntries(SecurityUtils.SecurityConfiguration securityConfig) {
+
+		AppConfigurationEntry userKerberosAce = null;
+		if (securityConfig.useTicketCache()) {
+			userKerberosAce = KerberosUtils.ticketCacheEntry();
+		}
+		AppConfigurationEntry keytabKerberosAce = null;
+		if (securityConfig.getKeytab() != null) {
+			keytabKerberosAce = KerberosUtils.keytabEntry(securityConfig.getKeytab(), securityConfig.getPrincipal());
+		}
+
+		AppConfigurationEntry[] appConfigurationEntry;
+		if (userKerberosAce != null && keytabKerberosAce != null) {
+			appConfigurationEntry = new AppConfigurationEntry[]{keytabKerberosAce, userKerberosAce};
+		} else if (keytabKerberosAce != null) {
+			appConfigurationEntry = new AppConfigurationEntry[]{keytabKerberosAce};
+		} else if (userKerberosAce != null) {
+			appConfigurationEntry = new AppConfigurationEntry[]{userKerberosAce};
+		} else {
+			return null;
+		}
+
+		return appConfigurationEntry;
+	}
+
+	/**
+	 * Generate the default JAAS config file.
+	 */
+	private static File generateDefaultConfigFile() {
+		final File jaasConfFile;
+		try {
+			Path jaasConfPath = Files.createTempFile("jaas-", ".conf");
+			try (InputStream resourceStream = JaasModule.class.getClassLoader().getResourceAsStream(JAAS_CONF_RESOURCE_NAME)) {
+				Files.copy(resourceStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
+			}
+			jaasConfFile = jaasConfPath.toFile();
+			jaasConfFile.deleteOnExit();
+		} catch (IOException e) {
+			throw new RuntimeException("unable to generate a JAAS configuration file", e);
+		}
+		return jaasConfFile;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
new file mode 100644
index 0000000..fbe1db9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+
+import java.security.GeneralSecurityException;
+
+/**
+ * An installable security module.
+ */
+public interface SecurityModule {
+
+	/**
+	 * Install the security module.
+	 *
+	 * @param configuration the security configuration.
+	 * @throws SecurityInstallException if the security module couldn't be installed.
+	 */
+	void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException;
+
+	/**
+	 * Uninstall the security module.
+	 *
+	 * @throws SecurityInstallException if the security module couldn't be uninstalled.
+	 * @throws UnsupportedOperationException if the security module doesn't support uninstallation.
+	 */
+	void uninstall() throws SecurityInstallException;
+
+	/**
+	 * Indicates a problem with installing or uninstalling a security module.
+	 */
+	class SecurityInstallException extends GeneralSecurityException {
+		private static final long serialVersionUID = 1L;
+
+		public SecurityInstallException(String msg) {
+			super(msg);
+		}
+
+		public SecurityInstallException(String msg, Throwable cause) {
+			super(msg, cause);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
new file mode 100644
index 0000000..c0ba4a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+
+/**
+ * Responsible for installing a process-wide ZooKeeper security configuration.
+ */
+public class ZooKeeperModule implements SecurityModule {
+
+	private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+	/**
+	 * A system property for setting whether ZK uses SASL.
+	 */
+	private static final String ZK_ENABLE_CLIENT_SASL = "zookeeper.sasl.client";
+
+	/**
+	 * A system property for setting the expected ZooKeeper service name.
+	 */
+	private static final String ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+	/**
+	 * A system property for setting the login context name to use.
+	 */
+	private static final String ZK_LOGIN_CONTEXT_NAME = "zookeeper.sasl.clientconfig";
+
+	private String priorServiceName;
+
+	private String priorLoginContextName;
+
+	@Override
+	public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException {
+
+		priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
+		if (!"zookeeper".equals(configuration.getZooKeeperServiceName())) {
+			System.setProperty(ZK_SASL_CLIENT_USERNAME, configuration.getZooKeeperServiceName());
+		}
+
+		priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
+		if (!"Client".equals(configuration.getZooKeeperLoginContextName())) {
+			System.setProperty(ZK_LOGIN_CONTEXT_NAME, configuration.getZooKeeperLoginContextName());
+		}
+	}
+
+	@Override
+	public void uninstall() throws SecurityInstallException {
+		if(priorServiceName != null) {
+			System.setProperty(ZK_SASL_CLIENT_USERNAME, priorServiceName);
+		} else {
+			System.clearProperty(ZK_SASL_CLIENT_USERNAME);
+		}
+		if(priorLoginContextName != null) {
+			System.setProperty(ZK_LOGIN_CONTEXT_NAME, priorLoginContextName);
+		} else {
+			System.clearProperty(ZK_LOGIN_CONTEXT_NAME);
+		}
+	}
+
+}
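`ZooKeeperModule` (like `JaasModule`) follows a save-and-restore pattern for JVM-wide system properties: `install` records the prior value before overriding it, and `uninstall` restores it or clears the property if it was previously unset. A minimal standalone sketch of that pattern (the property name below is illustrative, not one Flink uses):

```java
public class PropertyOverride {

	private final String key;
	private String priorValue;

	PropertyOverride(String key) {
		this.key = key;
	}

	// remember the prior value (null if unset), then override it
	void install(String newValue) {
		priorValue = System.getProperty(key, null);
		System.setProperty(key, newValue);
	}

	// restore the prior value, or clear the property if it was unset before
	void uninstall() {
		if (priorValue != null) {
			System.setProperty(key, priorValue);
		} else {
			System.clearProperty(key);
		}
	}

	public static void main(String[] args) {
		PropertyOverride o = new PropertyOverride("example.sasl.client.username");
		o.install("flink-zk");
		System.out.println(System.getProperty("example.sasl.client.username")); // flink-zk
		o.uninstall();
		System.out.println(System.getProperty("example.sasl.client.username")); // null
	}
}
```

Restoring rather than blindly clearing matters here because the user may have set `zookeeper.sasl.client.username` or `zookeeper.sasl.clientconfig` on the JVM command line before the module was installed.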

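The `DynamicConfiguration` that `JaasModule` installs is not part of this hunk. The sketch below is a hypothetical, JDK-only approximation of its apparent contract (class name and merge policy are assumptions, not Flink's implementation): delegate lookups to the prior `javax.security.auth.login.Configuration` and append any dynamically registered entries for the same login context name:

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical delegating JAAS Configuration in the spirit of Flink's
// DynamicConfiguration; not the actual Flink class.
public class DelegatingJaasConfiguration extends Configuration {

	private final Configuration delegate; // may be null (no prior configuration)
	private final Map<String, AppConfigurationEntry[]> dynamicEntries = new HashMap<>();

	public DelegatingJaasConfiguration(Configuration delegate) {
		this.delegate = delegate;
	}

	// register entries for a login context name (overwrites earlier dynamic entries)
	public void addAppConfigurationEntry(String name, AppConfigurationEntry... entries) {
		dynamicEntries.put(name, entries);
	}

	@Override
	public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
		AppConfigurationEntry[] base = delegate != null ? delegate.getAppConfigurationEntry(name) : null;
		AppConfigurationEntry[] extra = dynamicEntries.get(name);
		if (base == null) {
			return extra; // may be null: the context is simply unknown
		}
		if (extra == null) {
			return base;
		}
		// context known to both: concatenate delegate entries with dynamic ones
		AppConfigurationEntry[] merged = new AppConfigurationEntry[base.length + extra.length];
		System.arraycopy(base, 0, merged, 0, base.length);
		System.arraycopy(extra, 0, merged, base.length, extra.length);
		return merged;
	}

	public static void main(String[] args) {
		DelegatingJaasConfiguration conf = new DelegatingJaasConfiguration(null);
		conf.addAppConfigurationEntry("KafkaClient", new AppConfigurationEntry(
			"com.sun.security.auth.module.Krb5LoginModule",
			AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
			Collections.<String, Object>emptyMap()));
		System.out.println(conf.getAppConfigurationEntry("KafkaClient").length); // 1
		System.out.println(conf.getAppConfigurationEntry("Other"));              // null
	}
}
```

Installing such a configuration process-wide via `Configuration.setConfiguration(...)` is what lets user-supplied JAAS entries coexist with the Kerberos entries Flink generates, which is the core of this change.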