[FLINK-5364] [security] Rework JAAS configuration to support user-supplied entries

Fixes FLINK-5364, FLINK-5361, FLINK-5350, FLINK-5055

This closes #3057


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00193f7e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00193f7e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00193f7e

Branch: refs/heads/release-1.2
Commit: 00193f7e238340cc18c57a44c7e6377432839373
Parents: 1750b0d
Author: wrighe3 <[email protected]>
Authored: Tue Dec 20 01:07:38 2016 -0800
Committer: Stephan Ewen <[email protected]>
Committed: Wed Jan 11 19:06:10 2017 +0100

----------------------------------------------------------------------
 docs/internals/flink_security.md                | 133 +++++---
 docs/setup/config.md                            |  55 +++-
 .../connectors/fs/RollingSinkSecuredITCase.java |   8 +-
 .../flink/configuration/ConfigConstants.java    |  14 -
 .../configuration/HighAvailabilityOptions.java  |   8 -
 .../flink/configuration/SecurityOptions.java    |  62 ++++
 flink-dist/src/main/resources/flink-conf.yaml   |  21 +-
 .../java/hadoop/mapred/utils/HadoopUtils.java   |  21 ++
 .../MesosApplicationMasterRunner.java           |   2 -
 .../MesosTaskManagerRunner.java                 |   2 -
 .../overlays/KeytabOverlay.java                 |  14 +-
 .../runtime/security/DynamicConfiguration.java  | 111 +++++++
 .../runtime/security/JaasConfiguration.java     | 160 ---------
 .../flink/runtime/security/KerberosUtils.java   | 125 +++++++
 .../flink/runtime/security/SecurityUtils.java   | 322 ++++++++-----------
 .../runtime/security/modules/HadoopModule.java  | 119 +++++++
 .../runtime/security/modules/JaasModule.java    | 146 +++++++++
 .../security/modules/SecurityModule.java        |  59 ++++
 .../security/modules/ZooKeeperModule.java       |  76 +++++
 .../src/main/resources/flink-jaas.conf          |   9 +-
 .../overlays/KeytabOverlayTest.java             |   5 +-
 .../runtime/security/JaasConfigurationTest.java |  52 ---
 .../runtime/security/KerberosUtilsTest.java     |  48 +++
 .../runtime/security/SecurityUtilsTest.java     | 105 +++---
 .../flink/test/util/SecureTestEnvironment.java  |  48 +--
 .../test/util/TestingJaasConfiguration.java     | 106 ------
 .../flink/test/util/TestingSecurityContext.java |  38 +--
 .../yarn/YARNSessionFIFOSecuredITCase.java      |  10 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |   5 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |  26 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  25 +-
 .../flink/yarn/YarnTaskManagerRunner.java       |  21 +-
 32 files changed, 1182 insertions(+), 774 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/docs/internals/flink_security.md
----------------------------------------------------------------------
diff --git a/docs/internals/flink_security.md b/docs/internals/flink_security.md
index 846273b..a83f3b9 100644
--- a/docs/internals/flink_security.md
+++ b/docs/internals/flink_security.md
@@ -24,64 +24,123 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This document briefly describes how Flink security works in the context of 
various deployment mechanism (Standalone/Cluster vs YARN) 
-and the connectors that participates in Flink Job execution stage. This 
documentation can be helpful for both administrators and developers 
-who plans to run Flink on a secure environment.
+This document briefly describes how Flink security works in the context of 
various deployment mechanisms (Standalone, YARN, or Mesos), 
+filesystems, connectors, and state backends.
 
 ## Objective
+The primary goals of the Flink Kerberos security infrastructure are:
+1. to enable secure data access for jobs within a cluster via connectors (e.g. 
Kafka)
+2. to authenticate to ZooKeeper (if configured to use SASL)
+3. to authenticate to Hadoop components (e.g. HDFS, HBase) 
 
-The primary goal of Flink security model is to enable secure data access for 
jobs within a cluster via connectors. In a production deployment scenario, 
-streaming jobs are understood to run for longer period of time 
(days/weeks/months) and the system must be  able to authenticate against secure 
-data sources throughout the life of the job. The current implementation 
supports running Flink clusters (Job Manager/Task Manager/Jobs) under the 
-context of a Kerberos identity based on Keytab credential supplied during 
deployment time. Any jobs submitted will continue to run in the identity of the 
cluster.
+In a production deployment scenario, streaming jobs are understood to run for 
long periods of time (days/weeks/months) and be able to authenticate to secure 
+data sources throughout the life of the job.  Kerberos keytabs do not expire 
in that timeframe, unlike a Hadoop delegation token
+or ticket cache entry.
+
+The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential
+or with Hadoop delegation tokens.   Keep in mind that all jobs share the credential configured for a given cluster.   To use a different keytab
+for a certain job, simply launch a separate Flink cluster with a different configuration.   Numerous Flink clusters may run side-by-side in a YARN
+or Mesos environment.
 
 ## How Flink Security works
-Flink deployment includes running Job Manager/ZooKeeper, Task Manager(s), Web 
UI and Job(s). Jobs (user code) can be submitted through web UI and/or CLI. 
-A Job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, 
Kinesis etc.,) and each connector may have a specific security 
-requirements (Kerberos, database based, SSL/TLS, custom etc.,). While 
satisfying the security requirements for all the connectors evolves over a 
period 
-of time, at this time of writing, the following connectors/services are tested 
for Kerberos/Keytab based security.
+In concept, a Flink program may use first- or third-party connectors (Kafka, 
HDFS, Cassandra, Flume, Kinesis etc.) necessitating arbitrary authentication 
methods (Kerberos, SSL/TLS, username/password, etc.).  While satisfying the 
security requirements for all connectors is an ongoing effort,
+Flink provides first-class support for Kerberos authentication only.  The 
following services and connectors are tested for Kerberos authentication:
 
-- Kafka (0.9)
+- Kafka (0.9+)
 - HDFS
+- HBase
 - ZooKeeper
 
-Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI is a 
static implementation that takes care of handling Kerberos authentication. The 
Flink bootstrap implementation
-(JM/TM/CLI) takes care of instantiating UGI with the appropriate security 
credentials to establish the necessary security context.
+Note that it is possible to enable the use of Kerberos independently for each service or connector.  For example, the user may enable 
+Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa.    The shared element is the configuration of 
+Kerberos credentials, which is then explicitly used by each component.
+
+The internal architecture is based on security modules (implementing 
`org.apache.flink.runtime.security.modules.SecurityModule`) which
+are installed at startup.  The next section describes each security module.
+
+### Hadoop Security Module
+This module uses the Hadoop `UserGroupInformation` (UGI) class to establish a 
process-wide *login user* context.   The login user is
+then used for all interactions with Hadoop, including HDFS, HBase, and YARN.
+
+If Hadoop security is enabled (in `core-site.xml`), the login user will have 
whatever Kerberos credential is configured.  Otherwise,
+the login user conveys only the user identity of the OS account that launched 
the cluster.
+
+### JAAS Security Module
+This module provides a dynamic JAAS configuration to the cluster, making 
available the configured Kerberos credential to ZooKeeper,
+Kafka, and other such components that rely on JAAS.
+
+Note that the user may also provide a static JAAS configuration file using the mechanisms described in the [Java SE Documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html).   Static entries override any
+dynamic entries provided by this module.
+
+### ZooKeeper Security Module
+This module configures certain process-wide ZooKeeper security-related 
settings, namely the ZooKeeper service name (default: `zookeeper`)
+and the JAAS login context name (default: `Client`).
+
+## Security Configuration
+
+### Flink Configuration
+The user's Kerberos ticket cache (managed with `kinit`) is used automatically, 
based on the following configuration option:
 
-Services like Kafka and ZooKeeper use SASL/JAAS based authentication mechanism 
to authenticate against a Kerberos server. It expects JAAS configuration with a 
platform-specific login 
-module *name* to be provided. Managing per-connector configuration files will 
be an overhead and to overcome this requirement, a process-wide JAAS 
configuration object is 
-instantiated which serves standard ApplicationConfigurationEntry for the 
connectors that authenticates using SASL/JAAS mechanism.
+- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from 
the user's Kerberos ticket cache (default: `true`).
 
-It is important to understand that the Flink processes (JM/TM/UI/Jobs) itself 
uses UGI's doAS() implementation to run under a specific user context, i.e. if 
Hadoop security is enabled 
-then the Flink processes will be running under a secure user account or else 
it will run as the OS login user account who starts the Flink cluster.
+A Kerberos keytab can be supplied by adding below configuration elements to 
the Flink configuration file:
 
-## Security Configurations
+- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file 
that contains the user credentials.
 
-Secure credentials can be supplied by adding below configuration elements to 
Flink configuration file:
+- `security.kerberos.login.principal`: Kerberos principal name associated with 
the keytab.
 
-- `security.keytab`: Absolute path to Kerberos keytab file that contains the 
user credentials/secret.
+These configuration options establish a cluster-wide credential to be used in 
a Hadoop and/or JAAS context.  Whether the credential is used in a Hadoop 
context is based on the Hadoop configuration (see next section).   To be used 
in a JAAS context, the configuration specifies which JAAS *login contexts* (or 
*applications*) are enabled with the following configuration option:
 
-- `security.principal`: User principal name that the Flink cluster should run 
as.
+- `security.kerberos.login.contexts`: A comma-separated list of login contexts 
to provide the Kerberos credentials to (for example, `Client` to use the 
credentials for ZooKeeper authentication).
 
-The delegation token mechanism (*kinit cache*) is still supported for backward 
compatibility but enabling security using *keytab* configuration is the 
preferred and recommended approach.
+ZooKeeper-related configuration overrides:
 
-## Standalone Mode:
+- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper 
cluster is configured to use (default: `zookeeper`). Facilitates 
mutual-authentication between the client (Flink) and server.
+
+- `zookeeper.sasl.login-context-name`: The JAAS login context name that the 
ZooKeeper client uses to request the login context (default: `Client`). Should 
match
+one of the values specified in `security.kerberos.login.contexts`.
+
+### Hadoop Configuration
+
+The Hadoop configuration is located via the `HADOOP_CONF_DIR` environment 
variable and by other means (see 
`org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils`).   The Kerberos 
credential (configured above) is used automatically if Hadoop security is 
enabled.
+
+Note that Kerberos credentials found in the ticket cache aren't transferrable 
to other hosts.   In this scenario, the Flink CLI acquires Hadoop
+delegation tokens (for HDFS and for HBase).
+
+## Deployment Modes
+Here is some information specific to each deployment mode.
+
+### Standalone Mode
 
 Steps to run a secure Flink cluster in standalone/cluster mode:
-- Add security configurations to Flink configuration file (on all cluster 
nodes) 
-- Make sure the Keytab file exist in the path as indicated in 
*security.keytab* configuration on all cluster nodes
-- Deploy Flink cluster using cluster start/stop scripts or CLI
+1. Add security-related configuration options to the Flink configuration file 
(on all cluster nodes).
+2. Ensure that the keytab file exists at the path indicated by 
`security.kerberos.login.keytab` on all cluster nodes.
+3. Deploy Flink cluster as normal.
+
+### YARN/Mesos Mode
+
+Steps to run a secure Flink cluster in YARN/Mesos mode:
+1. Add security-related configuration options to the Flink configuration file 
on the client.
+2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on the client node.
+3. Deploy Flink cluster as normal.
+
+In YARN/Mesos mode, the keytab is automatically copied from the client to the 
Flink containers.
 
-## Yarn Mode:
+For more information, see <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">YARN security</a> documentation.
 
-Steps to run secure Flink cluster in Yarn mode:
-- Add security configurations to Flink configuration file (on the node from 
where cluster will be provisioned using Flink/Yarn CLI) 
-- Make sure the Keytab file exist in the path as indicated in 
*security.keytab* configuration
-- Deploy Flink cluster using CLI
+#### Using `kinit` (YARN only)
 
-In Yarn mode, the user supplied keytab will be copied over to the Yarn 
containers (App Master/JM and TM) as the Yarn local resource file.
-Security implementation details are based on <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">Yarn security</a> 
+In YARN mode, it is possible to deploy a secure Flink cluster without a 
keytab, using only the ticket cache (as managed by `kinit`).
+This avoids the complexity of generating a keytab and avoids entrusting the 
cluster manager with it.  The main drawback is
+that the cluster is necessarily short-lived since the generated delegation 
tokens will expire (typically within a week).
 
-## Token Renewal
+Steps to run a secure Flink cluster using `kinit`:
+1. Add security-related configuration options to the Flink configuration file 
on the client.
+2. Login using the `kinit` command.
+3. Deploy Flink cluster as normal.
 
-UGI and Kafka/ZK login module implementations takes care of auto-renewing the 
tickets upon reaching expiry and no further action is needed on the part of 
Flink.
\ No newline at end of file
+## Further Details
+### Ticket Renewal
+Each component that uses Kerberos is independently responsible for renewing 
the Kerberos ticket-granting-ticket (TGT).
+Hadoop, ZooKeeper, and Kafka all renew the TGT automatically when provided a 
keytab.  In the delegation token scenario,
+YARN itself renews the token (up to its maximum lifespan).
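
Note on the JAAS Security Module described in the documentation above: the idea of a dynamic JAAS configuration can be sketched with plain JDK classes. The following is a minimal illustration only and is not the `DynamicConfiguration` or `JaasModule` code added by this commit. The class name `KeytabJaasSketch` is hypothetical, the login module class name assumes an Oracle/OpenJDK runtime, and the override of dynamic entries by static ones is deliberately left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;

/** Hypothetical sketch: serves one keytab-based entry per enabled login context. */
class KeytabJaasSketch extends Configuration {

    // Assumption: Oracle/OpenJDK Kerberos login module; other JDK vendors may differ.
    private static final String KRB5_MODULE = "com.sun.security.auth.module.Krb5LoginModule";

    private final Map<String, AppConfigurationEntry[]> entries = new HashMap<>();

    KeytabJaasSketch(String keytab, String principal, List<String> contexts) {
        // Standard Krb5LoginModule options for a non-interactive keytab login.
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");
        options.put("keyTab", keytab);
        options.put("principal", principal);
        options.put("storeKey", "true");
        options.put("doNotPrompt", "true");

        AppConfigurationEntry entry =
            new AppConfigurationEntry(KRB5_MODULE, LoginModuleControlFlag.REQUIRED, options);

        // One entry per login context name, e.g. "Client" or "KafkaClient".
        for (String context : contexts) {
            entries.put(context.trim(), new AppConfigurationEntry[] {entry});
        }
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        // Returning null tells JAAS that this configuration has no entry for the name.
        return entries.get(name);
    }
}
```

A process would typically install such an object with `Configuration.setConfiguration(...)` before any ZooKeeper or Kafka client requests its login context.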

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 512c08a..3ef4ffd 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -96,37 +96,58 @@ These options are useful for debugging a Flink application 
for memory and garbag
 
 - `taskmanager.debug.memory.logIntervalMs`: The interval (in milliseconds) in 
which the TaskManagers log the memory and garbage collection statistics. Only 
has an effect, if `taskmanager.debug.memory.startLogThread` is set to true.
 
-### Kerberos
+### Kerberos-based Security
 
-Flink supports Kerberos authentication for the following services
+Flink supports Kerberos authentication for the following services:
 
-+ Hadoop Components: such as HDFS, YARN, or HBase.
++ Hadoop Components (such as HDFS, YARN, or HBase)
 + Kafka Connectors (version 0.9+)
-+ Zookeeper Server/Client
++ Zookeeper
 
-Hadoop components relies on the UserGroupInformation (UGI) implementation to 
handle Kerberos authentication, whereas Kafka and Zookeeper services handles 
Kerberos authentication through SASL/JAAS implementation.
-
-**Kerberos is only properly supported in Hadoop version 2.6.1 and above. All
+**Kerberos is supported only in Hadoop version 2.6.1 and above. All
   other versions have critical bugs which might fail the Flink job
   unexpectedly.**
 
-**Ticket cache** and **Keytab** modes are supported for all above mentioned 
services.
+Configuring Flink for Kerberos security involves three aspects:
+
+1. Providing the cluster with a Kerberos credential (i.e. a keytab or a ticket 
via `kinit`)
+2. Making the Kerberos credential available to components and connectors as 
needed
+3. Configuring the component and/or connector to use Kerberos authentication
+
+To provide the cluster with a Kerberos credential, either configure the login 
keytab using the below configuration options,
+or login using `kinit` before starting the cluster.
+
+It is preferable to use keytabs for long-running jobs, to avoid ticket 
expiration issues.   If you prefer to use the ticket cache,
+talk to your administrator about increasing the Hadoop delegation token 
lifetime.
+
+- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from 
your Kerberos ticket cache (default: `true`).
+
+- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file 
that contains the user credentials.
 
-> Ticket cache (Supported only to provide backward compatibility support. 
Keytab is the preferred approach for long running jobs)
+- `security.kerberos.login.principal`: Kerberos principal name associated with 
the keytab.
 
-While Hadoop uses Kerberos tickets to authenticate users with services 
initially, the authentication process continues differently afterwards. Instead 
of saving the ticket to authenticate on a later access, Hadoop creates its own 
security tokens (DelegationToken) that it passes around. These are 
authenticated to Kerberos periodically but are independent of the token renewal 
time. The tokens have a maximum life span identical to the Kerberos ticket 
maximum life span.
+If Hadoop security is enabled (in `core-site.xml`), Flink will automatically 
use the configured Kerberos credentials when connecting to HDFS, HBase, and 
other Hadoop components.
 
-While using ticket cache mode, please make sure to set the maximum ticket life 
span high long running jobs.
+Make the Kerberos credentials available to any connector or component that 
uses a JAAS configuration file by configuring JAAS login contexts.
 
-If you are on YARN, then it is sufficient to authenticate the client with 
Kerberos. On a Flink standalone cluster you need to ensure that, initially, all 
nodes are authenticated with Kerberos using the `kinit` tool.
+- `security.kerberos.login.contexts`: A comma-separated list of login contexts 
to provide the Kerberos credentials to (for example, `Client,KafkaClient` to 
use the credentials for ZooKeeper authentication and for Kafka authentication).
+
+You may also provide a static JAAS configuration file, whose entries override 
those produced by the above configuration option.
+
+Be sure to configure the connector within your Flink program as necessary to 
use Kerberos authentication.  For the Kafka connector,
+use the following properties:
+
+```
+security.protocol=SASL_PLAINTEXT (or SASL_SSL)
+sasl.kerberos.service.name=kafka
+```
 
-> Keytab (security principal and keytab can be configured through Flink 
configuration file)
-- `security.keytab`: Path to Keytab file
-- `security.principal`: Principal associated with the keytab
+Flink provides some additional options to configure ZooKeeper security:
 
-Kerberos ticket renewal is abstracted and automatically handled by the 
Hadoop/Kafka/ZK login modules and ensures that tickets are renewed in time and 
you can be sure to be authenticated until the end of the ticket life time.
+- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper 
cluster is configured to use (default: `zookeeper`).
 
-For Kafka and ZK, process-wide JAAS config will be created using the provided 
security credentials and the Kerberos authentication will be handled by 
Kafka/ZK login handlers.
+- `zookeeper.sasl.login-context-name`: The JAAS login context name that the 
ZooKeeper client uses to request the login context (default: `Client`). Should 
match
+one of the values specified in `security.kerberos.login.contexts`.
 
 ### Other
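
Note on the Kafka connector properties listed in the config.md change above: in a Flink program they would usually be set on a `java.util.Properties` object handed to the Kafka connector. The sketch below only builds that object. The class name `KafkaKerberosProps` and the `bootstrapServers` parameter are hypothetical, and `SASL_PLAINTEXT` is one of the two protocol values the documentation mentions.

```java
import java.util.Properties;

/** Hypothetical helper: collects the Kerberos-related Kafka client properties. */
class KafkaKerberosProps {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // Assumption: broker address supplied by the caller.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // The two properties named in the documentation; use SASL_SSL for TLS transport.
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        return props;
    }
}
```

For the credentials to reach Kafka, `KafkaClient` would also need to appear in `security.kerberos.login.contexts`, as the option description above states.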
 

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index eb12d07..fa46fc7 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.fs;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SecureTestEnvironment;
@@ -116,13 +117,12 @@ public class RollingSinkSecuredITCase extends 
RollingSinkITCase {
                populateSecureConfigurations();
 
                Configuration flinkConfig = new Configuration();
-               flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+               flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB,
                                SecureTestEnvironment.getTestKeytab());
-               flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+               flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
                                
SecureTestEnvironment.getHadoopServicePrincipal());
 
-               SecurityUtils.SecurityConfiguration ctx = new 
SecurityUtils.SecurityConfiguration(flinkConfig);
-               ctx.setHadoopConfiguration(conf);
+               SecurityUtils.SecurityConfiguration ctx = new 
SecurityUtils.SecurityConfiguration(flinkConfig, conf);
                try {
                        TestingSecurityContext.install(ctx, 
SecureTestEnvironment.getClientSecurityConfigurationMap());
                } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index eabb754..fc389e0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1395,20 +1395,6 @@ public final class ConfigConstants {
        /** The environment variable name which contains the Flink installation 
root directory */
        public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
 
-       // -------------------------------- Security 
-------------------------------
-
-       /**
-        * The config parameter defining security credentials required
-        * for securing Flink cluster.
-        */
-
-       /** Keytab file key name to be used in flink configuration file */
-       public static final String SECURITY_KEYTAB_KEY = "security.keytab";
-
-       /** Kerberos security principal key name to be used in flink 
configuration file */
-       public static final String SECURITY_PRINCIPAL_KEY = 
"security.principal";
-
-
        /**
         * Not instantiable.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 1ee988a..4792eba 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -124,14 +124,6 @@ public class HighAvailabilityOptions {
                        .defaultValue(3)
                        
.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts");
 
-       public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE = 
-                       key("zookeeper.sasl.disable")
-                       .defaultValue(true);
-
-       public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME = 
-                       key("zookeeper.sasl.service-name")
-                       .noDefaultValue();
-
        // 
------------------------------------------------------------------------
 
        /** Not intended to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
new file mode 100644
index 0000000..67d101d
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to security.
+ */
+public class SecurityOptions {
+
+       // 
------------------------------------------------------------------------
+       //  Kerberos Options
+       // 
------------------------------------------------------------------------
+
+       public static final ConfigOption<String> KERBEROS_LOGIN_PRINCIPAL =
+               key("security.kerberos.login.principal")
+                       .noDefaultValue()
+                       .withDeprecatedKeys("security.principal");
+
+       public static final ConfigOption<String> KERBEROS_LOGIN_KEYTAB =
+               key("security.kerberos.login.keytab")
+                       .noDefaultValue()
+                       .withDeprecatedKeys("security.keytab");
+
+       public static final ConfigOption<Boolean> KERBEROS_LOGIN_USETICKETCACHE 
=
+               key("security.kerberos.login.use-ticket-cache")
+                       .defaultValue(true);
+
+       public static final ConfigOption<String> KERBEROS_LOGIN_CONTEXTS =
+               key("security.kerberos.login.contexts")
+                       .noDefaultValue();
+
+
+       // 
------------------------------------------------------------------------
+       //  ZooKeeper Security Options
+       // 
------------------------------------------------------------------------
+
+       public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME =
+               key("zookeeper.sasl.service-name")
+                       .defaultValue("zookeeper");
+
+       public static final ConfigOption<String> 
ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME =
+               key("zookeeper.sasl.login-context-name")
+                       .defaultValue("Client");
+}
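
Note on `withDeprecatedKeys` in the new `SecurityOptions` class above: the intent is that existing configurations using `security.keytab` and `security.principal` keep working. A minimal sketch of that fallback follows, assuming the current key wins and deprecated keys are consulted in declaration order. `DeprecatedKeyLookup` is a hypothetical stand-in over a plain map, not Flink's `Configuration` class.

```java
import java.util.Map;

/** Hypothetical sketch of resolving an option that declares deprecated keys. */
class DeprecatedKeyLookup {

    static String resolve(
            Map<String, String> conf, String key, String defaultValue, String... deprecatedKeys) {
        // Prefer the current key when it is set.
        String value = conf.get(key);
        if (value != null) {
            return value;
        }
        // Otherwise fall back to the deprecated keys, in the order given.
        for (String oldKey : deprecatedKeys) {
            value = conf.get(oldKey);
            if (value != null) {
                return value;
            }
        }
        // Nothing configured: use the default (null models noDefaultValue()).
        return defaultValue;
    }
}
```

With a configuration that only sets `security.keytab`, `resolve(conf, "security.kerberos.login.keytab", null, "security.keytab")` returns the old value under this assumption.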

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml 
b/flink-dist/src/main/resources/flink-conf.yaml
index c650cfe..f759db6 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -162,21 +162,24 @@ jobmanager.web.port: 8081
 # Flink Cluster Security Configuration (optional configuration)
 #==============================================================================
 
-# Kerberos security for the connectors can be enabled by providing below 
configurations
-# Security works in two modes - keytab/principal combination or using the 
Kerberos token cache
-# If keytab and principal are not provided, token cache (manual kinit) will be 
used
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and 
connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
 
-#security.keytab: /path/to/kerberos/keytab
-#security.principal: flink-user
+#security.kerberos.login.keytab: /path/to/kerberos/keytab
+#security.kerberos.login.principal: flink-user
+#security.kerberos.login.use-ticket-cache: true
+
+#security.kerberos.login.contexts: Client,KafkaClient
 
 #==============================================================================
 # ZK Security Configuration (optional configuration)
 #==============================================================================
-# Below configurations are applicable if ZK quorum is configured for Kerberos 
security
 
-# SASL authentication is disabled by default and can be enabled by changig the 
value to false
-#
-# zookeeper.sasl.disable: true
+# Below configurations are applicable if ZK ensemble is configured for security
 
 # Override below configuration to provide custom ZK service name if configured
 #

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
index 7c41eaf..da8244f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
@@ -21,17 +21,22 @@ package org.apache.flink.api.java.hadoop.mapred.utils;
 
 import java.io.File;
 import java.lang.reflect.Constructor;
+import java.util.Collection;
 import java.util.Map;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +48,8 @@ public final class HadoopUtils {
 
        private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
 
+       private static final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+
        /**
         * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
         */
@@ -163,6 +170,20 @@ public final class HadoopUtils {
        }
 
        /**
+        * Indicates whether the current user has an HDFS delegation token.
+        */
+       public static boolean hasHDFSDelegationToken() throws Exception {
+               UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
+               Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
+               for (Token<? extends TokenIdentifier> token : usrTok) {
+                       if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       /**
         * Private constructor to prevent instantiation.
         */
        private HadoopUtils() {

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 4b9bd82..689c26a 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -27,7 +27,6 @@ import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -168,7 +167,6 @@ public class MesosApplicationMasterRunner {
 
                        // configure security
                        SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(config);
-                       sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
                        SecurityUtils.install(sc);
 
                        // run the actual work in the installed security context

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 75b5043..206c71b 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -26,7 +26,6 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -118,7 +117,6 @@ public class MesosTaskManagerRunner {
 
                // Run the TM in the security context
                SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
-               sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
                SecurityUtils.install(sc);
 
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
index 7fe5b3e..271b32d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.clusterframework.overlays;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.slf4j.Logger;
@@ -34,7 +34,7 @@ import java.io.IOException;
  * Overlays cluster-level Kerberos credentials (i.e. keytab) into a container.
  *
  * The folloowing Flink configuration entries are updated:
- *  - security.keytab
+ *  - security.kerberos.login.keytab
  */
 public class KeytabOverlay extends AbstractContainerOverlay {
 
@@ -60,7 +60,7 @@ public class KeytabOverlay extends AbstractContainerOverlay {
                                .setDest(TARGET_PATH)
                                .setCachable(false)
                                .build());
-                       container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_KEYTAB_KEY, TARGET_PATH.getPath());
+                       container.getDynamicConfiguration().setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, TARGET_PATH.getPath());
                }
        }
 
@@ -69,7 +69,7 @@ public class KeytabOverlay extends AbstractContainerOverlay {
        }
 
        /**
-        * A builder for the {@link HadoopUserOverlay}.
+        * A builder for the {@link KeytabOverlay}.
         */
        public static class Builder {
 
@@ -79,15 +79,15 @@ public class KeytabOverlay extends AbstractContainerOverlay {
                 * Configures the overlay using the current environment (and global configuration).
                 *
                 * The following Flink configuration settings are checked for a keytab:
-                *  - security.keytab
+                *  - security.kerberos.login.keytab
                 */
                public Builder fromEnvironment(Configuration globalConfiguration) {
-                       String keytab = globalConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+                       String keytab = globalConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
                        if(keytab != null) {
                                keytabPath = new File(keytab);
                                if(!keytabPath.exists()) {
                                        throw new IllegalStateException("Invalid configuration for " +
-                                               ConfigConstants.SECURITY_KEYTAB_KEY +
+                                               SecurityOptions.KERBEROS_LOGIN_KEYTAB +
                                                "; '" + keytab + "' not found.");
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
new file mode 100644
index 0000000..6af4f23
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A dynamic JAAS configuration.
+ *
+ * Makes it possible to define Application Configuration Entries (ACEs) at runtime, building upon
+ * an (optional) underlying configuration.   Entries from the underlying configuration take
+ * precedence over dynamic entries.
+ */
+public class DynamicConfiguration extends Configuration {
+
+       protected static final Logger LOG = LoggerFactory.getLogger(DynamicConfiguration.class);
+
+       private final Configuration delegate;
+
+       private final Map<String,AppConfigurationEntry[]> dynamicEntries = new HashMap<>();
+
+       /**
+        * Create a dynamic configuration.
+        * @param delegate an underlying configuration to delegate to, or null.
+     */
+       public DynamicConfiguration(@Nullable Configuration delegate) {
+               this.delegate = delegate;
+       }
+
+       /**
+        * Add entries for the given application name.
+     */
+       public void addAppConfigurationEntry(String name, AppConfigurationEntry... entry) {
+               final AppConfigurationEntry[] existing = dynamicEntries.get(name);
+               final AppConfigurationEntry[] updated;
+               if(existing == null) {
+                       updated = Arrays.copyOf(entry, entry.length);
+               }
+               else {
+                       updated = merge(existing, entry);
+               }
+               dynamicEntries.put(name, updated);
+       }
+
+       /**
+        * Retrieve the AppConfigurationEntries for the specified <i>name</i>
+        * from this Configuration.
+        *
+        * <p>
+        *
+        * @param name the name used to index the Configuration.
+        *
+        * @return an array of AppConfigurationEntries for the specified <i>name</i>
+        *          from this Configuration, or null if there are no entries
+        *          for the specified <i>name</i>
+        */
+       @Override
+       public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+               AppConfigurationEntry[] entry = null;
+               if(delegate != null) {
+                       entry = delegate.getAppConfigurationEntry(name);
+               }
+               final AppConfigurationEntry[] existing = dynamicEntries.get(name);
+               if(existing != null) {
+                       if(entry != null) {
+                               entry = merge(entry, existing);
+                       }
+                       else {
+                               entry = Arrays.copyOf(existing, existing.length);
+                       }
+               }
+               return entry;
+       }
+
+       private static AppConfigurationEntry[] merge(AppConfigurationEntry[] a, AppConfigurationEntry[] b) {
+               AppConfigurationEntry[] merged = Arrays.copyOf(a, a.length + b.length);
+               System.arraycopy(b, 0, merged, a.length, b.length);
+               return merged;
+       }
+
+       @Override
+       public void refresh() {
+               if(delegate != null) {
+                       delegate.refresh();
+               }
+       }
+}
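
Note on the lookup order above: getAppConfigurationEntry returns the delegate's entries first and appends the dynamic entries after them. A minimal standalone sketch of that layering follows. It is an illustration only, not the Flink class: the class name LayeredJaasSketch, the context name "Client", and the "example.*" login module names are all hypothetical; only the javax.security.auth.login types are real.

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of a JAAS configuration layered over an optional delegate.
class LayeredJaasSketch extends Configuration {

    private final Configuration delegate; // may be null
    private final Map<String, AppConfigurationEntry[]> dynamicEntries = new HashMap<>();

    LayeredJaasSketch(Configuration delegate) {
        this.delegate = delegate;
    }

    // Appends entries for a login context name, keeping earlier additions.
    void add(String name, AppConfigurationEntry... entries) {
        AppConfigurationEntry[] existing = dynamicEntries.get(name);
        dynamicEntries.put(name, existing == null ? entries.clone() : concat(existing, entries));
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        AppConfigurationEntry[] fromDelegate =
                delegate == null ? null : delegate.getAppConfigurationEntry(name);
        AppConfigurationEntry[] dynamic = dynamicEntries.get(name);
        if (dynamic == null) {
            return fromDelegate; // null when neither layer knows the name
        }
        // Delegate entries come first, so JAAS consults them before the dynamic ones.
        return fromDelegate == null ? dynamic.clone() : concat(fromDelegate, dynamic);
    }

    private static AppConfigurationEntry[] concat(AppConfigurationEntry[] a, AppConfigurationEntry[] b) {
        AppConfigurationEntry[] merged = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, merged, a.length, b.length);
        return merged;
    }

    // Helper that builds an entry with a placeholder (hypothetical) module name.
    static AppConfigurationEntry entry(String moduleName) {
        return new AppConfigurationEntry(
                moduleName, LoginModuleControlFlag.OPTIONAL, new HashMap<String, String>());
    }

    public static void main(String[] args) {
        // Stand-in for a user-supplied JAAS file that defines only "Client".
        Configuration base = new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                return "Client".equals(name)
                        ? new AppConfigurationEntry[] {entry("example.UserSuppliedModule")}
                        : null;
            }
        };
        LayeredJaasSketch conf = new LayeredJaasSketch(base);
        conf.add("Client", entry("example.DynamicModule"));
        // Lists the delegate's module first, then the dynamic one.
        for (AppConfigurationEntry e : conf.getAppConfigurationEntry("Client")) {
            System.out.println(e.getLoginModuleName());
        }
    }
}
```

Appending rather than replacing means a user-supplied entry for the same context name is still consulted first, which appears to be the sense in which the Javadoc says underlying entries "take precedence".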

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
deleted file mode 100644
index c4527dd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.security;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.annotation.Internal;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.Configuration;
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- *
- * JAAS configuration provider object that provides default LoginModule for various connectors that supports
- * JAAS/SASL based Kerberos authentication. The implementation is inspired from Hadoop UGI class.
- *
- * Different connectors uses different login module name to implement JAAS based authentication support.
- * For example, Kafka expects the login module name to be "kafkaClient" whereas ZooKeeper expect the
- * name to be "client". This sets responsibility on the Flink cluster administrator to configure/provide right
- * JAAS config entries. To simplify this requirement, we have introduced this abstraction that provides
- * a standard lookup to get the login module entry for the JAAS based authentication to work.
- *
- * HDFS connector will not be impacted with this configuration since it uses UGI based mechanism to authenticate.
- *
- * <a href="https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html">Configuration</a>
- *
- */
-
-@Internal
-public class JaasConfiguration extends Configuration {
-
-       private static final Logger LOG = LoggerFactory.getLogger(JaasConfiguration.class);
-
-       public static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor");
-
-       public static final boolean IBM_JAVA;
-
-       private static final Map<String, String> debugOptions = new HashMap<>();
-
-       private static final Map<String, String> kerberosCacheOptions = new HashMap<>();
-
-       private static final Map<String, String> keytabKerberosOptions = new HashMap<>();
-
-       private static final AppConfigurationEntry userKerberosAce;
-
-       private AppConfigurationEntry keytabKerberosAce = null;
-
-       static {
-
-               IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
-
-               if(LOG.isDebugEnabled()) {
-                       debugOptions.put("debug", "true");
-               }
-
-               if(IBM_JAVA) {
-                       kerberosCacheOptions.put("useDefaultCcache", "true");
-               } else {
-                       kerberosCacheOptions.put("doNotPrompt", "true");
-                       kerberosCacheOptions.put("useTicketCache", "true");
-               }
-
-               String ticketCache = System.getenv("KRB5CCNAME");
-               if(ticketCache != null) {
-                       if(IBM_JAVA) {
-                               System.setProperty("KRB5CCNAME", ticketCache);
-                       } else {
-                               kerberosCacheOptions.put("ticketCache", ticketCache);
-                       }
-               }
-
-               kerberosCacheOptions.put("renewTGT", "true");
-               kerberosCacheOptions.putAll(debugOptions);
-
-               userKerberosAce = new AppConfigurationEntry(
-                               KerberosUtil.getKrb5LoginModuleName(),
-                               AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
-                               kerberosCacheOptions);
-
-       }
-
-       protected JaasConfiguration(String keytab, String principal) {
-
-               LOG.info("Initializing JAAS configuration instance. Parameters: {}, {}", keytab, principal);
-
-               if(StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal) ||
-                               (!StringUtils.isBlank(keytab) && StringUtils.isBlank(principal))){
-                       throw new RuntimeException("Both keytab and principal are required and cannot be empty");
-               }
-
-               if(!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) {
-
-                       if(IBM_JAVA) {
-                               keytabKerberosOptions.put("useKeytab", prependFileUri(keytab));
-                               keytabKerberosOptions.put("credsType", "both");
-                       } else {
-                               keytabKerberosOptions.put("keyTab", keytab);
-                               keytabKerberosOptions.put("doNotPrompt", "true");
-                               keytabKerberosOptions.put("useKeyTab", "true");
-                               keytabKerberosOptions.put("storeKey", "true");
-                       }
-
-                       keytabKerberosOptions.put("principal", principal);
-                       keytabKerberosOptions.put("refreshKrb5Config", "true");
-                       keytabKerberosOptions.putAll(debugOptions);
-
-                       keytabKerberosAce = new AppConfigurationEntry(
-                                       KerberosUtil.getKrb5LoginModuleName(),
-                                       AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
-                                       keytabKerberosOptions);
-               }
-       }
-
-       public static Map<String, String> getKeytabKerberosOptions() {
-               return keytabKerberosOptions;
-       }
-
-       private static String prependFileUri(String keytabPath) {
-               File f = new File(keytabPath);
-               return f.toURI().toString();
-       }
-
-       @Override
-       public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) {
-
-               LOG.debug("JAAS configuration requested for the application entry: {}", applicationName);
-
-               AppConfigurationEntry[] appConfigurationEntry;
-
-               if(keytabKerberosAce != null) {
-                       appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce, userKerberosAce};
-               } else {
-                       appConfigurationEntry = new AppConfigurationEntry[] {userKerberosAce};
-               }
-
-               return appConfigurationEntry;
-       }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
new file mode 100644
index 0000000..7ef9187
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ *
+ * Provides vendor-specific Kerberos {@link AppConfigurationEntry} instances.
+ *
+ * The implementation is inspired from Hadoop UGI class.
+ */
+@Internal
+public class KerberosUtils {
+
+       private static final Logger LOG = LoggerFactory.getLogger(KerberosUtils.class);
+
+       private static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor");
+
+       private static final boolean IBM_JAVA;
+
+       private static final Map<String, String> debugOptions = new HashMap<>();
+
+       private static final Map<String, String> kerberosCacheOptions = new HashMap<>();
+
+       private static final AppConfigurationEntry userKerberosAce;
+
+       static {
+
+               IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
+
+               if(LOG.isDebugEnabled()) {
+                       debugOptions.put("debug", "true");
+               }
+
+               if(IBM_JAVA) {
+                       kerberosCacheOptions.put("useDefaultCcache", "true");
+               } else {
+                       kerberosCacheOptions.put("doNotPrompt", "true");
+                       kerberosCacheOptions.put("useTicketCache", "true");
+               }
+
+               String ticketCache = System.getenv("KRB5CCNAME");
+               if(ticketCache != null) {
+                       if(IBM_JAVA) {
+                               System.setProperty("KRB5CCNAME", ticketCache);
+                       } else {
+                               kerberosCacheOptions.put("ticketCache", ticketCache);
+                       }
+               }
+
+               kerberosCacheOptions.put("renewTGT", "true");
+               kerberosCacheOptions.putAll(debugOptions);
+
+               userKerberosAce = new AppConfigurationEntry(
+                               KerberosUtil.getKrb5LoginModuleName(),
+                               AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
+                               kerberosCacheOptions);
+
+       }
+
+       public static AppConfigurationEntry ticketCacheEntry() {
+               return userKerberosAce;
+       }
+
+       public static AppConfigurationEntry keytabEntry(String keytab, String principal) {
+
+               checkNotNull(keytab, "keytab");
+               checkNotNull(principal, "principal");
+
+               Map<String, String> keytabKerberosOptions = new HashMap<>();
+
+               if(IBM_JAVA) {
+                       keytabKerberosOptions.put("useKeytab", prependFileUri(keytab));
+                       keytabKerberosOptions.put("credsType", "both");
+               } else {
+                       keytabKerberosOptions.put("keyTab", keytab);
+                       keytabKerberosOptions.put("doNotPrompt", "true");
+                       keytabKerberosOptions.put("useKeyTab", "true");
+                       keytabKerberosOptions.put("storeKey", "true");
+               }
+
+               keytabKerberosOptions.put("principal", principal);
+               keytabKerberosOptions.put("refreshKrb5Config", "true");
+               keytabKerberosOptions.putAll(debugOptions);
+
+               AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry(
+                               KerberosUtil.getKrb5LoginModuleName(),
+                               AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                               keytabKerberosOptions);
+
+               return keytabKerberosAce;
+       }
+
+       private static String prependFileUri(String keytabPath) {
+               File f = new File(keytabPath);
+               return f.toURI().toString();
+       }
+}
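
Note on keytabEntry above: for a non-IBM JVM the method boils down to a REQUIRED Krb5LoginModule entry with a fixed set of options. The sketch below reproduces only that branch without the Hadoop dependency. It is a hedged illustration, not the committed code: the class name KeytabEntrySketch is hypothetical, the module name is hard-coded on the assumption of an Oracle/OpenJDK runtime (the commit obtains it from Hadoop's KerberosUtil instead), and the keytab path and principal are the placeholder values from flink-conf.yaml.

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical standalone version of the non-IBM branch of keytabEntry.
class KeytabEntrySketch {

    // Assumption: Oracle/OpenJDK module name; IBM JVMs use a different class and options.
    static final String KRB5_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule";

    static AppConfigurationEntry keytabEntry(String keytab, String principal) {
        Objects.requireNonNull(keytab, "keytab");
        Objects.requireNonNull(principal, "principal");

        Map<String, String> options = new HashMap<>();
        options.put("keyTab", keytab);            // location of the keytab file
        options.put("useKeyTab", "true");         // obtain the key from the keytab
        options.put("storeKey", "true");          // keep the key in the Subject's credentials
        options.put("doNotPrompt", "true");       // never fall back to interactive prompts
        options.put("principal", principal);
        options.put("refreshKrb5Config", "true"); // re-read krb5.conf at login

        // REQUIRED: the login fails if the keytab login does not succeed.
        return new AppConfigurationEntry(
                KRB5_LOGIN_MODULE, LoginModuleControlFlag.REQUIRED, options);
    }

    public static void main(String[] args) {
        AppConfigurationEntry entry = keytabEntry("/path/to/kerberos/keytab", "flink-user");
        System.out.println(entry.getLoginModuleName());
        System.out.println(entry.getOptions());
    }
}
```

Constructing the entry only records the module name and options; nothing is loaded or authenticated until a LoginContext performs the login.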

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index d7fc6ff..d76e7a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -18,216 +18,162 @@
 
 package org.apache.flink.runtime.security;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.security.modules.HadoopModule;
+import org.apache.flink.runtime.security.modules.JaasModule;
+import org.apache.flink.runtime.security.modules.SecurityModule;
+import org.apache.flink.runtime.security.modules.ZooKeeperModule;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.auth.Subject;
 import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.Method;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.util.Collection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /*
- * Utils for configuring security. The following security mechanism are supported:
+ * Utils for configuring security. The following security subsystems are supported:
  *
  * 1. Java Authentication and Authorization Service (JAAS)
  * 2. Hadoop's User Group Information (UGI)
+ * 3. ZooKeeper's process-wide security settings.
  */
 public class SecurityUtils {
 
        private static final Logger LOG = LoggerFactory.getLogger(SecurityUtils.class);
 
-       public static final String JAAS_CONF_FILENAME = "flink-jaas.conf";
-
-       public static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
-
-       private static final String ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client";
-
-       private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
-
        private static SecurityContext installedContext = new NoOpSecurityContext();
 
+       private static List<SecurityModule> installedModules = null;
+
        public static SecurityContext getInstalledContext() { return installedContext; }
 
+       @VisibleForTesting
+       static List<SecurityModule> getInstalledModules() {
+               return installedModules;
+       }
+
        /**
-        * Performs a static initialization of the JAAS and Hadoop UGI security mechanism.
-        * It creates the in-memory JAAS configuration object which will serve appropriate
-        * ApplicationConfigurationEntry for the connector login module implementation that
-        * authenticates Kerberos identity using SASL/JAAS based mechanism.
+        * Installs a process-wide security configuration.
+        *
+        * Applies the configuration using the available security modules (i.e. Hadoop, JAAS).
         */
        public static void install(SecurityConfiguration config) throws Exception {
 
-               if (!config.securityIsEnabled()) {
-                       // do not perform any initialization if no Kerberos crendetails are provided
-                       return;
+               // install the security modules
+               List<SecurityModule> modules = new ArrayList<>();
+               try {
+                       for (Class<? extends SecurityModule> moduleClass : config.getSecurityModules()) {
+                               SecurityModule module = moduleClass.newInstance();
+                               module.install(config);
+                               modules.add(module);
+                       }
                }
+               catch(Exception ex) {
+                       throw new Exception("unable to establish the security context", ex);
+               }
+               installedModules = modules;
 
-               // establish the JAAS config
-               JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
-               javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
-
-               populateSystemSecurityProperties(config.flinkConf);
-
-               // establish the UGI login user
-               UserGroupInformation.setConfiguration(config.hadoopConf);
-
-               // only configure Hadoop security if we have security enabled
-               if (UserGroupInformation.isSecurityEnabled()) {
-
-                       final UserGroupInformation loginUser;
-
-                       if (config.keytab != null && 
!StringUtils.isBlank(config.principal)) {
-                               String keytabPath = (new 
File(config.keytab)).getAbsolutePath();
-
-                               
UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
-
-                               loginUser = UserGroupInformation.getLoginUser();
-
-                               // supplement with any available tokens
-                               String fileLocation = 
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-                               if (fileLocation != null) {
-                               /*
-                                * Use reflection API since the API semantics 
are not available in Hadoop1 profile. Below APIs are
-                                * used in the context of reading the stored 
tokens from UGI.
-                                * Credentials cred = 
Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
-                                * loginUser.addCredentials(cred);
-                               */
-                                       try {
-                                               Method 
readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
-                                                       File.class, 
org.apache.hadoop.conf.Configuration.class);
-                                               Credentials cred = 
(Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
-                                                       config.hadoopConf);
-                                               Method addCredentialsMethod = 
UserGroupInformation.class.getMethod("addCredentials",
-                                                       Credentials.class);
-                                               
addCredentialsMethod.invoke(loginUser, cred);
-                                       } catch (NoSuchMethodException e) {
-                                               LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
-                                       }
-                               }
-                       } else {
-                               // login with current user credentials (e.g. 
ticket cache)
+               // install a security context
+               // use the Hadoop login user as the subject of the installed security context
+               if (!(installedContext instanceof NoOpSecurityContext)) {
+                       LOG.warn("overriding previous security context");
+               }
+               UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+               installedContext = new HadoopSecurityContext(loginUser);
+       }
+
+       static void uninstall() {
+               if(installedModules != null) {
+                       for (SecurityModule module : Lists.reverse(installedModules)) {
                                try {
-                                       //Use reflection API to get the login 
user object
-                                       
//UserGroupInformation.loginUserFromSubject(null);
-                                       Method loginUserFromSubjectMethod = 
UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
-                                       Subject subject = null;
-                                       loginUserFromSubjectMethod.invoke(null, 
subject);
-                               } catch (NoSuchMethodException e) {
-                                       LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
+                                       module.uninstall();
                                }
-
-                               // note that the stored tokens are read 
automatically
-                               loginUser = UserGroupInformation.getLoginUser();
-                       }
-
-                       LOG.info("Hadoop user set to {}", loginUser.toString());
-
-                       boolean delegationToken = false;
-                       final Text HDFS_DELEGATION_KIND = new 
Text("HDFS_DELEGATION_TOKEN");
-                       Collection<Token<? extends TokenIdentifier>> usrTok = 
loginUser.getTokens();
-                       for (Token<? extends TokenIdentifier> token : usrTok) {
-                               final Text id = new Text(token.getIdentifier());
-                               LOG.debug("Found user token " + id + " with " + 
token);
-                               if 
(token.getKind().equals(HDFS_DELEGATION_KIND)) {
-                                       delegationToken = true;
+                               catch(UnsupportedOperationException ignored) {
                                }
-                       }
-
-                       if (!loginUser.hasKerberosCredentials()) {
-                               //throw an error in non-yarn deployment if 
kerberos cache is not available
-                               if (!delegationToken) {
-                                       LOG.error("Hadoop Security is enabled 
but current login user does not have Kerberos Credentials");
-                                       throw new RuntimeException("Hadoop 
Security is enabled but current login user does not have Kerberos Credentials");
+                               catch(SecurityModule.SecurityInstallException e) {
+                                       LOG.warn("unable to uninstall a security module", e);
                                }
                        }
-
-                       if (!(installedContext instanceof NoOpSecurityContext)) 
{
-                               LOG.warn("overriding previous security 
context");
-                       }
-
-                       installedContext = new HadoopSecurityContext(loginUser);
+                       installedModules = null;
                }
-       }
 
-       static void clearContext() {
                installedContext = new NoOpSecurityContext();
        }
 
-       /*
-        * This method configures some of the system properties that are 
require for ZK and Kafka SASL authentication
-        * See: 
https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
-        * See: 
https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
-        * In this method, setting java.security.auth.login.config 
configuration is configured only to support ZK and
-        * Kafka current code behavior.
+       /**
+        * The global security configuration.
+        *
+        * See {@link SecurityOptions} for corresponding configuration options.
         */
-       private static void populateSystemSecurityProperties(Configuration 
configuration) {
-               Preconditions.checkNotNull(configuration, "The supplied 
configuration was null");
+       public static class SecurityConfiguration {
 
-               boolean disableSaslClient = 
configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
+               private static final List<Class<? extends SecurityModule>> DEFAULT_MODULES = Collections.unmodifiableList(
+                       Arrays.asList(HadoopModule.class, JaasModule.class, ZooKeeperModule.class));
 
-               if (disableSaslClient) {
-                       LOG.info("SASL client auth for ZK will be disabled");
-                       //SASL auth is disabled by default but will be enabled 
if specified in configuration
-                       System.setProperty(ZOOKEEPER_SASL_CLIENT,"false");
-                       return;
-               }
+               private final List<Class<? extends SecurityModule>> securityModules;
 
-               // load Jaas config file to initialize SASL
-               final File jaasConfFile;
-               try {
-                       Path jaasConfPath = 
Files.createTempFile(JAAS_CONF_FILENAME, "");
-                       InputStream jaasConfStream = 
SecurityUtils.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME);
-                       Files.copy(jaasConfStream, jaasConfPath, 
StandardCopyOption.REPLACE_EXISTING);
-                       jaasConfFile = jaasConfPath.toFile();
-                       jaasConfFile.deleteOnExit();
-                       jaasConfStream.close();
-               } catch (IOException e) {
-                       throw new RuntimeException("SASL auth is enabled for ZK 
but unable to " +
-                               "locate pseudo Jaas config provided with 
Flink", e);
-               }
+               private final org.apache.hadoop.conf.Configuration hadoopConf;
 
-               LOG.info("Enabling {} property with pseudo JAAS config file: 
{}",
-                               JAVA_SECURITY_AUTH_LOGIN_CONFIG, 
jaasConfFile.getAbsolutePath());
+               private final boolean useTicketCache;
 
-               //ZK client module lookup the configuration to handle SASL.
-               
//https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
-               System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, 
jaasConfFile.getAbsolutePath());
-               System.setProperty(ZOOKEEPER_SASL_CLIENT, "true");
+               private final String keytab;
 
-               String zkSaslServiceName = 
configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
-               if (!StringUtils.isBlank(zkSaslServiceName)) {
-                       LOG.info("ZK SASL service name: {} is provided in the 
configuration", zkSaslServiceName);
-                       System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME, 
zkSaslServiceName);
-               }
+               private final String principal;
 
-       }
+               private final List<String> loginContextNames;
 
-       /**
-        * Inputs for establishing the security context.
-        */
-       public static class SecurityConfiguration {
+               private final String zkServiceName;
 
-               private Configuration flinkConf;
+               private final String zkLoginContextName;
 
-               private org.apache.hadoop.conf.Configuration hadoopConf = new 
org.apache.hadoop.conf.Configuration();
+               /**
+                * Create a security configuration from the global configuration.
+                * @param flinkConf the Flink global configuration.
+         */
+               public SecurityConfiguration(Configuration flinkConf) {
+                       this(flinkConf, HadoopUtils.getHadoopConfiguration());
+               }
 
-               private String keytab;
+               /**
+                * Create a security configuration from the global configuration.
+                * @param flinkConf the Flink global configuration.
+                * @param hadoopConf the Hadoop configuration.
+                */
+               public SecurityConfiguration(Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf) {
+                       this(flinkConf, hadoopConf, DEFAULT_MODULES);
+               }
 
-               private String principal;
+               /**
+                * Create a security configuration from the global configuration.
+                * @param flinkConf the Flink global configuration.
+                * @param hadoopConf the Hadoop configuration.
+                * @param securityModules the security modules to apply.
+                */
+               public SecurityConfiguration(Configuration flinkConf,
+                               org.apache.hadoop.conf.Configuration hadoopConf,
+                               List<? extends Class<? extends SecurityModule>> securityModules) {
+                       this.hadoopConf = checkNotNull(hadoopConf);
+                       this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
+                       this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
+                       this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
+                       this.loginContextNames = parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
+                       this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
+                       this.zkLoginContextName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);
+                       this.securityModules = Collections.unmodifiableList(securityModules);
+
+                       validate();
+               }
 
                public String getKeytab() {
                        return keytab;
@@ -237,48 +183,50 @@ public class SecurityUtils {
                        return principal;
                }
 
-               public SecurityConfiguration(Configuration flinkConf) {
-                       this.flinkConf = flinkConf;
+               public boolean useTicketCache() {
+                       return useTicketCache;
+               }
 
-                       String keytab = 
flinkConf.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
-                       String principal = 
flinkConf.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
-                       validate(keytab, principal);
+               public org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
+                       return hadoopConf;
+               }
 
-                       this.keytab = keytab;
-                       this.principal = principal;
+               public List<Class<? extends SecurityModule>> getSecurityModules() {
+                       return securityModules;
                }
 
-               public SecurityConfiguration 
setHadoopConfiguration(org.apache.hadoop.conf.Configuration conf) {
-                       this.hadoopConf = conf;
-                       return this;
+               public List<String> getLoginContextNames() {
+                       return loginContextNames;
                }
 
-               private void validate(String keytab, String principal) {
-                       LOG.debug("keytab {} and principal {} .", keytab, 
principal);
+               public String getZooKeeperServiceName() {
+                       return zkServiceName;
+               }
 
-                       if(StringUtils.isBlank(keytab) && 
!StringUtils.isBlank(principal) ||
-                                       !StringUtils.isBlank(keytab) && 
StringUtils.isBlank(principal)) {
-                               if(StringUtils.isBlank(keytab)) {
-                                       LOG.warn("Keytab is null or empty");
-                               }
+               public String getZooKeeperLoginContextName() {
+                       return zkLoginContextName;
+               }
+
+               private void validate() {
+                       if(!StringUtils.isBlank(keytab)) {
+                               // principal is required
                                if(StringUtils.isBlank(principal)) {
-                                       LOG.warn("Principal is null or empty");
+                                       throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab requires a principal.");
                                }
-                               throw new RuntimeException("Requires both 
keytab and principal to be provided");
-                       }
 
-                       if(!StringUtils.isBlank(keytab)) {
+                               // check the keytab is readable
                                File keytabFile = new File(keytab);
-                               if(!keytabFile.exists() || 
!keytabFile.isFile()) {
-                                       LOG.warn("Not a valid keytab: {} file", 
keytab);
-                                       throw new RuntimeException("Invalid 
keytab file: " + keytab + " passed");
+                               if(!keytabFile.exists() || !keytabFile.isFile() || !keytabFile.canRead()) {
+                                       throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab is unreadable");
                                }
                        }
-
                }
 
-               public boolean securityIsEnabled() {
-                       return keytab != null && principal != null;
+               private static List<String> parseList(String value) {
+                       if(value == null) {
+                               return Collections.emptyList();
+                       }
+                       return Arrays.asList(value.split(","));
                }
        }
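
Note on parseList above: it splits the configured login context names on commas without trimming. The following is a minimal sketch, not part of the commit, that contrasts that behaviour with a trimming variant. The class name LoginContextListSketch and the method parseListTrimmed are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration class; not part of the Flink commit.
class LoginContextListSketch {

    // Mirrors the committed parseList: split on commas, keep whitespace as-is.
    static List<String> parseList(String value) {
        if (value == null) {
            return Collections.emptyList();
        }
        return Arrays.asList(value.split(","));
    }

    // Assumed variant: trim each name and drop empty ones.
    static List<String> parseListTrimmed(String value) {
        if (value == null) {
            return Collections.emptyList();
        }
        List<String> names = new ArrayList<>();
        for (String part : value.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // The second element keeps its leading space in the untrimmed version.
        System.out.println(parseList("Client, KafkaClient"));
        System.out.println(parseListTrimmed("Client, KafkaClient"));
    }
}
```

With the committed behaviour, a value such as "Client, KafkaClient" yields a second context name with a leading space, so a comma-separated list without spaces is the safer way to write the setting.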
 

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
new file mode 100644
index 0000000..9344faf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * Responsible for installing a Hadoop login user.
+ */
+public class HadoopModule implements SecurityModule {
+
+       private static final Logger LOG = LoggerFactory.getLogger(HadoopModule.class);
+
+       UserGroupInformation loginUser;
+
+       @Override
+       public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
+
+               UserGroupInformation.setConfiguration(securityConfig.getHadoopConfiguration());
+
+               try {
+                       if (UserGroupInformation.isSecurityEnabled() &&
+                               !StringUtils.isBlank(securityConfig.getKeytab()) && !StringUtils.isBlank(securityConfig.getPrincipal())) {
+                               String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();
+
+                               UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);
+
+                               loginUser = UserGroupInformation.getLoginUser();
+
+                               // supplement with any available tokens
+                               String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+                               if (fileLocation != null) {
+                                       /*
+                                        * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
+                                        * used in the context of reading the stored tokens from UGI.
+                                        * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
+                                        * loginUser.addCredentials(cred);
+                                       */
+                                       try {
+                                               Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
+                                                       File.class, org.apache.hadoop.conf.Configuration.class);
+                                               Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
+                                                       securityConfig.getHadoopConfiguration());
+                                               Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
+                                                       Credentials.class);
+                                               addCredentialsMethod.invoke(loginUser, cred);
+                                       } catch (NoSuchMethodException e) {
+                                               LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+                                       } catch (InvocationTargetException e) {
+                                               throw e.getTargetException();
+                                       }
+                               }
+                       } else {
+                               // login with current user credentials (e.g. ticket cache, OS login)
+                               // note that the stored tokens are read automatically
+                               try {
+                                       //Use reflection API to get the login user object
+                                       //UserGroupInformation.loginUserFromSubject(null);
+                                       Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
+                                       loginUserFromSubjectMethod.invoke(null, (Subject) null);
+                               } catch (NoSuchMethodException e) {
+                                       LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
+                               } catch (InvocationTargetException e) {
+                                       throw e.getTargetException();
+                               }
+
+                               loginUser = UserGroupInformation.getLoginUser();
+                       }
+
+                       if (UserGroupInformation.isSecurityEnabled()) {
+                               // note: UGI::hasKerberosCredentials inaccurately reports false
+                               // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
+                               // so we check only in ticket cache scenario.
+                               if (securityConfig.useTicketCache() && !loginUser.hasKerberosCredentials()) {
+                                       // a delegation token is an adequate substitute in most cases
+                                       if (!HadoopUtils.hasHDFSDelegationToken()) {
+                                               LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials");
+                                       }
+                               }
+                       }
+
+                       LOG.info("Hadoop user set to {}", loginUser);
+
+               } catch (Throwable ex) {
+                       throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
+               }
+       }
+
+       @Override
+       public void uninstall() throws SecurityInstallException {
+               throw new UnsupportedOperationException();
+       }
+}
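
HadoopModule.uninstall() throws UnsupportedOperationException, and SecurityUtils.uninstall() walks the installed modules in reverse order while ignoring that exception. The sketch below illustrates this lifecycle under simplified assumptions: Module, named, installAll and uninstallAll are hypothetical stand-ins, not Flink classes, and Collections.reverse on a copy is used here in place of Guava's Lists.reverse.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for illustration; these are not Flink classes.
class ModuleLifecycleSketch {

    interface Module {
        void install();
        void uninstall();
    }

    // Records the order in which lifecycle calls happen.
    static final List<String> events = new ArrayList<>();

    static Module named(final String name, final boolean supportsUninstall) {
        return new Module() {
            @Override
            public void install() {
                events.add("install " + name);
            }

            @Override
            public void uninstall() {
                if (!supportsUninstall) {
                    throw new UnsupportedOperationException();
                }
                events.add("uninstall " + name);
            }
        };
    }

    // Installs modules in the given order.
    static void installAll(List<Module> modules) {
        for (Module module : modules) {
            module.install();
        }
    }

    // Uninstalls in reverse order; modules that cannot be uninstalled are skipped.
    static void uninstallAll(List<Module> modules) {
        List<Module> reversed = new ArrayList<>(modules);
        Collections.reverse(reversed);
        for (Module module : reversed) {
            try {
                module.uninstall();
            } catch (UnsupportedOperationException ignored) {
                // same policy as the diff: a module may decline to uninstall
            }
        }
    }

    static List<String> run() {
        events.clear();
        List<Module> modules = new ArrayList<>();
        modules.add(named("hadoop", false));
        modules.add(named("jaas", true));
        modules.add(named("zookeeper", true));
        installAll(modules);
        uninstallAll(modules);
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Reverse-order teardown matters when a later module depends on state set up by an earlier one, which is why the sketch keeps the same ordering as the default module list.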

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
new file mode 100644
index 0000000..f8b9bdf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.DynamicConfiguration;
+import org.apache.flink.runtime.security.KerberosUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+/**
+ * Responsible for installing a process-wide JAAS configuration.
+ * <p>
+ * The installed configuration combines login modules based on:
+ * - the user-supplied JAAS configuration file, if any
+ * - a Kerberos keytab, if configured
+ * - any cached Kerberos credentials from the current environment
+ * <p>
+ * The module also installs a default JAAS config file (if necessary) for
+ * compatibility with ZK and Kafka.  Note that the JRE actually draws on numerous file locations.
+ * See: https://docs.oracle.com/javase/7/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html
+ * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
+ */
+@Internal
+public class JaasModule implements SecurityModule {
+
+       private static final Logger LOG = LoggerFactory.getLogger(JaasModule.class);
+
+       static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
+
+       static final String JAAS_CONF_RESOURCE_NAME = "flink-jaas.conf";
+
+       private String priorConfigFile;
+       private javax.security.auth.login.Configuration priorConfig;
+
+       private DynamicConfiguration currentConfig;
+
+       @Override
+       public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
+
+               // ensure that a config file is always defined, for compatibility with
+               // ZK and Kafka which check for the system property and existence of the file
+               priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
+               if (priorConfigFile == null) {
+                       File configFile = generateDefaultConfigFile();
+                       System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
+               }
+
+               // read the JAAS configuration file
+               priorConfig = javax.security.auth.login.Configuration.getConfiguration();
+
+               // construct a dynamic JAAS configuration
+               currentConfig = new DynamicConfiguration(priorConfig);
+
+               // wire up the configured JAAS login contexts to use the krb5 entries
+               AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
+               if(krb5Entries != null) {
+                       for (String app : securityConfig.getLoginContextNames()) {
+                               currentConfig.addAppConfigurationEntry(app, krb5Entries);
+                       }
+               }
+
+               javax.security.auth.login.Configuration.setConfiguration(currentConfig);
+       }
+
+       @Override
+       public void uninstall() throws SecurityInstallException {
+               if(priorConfigFile != null) {
+                       System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, priorConfigFile);
+               } else {
+                       System.clearProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
+               }
+               javax.security.auth.login.Configuration.setConfiguration(priorConfig);
+       }
+
+       public DynamicConfiguration getCurrentConfiguration() {
+               return currentConfig;
+       }
+
+       private static AppConfigurationEntry[] getAppConfigurationEntries(SecurityUtils.SecurityConfiguration securityConfig) {
+
+               AppConfigurationEntry userKerberosAce = null;
+               if (securityConfig.useTicketCache()) {
+                       userKerberosAce = KerberosUtils.ticketCacheEntry();
+               }
+               AppConfigurationEntry keytabKerberosAce = null;
+               if (securityConfig.getKeytab() != null) {
+                       keytabKerberosAce = KerberosUtils.keytabEntry(securityConfig.getKeytab(), securityConfig.getPrincipal());
+               }
+
+               AppConfigurationEntry[] appConfigurationEntry;
+               if (userKerberosAce != null && keytabKerberosAce != null) {
+                       appConfigurationEntry = new AppConfigurationEntry[]{keytabKerberosAce, userKerberosAce};
+               } else if (keytabKerberosAce != null) {
+                       appConfigurationEntry = new AppConfigurationEntry[]{keytabKerberosAce};
+               } else if (userKerberosAce != null) {
+                       appConfigurationEntry = new AppConfigurationEntry[]{userKerberosAce};
+               } else {
+                       return null;
+               }
+
+               return appConfigurationEntry;
+       }
+
+       /**
+        * Generate the default JAAS config file.
+        */
+       private static File generateDefaultConfigFile() {
+               final File jaasConfFile;
+               try {
+                       Path jaasConfPath = Files.createTempFile("jaas-", ".conf");
+                       try (InputStream resourceStream = JaasModule.class.getClassLoader().getResourceAsStream(JAAS_CONF_RESOURCE_NAME)) {
+                               Files.copy(resourceStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
+                       }
+                       jaasConfFile = jaasConfPath.toFile();
+                       jaasConfFile.deleteOnExit();
+               } catch (IOException e) {
+                       throw new RuntimeException("unable to generate a JAAS 
configuration file", e);
+               }
+               return jaasConfFile;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
new file mode 100644
index 0000000..fbe1db9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+
+import java.security.GeneralSecurityException;
+
+/**
+ * An installable security module.
+ */
+public interface SecurityModule {
+
+       /**
+        * Install the security module.
+        *
+        * @param configuration the security configuration.
+        * @throws SecurityInstallException if the security module couldn't be installed.
+        */
+       void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException;
+
+       /**
+        * Uninstall the security module.
+        *
+        * @throws SecurityInstallException if the security module couldn't be uninstalled.
+        * @throws UnsupportedOperationException if the security module doesn't support uninstallation.
+        */
+       void uninstall() throws SecurityInstallException;
+
+       /**
+        * Indicates a problem with installing or uninstalling a security module.
+        */
+       class SecurityInstallException extends GeneralSecurityException {
+               private static final long serialVersionUID = 1L;
+
+               public SecurityInstallException(String msg) {
+                       super(msg);
+               }
+
+               public SecurityInstallException(String msg, Throwable cause) {
+                       super(msg, cause);
+               }
+       }
+}
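
The install/uninstall contract above can be exercised without Flink on the classpath. What follows is a minimal sketch, not the code from this commit: the nested Module interface is a stand-in for SecurityModule with the configuration parameter left out, and RecordingModule, installAll and uninstallAll are hypothetical names. It assumes a caller installs modules in order and uninstalls them in reverse, skipping any module that throws UnsupportedOperationException as the Javadoc permits.

```java
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;

final class SecurityModuleLifecycleSketch {

    /** Stand-in for SecurityModule; the configuration argument is omitted. */
    interface Module {
        void install() throws GeneralSecurityException;
        void uninstall() throws GeneralSecurityException;
    }

    /** Hypothetical module that only records the calls it receives. */
    static final class RecordingModule implements Module {
        private final String name;
        private final List<String> log;
        private final boolean supportsUninstall;

        RecordingModule(String name, List<String> log, boolean supportsUninstall) {
            this.name = name;
            this.log = log;
            this.supportsUninstall = supportsUninstall;
        }

        @Override
        public void install() {
            log.add("install " + name);
        }

        @Override
        public void uninstall() {
            if (!supportsUninstall) {
                throw new UnsupportedOperationException(name);
            }
            log.add("uninstall " + name);
        }
    }

    /** Installs every module in list order. */
    static void installAll(List<Module> modules) throws GeneralSecurityException {
        for (Module module : modules) {
            module.install();
        }
    }

    /** Uninstalls in reverse order, skipping modules that cannot be uninstalled. */
    static void uninstallAll(List<Module> modules) throws GeneralSecurityException {
        for (int i = modules.size() - 1; i >= 0; i--) {
            try {
                modules.get(i).uninstall();
            } catch (UnsupportedOperationException ignored) {
                // Allowed by the contract: this module has no uninstall step.
            }
        }
    }

    /** Runs three recording modules through one install/uninstall cycle. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        List<Module> modules = new ArrayList<>();
        modules.add(new RecordingModule("a", log, true));
        modules.add(new RecordingModule("b", log, false));
        modules.add(new RecordingModule("c", log, true));
        try {
            installAll(modules);
            uninstallAll(modules);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Reverse-order uninstallation is a design choice of this sketch; the interface itself does not prescribe an order.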

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
new file mode 100644
index 0000000..c0ba4a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.runtime.security.SecurityUtils;
+
+/**
+ * Responsible for installing a process-wide ZooKeeper security configuration.
+ */
+public class ZooKeeperModule implements SecurityModule {
+
+       private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+       /**
+        * A system property for setting whether ZK uses SASL.
+        */
+       private static final String ZK_ENABLE_CLIENT_SASL = "zookeeper.sasl.client";
+
+       /**
+        * A system property for setting the expected ZooKeeper service name.
+        */
+       private static final String ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+       /**
+        * A system property for setting the login context name to use.
+        */
+       private static final String ZK_LOGIN_CONTEXT_NAME = "zookeeper.sasl.clientconfig";
+
+       private String priorServiceName;
+
+       private String priorLoginContextName;
+
+       @Override
+       public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException {
+
+               priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
+               if (!"zookeeper".equals(configuration.getZooKeeperServiceName())) {
+                       System.setProperty(ZK_SASL_CLIENT_USERNAME, configuration.getZooKeeperServiceName());
+               }
+
+               priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
+               if (!"Client".equals(configuration.getZooKeeperLoginContextName())) {
+                       System.setProperty(ZK_LOGIN_CONTEXT_NAME, configuration.getZooKeeperLoginContextName());
+               }
+       }
+
+       @Override
+       public void uninstall() throws SecurityInstallException {
+               if(priorServiceName != null) {
+                       System.setProperty(ZK_SASL_CLIENT_USERNAME, priorServiceName);
+               } else {
+                       System.clearProperty(ZK_SASL_CLIENT_USERNAME);
+               }
+               if(priorLoginContextName != null) {
+                       System.setProperty(ZK_LOGIN_CONTEXT_NAME, priorLoginContextName);
+               } else {
+                       System.clearProperty(ZK_LOGIN_CONTEXT_NAME);
+               }
+       }
+
+}
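
The save-and-restore handling of system properties in ZooKeeperModule can be isolated into a small standalone sketch. The class name SystemPropertyGuardSketch and the property key used in main are hypothetical and not part of the commit; the sketch only assumes the same rule as the diff: remember the prior value, set the property only when the configured value differs from the default, and on uninstall restore the prior value or clear the property.

```java
final class SystemPropertyGuardSketch {

    private final String key;
    private String prior;

    SystemPropertyGuardSketch(String key) {
        this.key = key;
    }

    /**
     * Remembers the current value and overrides it only when the configured
     * value differs from the default. The value is assumed to be non-null,
     * because System.setProperty rejects null values.
     */
    void install(String value, String defaultValue) {
        prior = System.getProperty(key);
        if (!defaultValue.equals(value)) {
            System.setProperty(key, value);
        }
    }

    /** Restores the remembered value, or clears the property if there was none. */
    void uninstall() {
        if (prior != null) {
            System.setProperty(key, prior);
        } else {
            System.clearProperty(key);
        }
    }

    public static void main(String[] args) {
        // Hypothetical key, chosen so that no real ZooKeeper setting is touched.
        String key = "sketch.zookeeper.service.name";
        SystemPropertyGuardSketch guard = new SystemPropertyGuardSketch(key);

        guard.install("zk-custom", "zookeeper");
        System.out.println("after install:   " + System.getProperty(key));

        guard.uninstall();
        System.out.println("after uninstall: " + System.getProperty(key));
    }
}
```

One consequence of this rule: when the configured value equals the default, a value that was already present in the JVM is left in place rather than being reset to the default.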
