[FLINK-3929] Added Keytab based Kerberos support to enable secure Flink cluster 
deployment(addresses HDHS, Kafka and ZK services)

FLINK-3929 Added MiniKDC support for Kafka, Zookeeper, RollingFS and Yarn 
integration test modules


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/25a622fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/25a622fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/25a622fd

Branch: refs/heads/flip-6
Commit: 25a622fd9c9255bd1a5b4b6ff7891730dce34ac1
Parents: 303f6fe
Author: Vijay Srinivasaraghavan <vijayaraghavan.srinivasaragha...@emc.com>
Authored: Wed Jul 20 17:08:33 2016 -0700
Committer: Maximilian Michels <m...@apache.org>
Committed: Tue Sep 20 22:03:29 2016 +0200

----------------------------------------------------------------------
 docs/internals/flink_security.md                |  87 ++++++
 docs/setup/config.md                            |  27 +-
 .../org/apache/flink/client/CliFrontend.java    |  38 +--
 .../flink/configuration/ConfigConstants.java    |  22 ++
 .../org/apache/flink/util/Preconditions.java    |   2 +-
 .../src/main/flink-bin/conf/flink-jaas.conf     |  26 ++
 flink-dist/src/main/resources/flink-conf.yaml   |  25 ++
 .../MesosApplicationMasterRunner.java           |   3 +-
 .../clusterframework/BootstrapTools.java        |   7 +
 .../runtime/security/JaasConfiguration.java     | 160 ++++++++++
 .../flink/runtime/security/SecurityContext.java | 313 +++++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  34 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  25 +-
 .../runtime/security/JaasConfigurationTest.java |  52 +++
 .../runtime/security/SecurityContextTest.java   |  77 +++++
 .../flink-connector-filesystem/pom.xml          |  36 +++
 .../connectors/fs/RollingSinkITCase.java        |  19 +-
 .../connectors/fs/RollingSinkSecuredITCase.java | 232 ++++++++++++++
 .../src/test/resources/log4j-test.properties    |   2 +
 .../connectors/kafka/Kafka08ProducerITCase.java |   1 -
 .../kafka/KafkaTestEnvironmentImpl.java         |  13 +-
 .../flink-connector-kafka-0.9/pom.xml           |  18 ++
 .../connectors/kafka/Kafka09ITCase.java         |  14 +-
 .../connectors/kafka/Kafka09ProducerITCase.java |   1 -
 .../kafka/Kafka09SecureRunITCase.java           |  62 ++++
 .../kafka/KafkaTestEnvironmentImpl.java         |  81 ++++-
 .../src/test/resources/log4j-test.properties    |   2 +
 .../flink-connector-kafka-base/pom.xml          |  19 ++
 .../connectors/kafka/KafkaConsumerTestBase.java | 100 ++++--
 .../connectors/kafka/KafkaProducerTestBase.java |  14 +-
 .../kafka/KafkaShortRetentionTestBase.java      |  29 +-
 .../connectors/kafka/KafkaTestBase.java         |  72 +++--
 .../connectors/kafka/KafkaTestEnvironment.java  |  11 +-
 .../kafka/testutils/DataGenerators.java         |   9 +-
 .../flink-test-utils/pom.xml                    |  25 ++
 .../util/StreamingMultipleProgramsTestBase.java |   6 +
 .../flink/test/util/SecureTestEnvironment.java  | 249 +++++++++++++++
 .../test/util/TestingJaasConfiguration.java     | 106 +++++++
 .../flink/test/util/TestingSecurityContext.java |  80 +++++
 flink-yarn-tests/pom.xml                        |  20 ++
 .../flink/yarn/FlinkYarnSessionCliTest.java     |   2 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  14 +-
 .../YARNSessionCapacitySchedulerITCase.java     |  23 ++
 .../flink/yarn/YARNSessionFIFOITCase.java       |  24 ++
 .../yarn/YARNSessionFIFOSecuredITCase.java      | 103 ++++++
 .../org/apache/flink/yarn/YarnTestBase.java     |  87 +++++-
 .../src/test/resources/log4j-test.properties    |   5 +
 .../yarn/AbstractYarnClusterDescriptor.java     | 104 ++++--
 .../main/java/org/apache/flink/yarn/Utils.java  |   8 +
 .../flink/yarn/YarnApplicationMasterRunner.java | 158 ++++++++--
 .../org/apache/flink/yarn/YarnConfigKeys.java   |   6 +-
 .../flink/yarn/YarnTaskManagerRunner.java       |  88 ++++--
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  28 +-
 pom.xml                                         |   7 +
 tools/log4j-travis.properties                   |   3 +
 55 files changed, 2553 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/docs/internals/flink_security.md
----------------------------------------------------------------------
diff --git a/docs/internals/flink_security.md b/docs/internals/flink_security.md
new file mode 100644
index 0000000..846273b
--- /dev/null
+++ b/docs/internals/flink_security.md
@@ -0,0 +1,87 @@
+---
+title:  "Flink Security"
+# Top navigation
+top-nav-group: internals
+top-nav-pos: 10
+top-nav-title: Flink Security
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This document briefly describes how Flink security works in the context of 
various deployment mechanism (Standalone/Cluster vs YARN) 
+and the connectors that participates in Flink Job execution stage. This 
documentation can be helpful for both administrators and developers 
+who plans to run Flink on a secure environment.
+
+## Objective
+
+The primary goal of Flink security model is to enable secure data access for 
jobs within a cluster via connectors. In a production deployment scenario, 
+streaming jobs are understood to run for longer period of time 
(days/weeks/months) and the system must be  able to authenticate against secure 
+data sources throughout the life of the job. The current implementation 
supports running Flink clusters (Job Manager/Task Manager/Jobs) under the 
+context of a Kerberos identity based on Keytab credential supplied during 
deployment time. Any jobs submitted will continue to run in the identity of the 
cluster.
+
+## How Flink Security works
+Flink deployment includes running Job Manager/ZooKeeper, Task Manager(s), Web 
UI and Job(s). Jobs (user code) can be submitted through web UI and/or CLI. 
+A Job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, 
Kinesis etc.,) and each connector may have a specific security 
+requirements (Kerberos, database based, SSL/TLS, custom etc.,). While 
satisfying the security requirements for all the connectors evolves over a 
period 
+of time, at this time of writing, the following connectors/services are tested 
for Kerberos/Keytab based security.
+
+- Kafka (0.9)
+- HDFS
+- ZooKeeper
+
+Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI is a 
static implementation that takes care of handling Kerberos authentication. The 
Flink bootstrap implementation
+(JM/TM/CLI) takes care of instantiating UGI with the appropriate security 
credentials to establish the necessary security context.
+
+Services like Kafka and ZooKeeper use SASL/JAAS based authentication mechanism 
to authenticate against a Kerberos server. It expects JAAS configuration with a 
platform-specific login 
+module *name* to be provided. Managing per-connector configuration files will 
be an overhead and to overcome this requirement, a process-wide JAAS 
configuration object is 
+instantiated which serves standard ApplicationConfigurationEntry for the 
connectors that authenticates using SASL/JAAS mechanism.
+
+It is important to understand that the Flink processes (JM/TM/UI/Jobs) itself 
uses UGI's doAS() implementation to run under a specific user context, i.e. if 
Hadoop security is enabled 
+then the Flink processes will be running under a secure user account or else 
it will run as the OS login user account who starts the Flink cluster.
+
+## Security Configurations
+
+Secure credentials can be supplied by adding below configuration elements to 
Flink configuration file:
+
+- `security.keytab`: Absolute path to Kerberos keytab file that contains the 
user credentials/secret.
+
+- `security.principal`: User principal name that the Flink cluster should run 
as.
+
+The delegation token mechanism (*kinit cache*) is still supported for backward 
compatibility but enabling security using *keytab* configuration is the 
preferred and recommended approach.
+
+## Standalone Mode:
+
+Steps to run a secure Flink cluster in standalone/cluster mode:
+- Add security configurations to Flink configuration file (on all cluster 
nodes) 
+- Make sure the Keytab file exist in the path as indicated in 
*security.keytab* configuration on all cluster nodes
+- Deploy Flink cluster using cluster start/stop scripts or CLI
+
+## Yarn Mode:
+
+Steps to run secure Flink cluster in Yarn mode:
+- Add security configurations to Flink configuration file (on the node from 
where cluster will be provisioned using Flink/Yarn CLI) 
+- Make sure the Keytab file exist in the path as indicated in 
*security.keytab* configuration
+- Deploy Flink cluster using CLI
+
+In Yarn mode, the user supplied keytab will be copied over to the Yarn 
containers (App Master/JM and TM) as the Yarn local resource file.
+Security implementation details are based on <a 
href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md";>Yarn
 security</a> 
+
+## Token Renewal
+
+UGI and Kafka/ZK login module implementations takes care of auto-renewing the 
tickets upon reaching expiry and no further action is needed on the part of 
Flink.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 51475cc..54ef394 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -95,18 +95,35 @@ These options are useful for debugging a Flink application 
for memory and garbag
 
 ### Kerberos
 
-Flink supports Kerberos authentication of Hadoop services such as HDFS, YARN, 
or HBase.
+Flink supports Kerberos authentication for the following services 
+
++ Hadoop Components: such as HDFS, YARN, or HBase.
++ Kafka Connectors (version 0.9+)
++ Zookeeper Server/Client
+
+Hadoop components relies on the UserGroupInformation (UGI) implementation to 
handle Kerberos authentication, whereas Kafka and Zookeeper services handles 
Kerberos authentication through SASL/JAAS implementation.
 
 **Kerberos is only properly supported in Hadoop version 2.6.1 and above. All
   other versions have critical bugs which might fail the Flink job
   unexpectedly.**
 
+**Ticket cache** and **Keytab** modes are supported for all above mentioned 
services.
+
+> Ticket cache (Supported only to provide backward compatibility support. 
Keytab is the preferred approach for long running jobs)
+
 While Hadoop uses Kerberos tickets to authenticate users with services 
initially, the authentication process continues differently afterwards. Instead 
of saving the ticket to authenticate on a later access, Hadoop creates its own 
security tokens (DelegationToken) that it passes around. These are 
authenticated to Kerberos periodically but are independent of the token renewal 
time. The tokens have a maximum life span identical to the Kerberos ticket 
maximum life span.
 
-Please make sure to set the maximum ticket life span high long running jobs. 
The renewal time of the ticket, on the other hand, is not important because 
Hadoop abstracts this away using its own security tocken renewal system. Hadoop 
makes sure that tickets are renewed in time and you can be sure to be 
authenticated until the end of the ticket life time.
+While using ticket cache mode, please make sure to set the maximum ticket life 
span high long running jobs. 
 
 If you are on YARN, then it is sufficient to authenticate the client with 
Kerberos. On a Flink standalone cluster you need to ensure that, initially, all 
nodes are authenticated with Kerberos using the `kinit` tool.
 
+> Keytab (security principal and keytab can be configured through Flink 
configuration file)
+- `security.keytab`: Path to Keytab file
+- `security.principal`: Principal associated with the keytab
+
+Kerberos ticket renewal is abstracted and automatically handled by the 
Hadoop/Kafka/ZK login modules and ensures that tickets are renewed in time and 
you can be sure to be authenticated until the end of the ticket life time.
+
+For Kafka and ZK, process-wide JAAS config will be created using the provided 
security credentials and the Kerberos authentication will be handled by 
Kafka/ZK login handlers.
 
 ### Other
 
@@ -315,6 +332,12 @@ Previously this key was named `recovery.mode` and the 
default value was `standal
 
 - `high-availability.job.delay`: (Default `akka.ask.timeout`) Defines the 
delay before persisted jobs are recovered in case of a master recovery 
situation. Previously this key was named `recovery.job.delay`.
 
+### ZooKeeper-Security
+
+- `zookeeper.sasl.disable`: (Default: `true`) Defines if SASL based 
authentication needs to be enabled or disabled. The configuration value can be 
set to "true" if ZooKeeper cluster is running in secure mode (Kerberos)
+
+- `zookeeper.sasl.service-name`: (Default: `zookeeper`) If the ZooKeeper 
server is configured with a different service name (default:"zookeeper") then 
it can be supplied using this configuration. A mismatch in service name between 
client and server configuration will cause the authentication to fail. 
+
 ## Environment
 
 - `env.log.dir`: (Defaults to the `log` directory under Flink's home) Defines 
the directory where the Flink logs are saved. It has to be an absolute path.

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index c7fb647..575ffad 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -66,7 +66,7 @@ import 
org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -161,6 +161,9 @@ public class CliFrontend {
                                "filesystem scheme from configuration.", e);
                }
 
+               this.config.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, 
configDirectory.getAbsolutePath()
+                               + ".." + File.separator);
+
                this.clientTimeout = AkkaUtils.getClientTimeout(config);
        }
 
@@ -982,25 +985,7 @@ public class CliFrontend {
                // do action
                switch (action) {
                        case ACTION_RUN:
-                               // run() needs to run in a secured environment 
for the optimizer.
-                               if (SecurityUtils.isSecurityEnabled()) {
-                                       String message = "Secure Hadoop 
environment setup detected. Running in secure context.";
-                                       LOG.info(message);
-
-                                       try {
-                                               return 
SecurityUtils.runSecured(new SecurityUtils.FlinkSecuredRunner<Integer>() {
-                                                       @Override
-                                                       public Integer run() 
throws Exception {
-                                                               return 
CliFrontend.this.run(params);
-                                                       }
-                                               });
-                                       }
-                                       catch (Exception e) {
-                                               return handleError(e);
-                                       }
-                               } else {
-                                       return run(params);
-                               }
+                               return CliFrontend.this.run(params);
                        case ACTION_LIST:
                                return list(params);
                        case ACTION_INFO:
@@ -1037,12 +1022,19 @@ public class CliFrontend {
        /**
         * Submits the job based on the arguments
         */
-       public static void main(String[] args) {
+       public static void main(final String[] args) {
                EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line 
Client", args);
 
                try {
-                       CliFrontend cli = new CliFrontend();
-                       int retCode = cli.parseParameters(args);
+                       final CliFrontend cli = new CliFrontend();
+                       SecurityContext.install(new 
SecurityContext.SecurityConfiguration().setFlinkConfiguration(cli.config));
+                       int retCode = SecurityContext.getInstalled()
+                                       .runSecured(new 
SecurityContext.FlinkSecuredRunner<Integer>() {
+                                               @Override
+                                               public Integer run() {
+                                                       return 
cli.parseParameters(args);
+                                               }
+                                       });
                        System.exit(retCode);
                }
                catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f0f1b6b..9e66e2a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -758,6 +758,12 @@ public final class ConfigConstants {
        @PublicEvolving
        public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS = 
"high-availability.zookeeper.client.max-retry-attempts";
 
+       @PublicEvolving
+       public static final String ZOOKEEPER_SASL_DISABLE = 
"zookeeper.sasl.disable";
+
+       @PublicEvolving
+       public static final String ZOOKEEPER_SASL_SERVICE_NAME = 
"zookeeper.sasl.service-name";
+
        /** Deprecated in favour of {@link #HA_ZOOKEEPER_QUORUM_KEY}. */
        @Deprecated
        public static final String ZOOKEEPER_QUORUM_KEY = 
"recovery.zookeeper.quorum";
@@ -1233,6 +1239,9 @@ public final class ConfigConstants {
        /** ZooKeeper default leader port. */
        public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;
 
+       /** Defaults for ZK client security **/
+       public static final boolean DEFAULT_ZOOKEEPER_SASL_DISABLE = true;
+
        // ------------------------- Queryable state 
------------------------------
 
        /** Port to bind KvState server to. */
@@ -1279,6 +1288,19 @@ public final class ConfigConstants {
        /** The environment variable name which contains the location of the 
lib folder */
        public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";
 
+       // -------------------------------- Security 
-------------------------------
+
+       /**
+        * The config parameter defining security credentials required
+        * for securing Flink cluster.
+        */
+
+       /** Keytab file key name to be used in flink configuration file */
+       public static final String SECURITY_KEYTAB_KEY = "security.keytab";
+
+       /** Kerberos security principal key name to be used in flink 
configuration file */
+       public static final String SECURITY_PRINCIPAL_KEY = 
"security.principal";
+
 
        /**
         * Not instantiable.

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/Preconditions.java 
b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
index ea6b9dd..e970c13 100644
--- a/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
+++ b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java
@@ -293,7 +293,7 @@ public final class Preconditions {
 
                return builder.toString();
        }
-       
+
        // 
------------------------------------------------------------------------
 
        /** Private constructor to prevent instantiation */

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/flink-jaas.conf 
b/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
new file mode 100644
index 0000000..d476e24
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/conf/flink-jaas.conf
@@ -0,0 +1,26 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# We are using this file as an workaround for the Kafka and ZK SASL 
implementation
+# since they explicitly look for java.security.auth.login.config property
+# The file itself is not used by the application since the internal 
implementation
+# uses a process-wide in-memory java security configuration object.
+# Please do not edit/delete this file - See FLINK-3929
+sample {
+  useKeyTab=false
+  useTicketCache=true;
+};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml 
b/flink-dist/src/main/resources/flink-conf.yaml
index 27fd84a..c876922 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -139,6 +139,31 @@ jobmanager.web.port: 8081
 #
 # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
 #
+
 # high-availability: zookeeper
 # high-availability.zookeeper.quorum: localhost:2181
 # high-availability.zookeeper.storageDir: hdfs:///flink/ha/
+
+#==============================================================================
+# Flink Cluster Security Configuration (optional configuration)
+#==============================================================================
+
+# Kerberos security for the connectors can be enabled by providing below 
configurations
+# Security works in two modes - keytab/principal combination or using the 
Kerberos token cache
+# If keytab and principal are not provided, token cache (manual kinit) will be 
used
+
+#security.keytab: /path/to/kerberos/keytab
+#security.principal: flink-user
+
+#==============================================================================
+# ZK Security Configuration (optional configuration)
+#==============================================================================
+# Below configurations are applicable if ZK quorum is configured for Kerberos 
security
+
+# SASL authentication is disabled by default and can be enabled by changig the 
value to false
+#
+# zookeeper.sasl.disable: true
+
+# Override below configuration to provide custom ZK service name if configured
+#
+# zookeeper.sasl.service-name: zookeeper
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 8fb6af4..5ec39c2 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -582,10 +582,11 @@ public class MesosApplicationMasterRunner {
                // build the launch command
                boolean hasLogback = new File(workingDirectory, 
"logback.xml").exists();
                boolean hasLog4j = new File(workingDirectory, 
"log4j.properties").exists();
+               boolean hasKrb5 = false;
 
                String launchCommand = 
BootstrapTools.getTaskManagerShellCommand(
                        flinkConfig, tmParams.containeredParameters(), ".", ".",
-                       hasLogback, hasLog4j, taskManagerMainClass);
+                       hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
                cmd.setValue(launchCommand);
 
                // build the environment variables

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index c9748cb..d844f5d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -308,6 +308,7 @@ public class BootstrapTools {
                        String logDirectory,
                        boolean hasLogback,
                        boolean hasLog4j,
+                       boolean hasKrb5,
                        Class<?> mainClass) {
 
                StringBuilder tmCommand = new 
StringBuilder("$JAVA_HOME/bin/java");
@@ -328,6 +329,12 @@ public class BootstrapTools {
                                tmCommand.append(" -Dlog4j.configuration=file:")
                                                
.append(configDirectory).append("/log4j.properties");
                        }
+
+                       //applicable only for YarnMiniCluster secure test run
+                       //krb5.conf file will be available as local resource in 
JM/TM container
+                       if(hasKrb5) {
+                               tmCommand.append(" 
-Djava.security.krb5.conf=krb5.conf");
+                       }
                }
 
                tmCommand.append(' ').append(mainClass.getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
new file mode 100644
index 0000000..c4527dd
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ * JAAS configuration provider object that provides default LoginModule for 
various connectors that supports
+ * JAAS/SASL based Kerberos authentication. The implementation is inspired 
from Hadoop UGI class.
+ *
+ * Different connectors uses different login module name to implement JAAS 
based authentication support.
+ * For example, Kafka expects the login module name to be "kafkaClient" 
whereas ZooKeeper expect the
+ * name to be "client". This sets responsibility on the Flink cluster 
administrator to configure/provide right
+ * JAAS config entries. To simplify this requirement, we have introduced this 
abstraction that provides
+ * a standard lookup to get the login module entry for the JAAS based 
authentication to work.
+ *
+ * HDFS connector will not be impacted with this configuration since it uses 
UGI based mechanism to authenticate.
+ *
+ * <a 
href="https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html";>Configuration</a>
+ *
+ */
+
+@Internal
+public class JaasConfiguration extends Configuration {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(JaasConfiguration.class);
+
+       public static final String JAVA_VENDOR_NAME = 
System.getProperty("java.vendor");
+
+       public static final boolean IBM_JAVA;
+
+       private static final Map<String, String> debugOptions = new HashMap<>();
+
+       private static final Map<String, String> kerberosCacheOptions = new 
HashMap<>();
+
+       private static final Map<String, String> keytabKerberosOptions = new 
HashMap<>();
+
+       private static final AppConfigurationEntry userKerberosAce;
+
+       private AppConfigurationEntry keytabKerberosAce = null;
+
+       static {
+
+               IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
+
+               if(LOG.isDebugEnabled()) {
+                       debugOptions.put("debug", "true");
+               }
+
+               if(IBM_JAVA) {
+                       kerberosCacheOptions.put("useDefaultCcache", "true");
+               } else {
+                       kerberosCacheOptions.put("doNotPrompt", "true");
+                       kerberosCacheOptions.put("useTicketCache", "true");
+               }
+
+               String ticketCache = System.getenv("KRB5CCNAME");
+               if(ticketCache != null) {
+                       if(IBM_JAVA) {
+                               System.setProperty("KRB5CCNAME", ticketCache);
+                       } else {
+                               kerberosCacheOptions.put("ticketCache", 
ticketCache);
+                       }
+               }
+
+               kerberosCacheOptions.put("renewTGT", "true");
+               kerberosCacheOptions.putAll(debugOptions);
+
+               userKerberosAce = new AppConfigurationEntry(
+                               KerberosUtil.getKrb5LoginModuleName(),
+                               
AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
+                               kerberosCacheOptions);
+
+       }
+
+       protected JaasConfiguration(String keytab, String principal) {
+
+               LOG.info("Initializing JAAS configuration instance. Parameters: 
{}, {}", keytab, principal);
+
+               if(StringUtils.isBlank(keytab) && 
!StringUtils.isBlank(principal) ||
+                               (!StringUtils.isBlank(keytab) && 
StringUtils.isBlank(principal))){
+                       throw new RuntimeException("Both keytab and principal 
are required and cannot be empty");
+               }
+
+               if(!StringUtils.isBlank(keytab) && 
!StringUtils.isBlank(principal)) {
+
+                       if(IBM_JAVA) {
+                               keytabKerberosOptions.put("useKeytab", 
prependFileUri(keytab));
+                               keytabKerberosOptions.put("credsType", "both");
+                       } else {
+                               keytabKerberosOptions.put("keyTab", keytab);
+                               keytabKerberosOptions.put("doNotPrompt", 
"true");
+                               keytabKerberosOptions.put("useKeyTab", "true");
+                               keytabKerberosOptions.put("storeKey", "true");
+                       }
+
+                       keytabKerberosOptions.put("principal", principal);
+                       keytabKerberosOptions.put("refreshKrb5Config", "true");
+                       keytabKerberosOptions.putAll(debugOptions);
+
+                       keytabKerberosAce = new AppConfigurationEntry(
+                                       KerberosUtil.getKrb5LoginModuleName(),
+                                       
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                                       keytabKerberosOptions);
+               }
+       }
+
+       public static Map<String, String> getKeytabKerberosOptions() {
+               return keytabKerberosOptions;
+       }
+
+       private static String prependFileUri(String keytabPath) {
+               File f = new File(keytabPath);
+               return f.toURI().toString();
+       }
+
+       @Override
+       public AppConfigurationEntry[] getAppConfigurationEntry(String 
applicationName) {
+
+               LOG.debug("JAAS configuration requested for the application 
entry: {}", applicationName);
+
+               AppConfigurationEntry[] appConfigurationEntry;
+
+               if(keytabKerberosAce != null) {
+                       appConfigurationEntry = new AppConfigurationEntry[] 
{keytabKerberosAce, userKerberosAce};
+               } else {
+                       appConfigurationEntry = new AppConfigurationEntry[] 
{userKerberosAce};
+               }
+
+               return appConfigurationEntry;
+       }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
new file mode 100644
index 0000000..4b8b69b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+
+/*
+ * Process-wide security context object which initializes UGI with appropriate 
security credentials and also it
+ * creates in-memory JAAS configuration object which will serve appropriate 
ApplicationConfigurationEntry for the
+ * connector login module implementation that authenticates Kerberos identity 
using SASL/JAAS based mechanism.
+ */
+@Internal
+public class SecurityContext {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(SecurityContext.class);
+
+       public static final String JAAS_CONF_FILENAME = "flink-jaas.conf";
+
+       private static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = 
"java.security.auth.login.config";
+
+       private static final String ZOOKEEPER_SASL_CLIENT = 
"zookeeper.sasl.client";
+
+       private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = 
"zookeeper.sasl.client.username";
+
+       private static SecurityContext installedContext;
+
+       public static SecurityContext getInstalled() { return installedContext; 
}
+
+       private UserGroupInformation ugi;
+
+       SecurityContext(UserGroupInformation ugi) {
+               if(ugi == null) {
+                       throw new RuntimeException("UGI passed cannot be null");
+               }
+               this.ugi = ugi;
+       }
+
+       public <T> T runSecured(final FlinkSecuredRunner<T> runner) throws 
Exception {
+               return ugi.doAs(new PrivilegedExceptionAction<T>() {
+                       @Override
+                       public T run() throws Exception {
+                               return runner.run();
+                       }
+               });
+       }
+
+       public static void install(SecurityConfiguration config) throws 
Exception {
+
+               // perform static initialization of UGI, JAAS
+               if(installedContext != null) {
+                       LOG.warn("overriding previous security context");
+               }
+
+               // establish the JAAS config
+               JaasConfiguration jaasConfig = new 
JaasConfiguration(config.keytab, config.principal);
+               
javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
+
+               populateSystemSecurityProperties(config.flinkConf);
+
+               // establish the UGI login user
+               UserGroupInformation.setConfiguration(config.hadoopConf);
+
+               UserGroupInformation loginUser;
+
+               if(UserGroupInformation.isSecurityEnabled() &&
+                               config.keytab != null && 
!StringUtils.isBlank(config.principal)) {
+                       String keytabPath = (new 
File(config.keytab)).getAbsolutePath();
+
+                       
UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
+
+                       loginUser = UserGroupInformation.getLoginUser();
+
+                       // supplement with any available tokens
+                       String fileLocation = 
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+                       if(fileLocation != null) {
+                               /*
+                                * Use reflection API since the API semantics 
are not available in Hadoop1 profile. Below APIs are
+                                * used in the context of reading the stored 
tokens from UGI.
+                                * Credentials cred = 
Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
+                                * loginUser.addCredentials(cred);
+                               */
+                               try {
+                                       Method readTokenStorageFileMethod = 
Credentials.class.getMethod("readTokenStorageFile",
+                                                       File.class, 
org.apache.hadoop.conf.Configuration.class);
+                                       Credentials cred = (Credentials) 
readTokenStorageFileMethod.invoke(null,new File(fileLocation),
+                                                       config.hadoopConf);
+                                       Method addCredentialsMethod = 
UserGroupInformation.class.getMethod("addCredentials",
+                                                       Credentials.class);
+                                       
addCredentialsMethod.invoke(loginUser,cred);
+                               } catch(NoSuchMethodException e) {
+                                       LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
+                               }
+                       }
+               } else {
+                       // login with current user credentials (e.g. ticket 
cache)
+                       try {
+                               //Use reflection API to get the login user 
object
+                               
//UserGroupInformation.loginUserFromSubject(null);
+                               Method loginUserFromSubjectMethod = 
UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
+                               Subject subject = null;
+                               loginUserFromSubjectMethod.invoke(null,subject);
+                       } catch(NoSuchMethodException e) {
+                               LOG.warn("Could not find method implementations 
in the shaded jar. Exception: {}", e);
+                       }
+
+                       loginUser = UserGroupInformation.getLoginUser();
+                       // note that the stored tokens are read automatically
+               }
+
+               boolean delegationToken = false;
+               final Text HDFS_DELEGATION_KIND = new 
Text("HDFS_DELEGATION_TOKEN");
+               Collection<Token<? extends TokenIdentifier>> usrTok = 
loginUser.getTokens();
+               for(Token<? extends TokenIdentifier> token : usrTok) {
+                       final Text id = new Text(token.getIdentifier());
+                       LOG.debug("Found user token " + id + " with " + token);
+                       if(token.getKind().equals(HDFS_DELEGATION_KIND)) {
+                               delegationToken = true;
+                       }
+               }
+
+               if(UserGroupInformation.isSecurityEnabled() && 
!loginUser.hasKerberosCredentials()) {
+                       //throw an error in non-yarn deployment if kerberos 
cache is not available
+                       if(!delegationToken) {
+                               LOG.error("Hadoop Security is enabled but 
current login user does not have Kerberos Credentials");
+                               throw new RuntimeException("Hadoop Security is 
enabled but current login user does not have Kerberos Credentials");
+                       }
+               }
+
+               installedContext = new SecurityContext(loginUser);
+       }
+
+       /*
+        * This method configures some of the system properties that are 
require for ZK and Kafka SASL authentication
+        * See: 
https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
+        * See: 
https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
+        * In this method, setting java.security.auth.login.config 
configuration is configured only to support ZK and
+        * Kafka current code behavior.
+        */
+       private static void populateSystemSecurityProperties(Configuration 
configuration) {
+
+               //required to be empty for Kafka but we will override the 
property
+               //with pseudo JAAS configuration file if SASL auth is enabled 
for ZK
+               System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
+
+               if(configuration == null) {
+                       return;
+               }
+
+               boolean disableSaslClient = 
configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE,
+                               ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
+               if(disableSaslClient) {
+                       LOG.info("SASL client auth for ZK will be disabled");
+                       //SASL auth is disabled by default but will be enabled 
if specified in configuration
+                       System.setProperty(ZOOKEEPER_SASL_CLIENT,"false");
+                       return;
+               }
+
+               String baseDir = 
configuration.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
+               if(baseDir == null) {
+                       String message = "SASL auth is enabled for ZK but 
unable to locate pseudo Jaas config " +
+                                       "since " + 
ConfigConstants.FLINK_BASE_DIR_PATH_KEY + " is not provided";
+                       LOG.error(message);
+                       throw new IllegalConfigurationException(message);
+               }
+
+               File f = new File(baseDir);
+               if(!f.exists() || !f.isDirectory()) {
+                       LOG.error("Invalid flink base directory {} 
configuration provided", baseDir);
+                       throw new IllegalConfigurationException("Invalid flink 
base directory configuration provided");
+               }
+
+               File jaasConfigFile = new File(f, JAAS_CONF_FILENAME);
+
+               if (!jaasConfigFile.exists() || !jaasConfigFile.isFile()) {
+
+                       //check if there is a conf directory
+                       File confDir = new File(f, "conf");
+                       if(!confDir.exists() || !confDir.isDirectory()) {
+                               LOG.error("Could not locate " + 
JAAS_CONF_FILENAME);
+                               throw new IllegalConfigurationException("Could 
not locate " + JAAS_CONF_FILENAME);
+                       }
+
+                       jaasConfigFile = new File(confDir, JAAS_CONF_FILENAME);
+
+                       if (!jaasConfigFile.exists() || 
!jaasConfigFile.isFile()) {
+                               LOG.error("Could not locate " + 
JAAS_CONF_FILENAME);
+                               throw new IllegalConfigurationException("Could 
not locate " + JAAS_CONF_FILENAME);
+                       }
+               }
+
+               LOG.info("Enabling {} property with pseudo JAAS config file: 
{}",
+                               JAVA_SECURITY_AUTH_LOGIN_CONFIG, 
jaasConfigFile);
+
+               //ZK client module lookup the configuration to handle SASL.
+               
//https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
+               System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, 
jaasConfigFile.getAbsolutePath());
+               System.setProperty(ZOOKEEPER_SASL_CLIENT,"true");
+
+               String zkSaslServiceName = 
configuration.getString(ConfigConstants.ZOOKEEPER_SASL_SERVICE_NAME, null);
+               if(!StringUtils.isBlank(zkSaslServiceName)) {
+                       LOG.info("ZK SASL service name: {} is provided in the 
configuration", zkSaslServiceName);
+                       
System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME,zkSaslServiceName);
+               }
+
+       }
+
+       /**
+        * Inputs for establishing the security context.
+        */
+       public static class SecurityConfiguration {
+
+               Configuration flinkConf;
+
+               org.apache.hadoop.conf.Configuration hadoopConf = new 
org.apache.hadoop.conf.Configuration();
+
+               String keytab;
+
+               String principal;
+
+               public String getKeytab() {
+                       return keytab;
+               }
+
+               public String getPrincipal() {
+                       return principal;
+               }
+
+               public SecurityConfiguration 
setFlinkConfiguration(Configuration flinkConf) {
+
+                       this.flinkConf = flinkConf;
+
+                       String keytab = 
flinkConf.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+
+                       String principal = 
flinkConf.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
+
+                       validate(keytab, principal);
+
+                       LOG.debug("keytab {} and principal {} .", keytab, 
principal);
+
+                       this.keytab = keytab;
+
+                       this.principal = principal;
+
+                       return this;
+               }
+
+               public SecurityConfiguration 
setHadoopConfiguration(org.apache.hadoop.conf.Configuration conf) {
+                       this.hadoopConf = conf;
+                       return this;
+               }
+
+               private void validate(String keytab, String principal) {
+
+                       if(StringUtils.isBlank(keytab) && 
!StringUtils.isBlank(principal) ||
+                                       !StringUtils.isBlank(keytab) && 
StringUtils.isBlank(principal)) {
+                               if(StringUtils.isBlank(keytab)) {
+                                       LOG.warn("Keytab is null or empty");
+                               }
+                               if(StringUtils.isBlank(principal)) {
+                                       LOG.warn("Principal is null or empty");
+                               }
+                               throw new RuntimeException("Requires both 
keytab and principal to be provided");
+                       }
+
+                       if(!StringUtils.isBlank(keytab)) {
+                               File keytabFile = new File(keytab);
+                               if(!keytabFile.exists() || 
!keytabFile.isFile()) {
+                                       LOG.warn("Not a valid keytab: {} file", 
keytab);
+                                       throw new RuntimeException("Invalid 
keytab file: " + keytab + " passed");
+                               }
+                       }
+
+               }
+       }
+
+       public interface FlinkSecuredRunner<T> {
+               T run() throws Exception;
+       }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9c844ba..639c158 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -72,8 +72,8 @@ import 
org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
 import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, 
NotifyKvStateRegistered, NotifyKvStateUnregistered}
-import org.apache.flink.runtime.security.SecurityUtils
-import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
+import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, 
SecurityConfiguration}
+import org.apache.flink.runtime.security.{SecurityContext}
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
@@ -2062,26 +2062,18 @@ object JobManager {
     }
 
     // run the job manager
+    SecurityContext.install(new 
SecurityConfiguration().setFlinkConfiguration(configuration))
+
     try {
-      if (SecurityUtils.isSecurityEnabled) {
-        LOG.info("Security is enabled. Starting secure JobManager.")
-        SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
-          override def run(): Unit = {
-            runJobManager(
-              configuration,
-              executionMode,
-              listeningHost,
-              listeningPortRange)
-          }
-        })
-      } else {
-        LOG.info("Security is not enabled. Starting non-authenticated 
JobManager.")
-        runJobManager(
-          configuration,
-          executionMode,
-          listeningHost,
-          listeningPortRange)
-      }
+      SecurityContext.getInstalled.runSecured(new FlinkSecuredRunner[Unit] {
+        override def run(): Unit = {
+          runJobManager(
+            configuration,
+            executionMode,
+            listeningHost,
+            listeningPortRange)
+        }
+      })
     } catch {
       case t: Throwable =>
         LOG.error("Failed to run JobManager.", t)

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 63a64a0..8534ee1 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -71,8 +71,8 @@ import 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.KvStateRegistry
 import org.apache.flink.runtime.query.netty.{DisabledKvStateRequestStats, 
KvStateServer}
-import org.apache.flink.runtime.security.SecurityUtils
-import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
+import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, 
SecurityConfiguration}
+import org.apache.flink.runtime.security.SecurityContext
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
 import org.apache.flink.util.{MathUtils, NetUtils}
@@ -1521,19 +1521,14 @@ object TaskManager {
     val resourceId = ResourceID.generate()
 
     // run the TaskManager (if requested in an authentication enabled context)
+    SecurityContext.install(new 
SecurityConfiguration().setFlinkConfiguration(configuration))
+
     try {
-      if (SecurityUtils.isSecurityEnabled) {
-        LOG.info("Security is enabled. Starting secure TaskManager.")
-        SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
-          override def run(): Unit = {
-            selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, 
classOf[TaskManager])
-          }
-        })
-      }
-      else {
-        LOG.info("Security is not enabled. Starting non-authenticated 
TaskManager.")
-        selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, 
classOf[TaskManager])
-      }
+      SecurityContext.getInstalled.runSecured(new FlinkSecuredRunner[Unit] {
+        override def run(): Unit = {
+          selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, 
classOf[TaskManager])
+        }
+      })
     }
     catch {
       case t: Throwable =>
@@ -1588,6 +1583,8 @@ object TaskManager {
       }
     }
 
+    conf.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, 
cliConfig.getConfigDir() + "/..")
+
     conf
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
new file mode 100644
index 0000000..89e5ef9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.junit.Test;
+
+import javax.security.auth.login.AppConfigurationEntry;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link JaasConfiguration}.
+ */
+public class JaasConfigurationTest {
+
+       @Test
+       public void testInvalidKerberosParams() {
+               String keytab = "user.keytab";
+               String principal = null;
+               try {
+                       new JaasConfiguration(keytab, principal);
+               } catch(RuntimeException re) {
+                       assertEquals("Both keytab and principal are required 
and cannot be empty",re.getMessage());
+               }
+       }
+
+       @Test
+       public void testDefaultAceEntry() {
+               JaasConfiguration conf = new JaasConfiguration(null,null);
+               javax.security.auth.login.Configuration.setConfiguration(conf);
+               final AppConfigurationEntry[] entry = 
conf.getAppConfigurationEntry("test");
+               AppConfigurationEntry ace = entry[0];
+               assertEquals(ace.getLoginModuleName(), 
KerberosUtil.getKrb5LoginModuleName());
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
new file mode 100644
index 0000000..5f3d76a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityContextTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link SecurityContext}.
+ */
+public class SecurityContextTest {
+
+       @Test
+       public void testCreateInsecureHadoopCtx() {
+               SecurityContext.SecurityConfiguration sc = new 
SecurityContext.SecurityConfiguration();
+               try {
+                       SecurityContext.install(sc);
+                       
assertEquals(UserGroupInformation.getLoginUser().getUserName(),getOSUserName());
+               } catch (Exception e) {
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testInvalidUGIContext() {
+               try {
+                       new SecurityContext(null);
+               } catch (RuntimeException re) {
+                       assertEquals("UGI passed cannot be 
null",re.getMessage());
+               }
+       }
+
+
+       private String getOSUserName() throws Exception {
+               String userName = "";
+               String osName = System.getProperty( "os.name" ).toLowerCase();
+               String className = null;
+
+               if( osName.contains( "windows" ) ){
+                       className = "com.sun.security.auth.module.NTSystem";
+               }
+               else if( osName.contains( "linux" ) ){
+                       className = "com.sun.security.auth.module.UnixSystem";
+               }
+               else if( osName.contains( "solaris" ) || osName.contains( 
"sunos" ) ){
+                       className = 
"com.sun.security.auth.module.SolarisSystem";
+               }
+
+               if( className != null ){
+                       Class<?> c = Class.forName( className );
+                       Method method = c.getDeclaredMethod( "getUsername" );
+                       Object o = c.newInstance();
+                       userName = (String) method.invoke( o );
+               }
+               return userName;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-filesystem/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/pom.xml 
b/flink-streaming-connectors/flink-connector-filesystem/pom.xml
index 5712856..edf299d 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/pom.xml
+++ b/flink-streaming-connectors/flink-connector-filesystem/pom.xml
@@ -121,6 +121,42 @@ under the License.
                        
<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-minikdc</artifactId>
+                       <version>${minikdc.version}</version>
+               </dependency>
+
        </dependencies>
 
+       <build>
+               <plugins>
+
+                       <!--
+            https://issues.apache.org/jira/browse/DIRSHARED-134
+            Required to pull the Mini-KDC transitive dependency
+            -->
+                       <plugin>
+                               <groupId>org.apache.felix</groupId>
+                               <artifactId>maven-bundle-plugin</artifactId>
+                               <version>3.0.1</version>
+                               <inherited>true</inherited>
+                               <extensions>true</extensions>
+                       </plugin>
+
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       <!--
+                                       Enforce single threaded execution to 
avoid port conflicts when running
+                                       secure mini DFS cluster
+                                       -->
+                                       <forkCount>1</forkCount>
+                                       <reuseForks>false</reuseForks>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 7ee75c1..c3c8df5 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -57,6 +58,8 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -79,19 +82,24 @@ import java.util.Map;
 @Deprecated
 public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 
+       protected static final Logger LOG = 
LoggerFactory.getLogger(RollingSinkITCase.class);
+
        @ClassRule
        public static TemporaryFolder tempFolder = new TemporaryFolder();
 
-       private static MiniDFSCluster hdfsCluster;
-       private static org.apache.hadoop.fs.FileSystem dfs;
-       private static String hdfsURI;
+       protected static MiniDFSCluster hdfsCluster;
+       protected static org.apache.hadoop.fs.FileSystem dfs;
+       protected static String hdfsURI;
+       protected static Configuration conf = new Configuration();
 
+       protected static File dataDir;
 
        @BeforeClass
        public static void createHDFS() throws IOException {
-               Configuration conf = new Configuration();
 
-               File dataDir = tempFolder.newFolder();
+               LOG.info("In RollingSinkITCase: Starting MiniDFSCluster ");
+
+               dataDir = tempFolder.newFolder();
 
                conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
dataDir.getAbsolutePath());
                MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(conf);
@@ -106,6 +114,7 @@ public class RollingSinkITCase extends 
StreamingMultipleProgramsTestBase {
 
        @AfterClass
        public static void destroyHDFS() {
+               LOG.info("In RollingSinkITCase: tearing down MiniDFSCluster ");
                hdfsCluster.shutdown();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
new file mode 100644
index 0000000..86cedaf
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.apache.flink.test.util.TestingSecurityContext;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+
+/**
+ * Tests for running {@link RollingSinkSecuredITCase} which is an extension of 
{@link RollingSink} in secure environment
+ */
+
+//The test is disabled since MiniDFS secure run requires lower order ports to 
be used.
+//We can enable the test when the fix is available (HDFS-9213)
+@Ignore
+public class RollingSinkSecuredITCase extends RollingSinkITCase {
+
+       protected static final Logger LOG = 
LoggerFactory.getLogger(RollingSinkSecuredITCase.class);
+
+       /*
+        * override super class static methods to avoid creating MiniDFS and 
MiniFlink with wrong configurations
+        * and out-of-order sequence for secure cluster
+        */
+       @BeforeClass
+       public static void setup() throws Exception {}
+
+       @AfterClass
+       public static void teardown() throws Exception {}
+
+       @BeforeClass
+       public static void createHDFS() throws IOException {}
+
+       @AfterClass
+       public static void destroyHDFS() {}
+
+       @BeforeClass
+       public static void startSecureCluster() throws Exception {
+
+               LOG.info("starting secure cluster environment for testing");
+
+               dataDir = tempFolder.newFolder();
+
+               conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
dataDir.getAbsolutePath());
+
+               SecureTestEnvironment.prepare(tempFolder);
+
+               populateSecureConfigurations();
+
+               Configuration flinkConfig = new Configuration();
+               flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+                               SecureTestEnvironment.getTestKeytab());
+               flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+                               
SecureTestEnvironment.getHadoopServicePrincipal());
+
+               SecurityContext.SecurityConfiguration ctx = new 
SecurityContext.SecurityConfiguration();
+               ctx.setFlinkConfiguration(flinkConfig);
+               ctx.setHadoopConfiguration(conf);
+               try {
+                       TestingSecurityContext.install(ctx, 
SecureTestEnvironment.getClientSecurityConfigurationMap());
+               } catch (Exception e) {
+                       throw new RuntimeException("Exception occurred while 
setting up secure test context. Reason: {}", e);
+               }
+
+               File hdfsSiteXML = new File(dataDir.getAbsolutePath() + 
"/hdfs-site.xml");
+
+               FileWriter writer = new FileWriter(hdfsSiteXML);
+               conf.writeXml(writer);
+               writer.flush();
+               writer.close();
+
+               Map<String, String> map = new HashMap<String, 
String>(System.getenv());
+               map.put("HADOOP_CONF_DIR", 
hdfsSiteXML.getParentFile().getAbsolutePath());
+               TestBaseUtils.setEnv(map);
+
+
+               MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(conf);
+               builder.checkDataNodeAddrConfig(true);
+               builder.checkDataNodeHostConfig(true);
+               hdfsCluster = builder.build();
+
+               dfs = hdfsCluster.getFileSystem();
+
+               hdfsURI = "hdfs://"
+                               + 
NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), 
hdfsCluster.getNameNodePort())
+                               + "/";
+
+               startSecureFlinkClusterWithRecoveryModeEnabled();
+       }
+
+       @AfterClass
+       public static void teardownSecureCluster() throws Exception {
+               LOG.info("tearing down secure cluster environment");
+
+               TestStreamEnvironment.unsetAsContext();
+               stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+
+               hdfsCluster.shutdown();
+
+               SecureTestEnvironment.cleanup();
+       }
+
+       private static void populateSecureConfigurations() {
+
+               String dataTransferProtection = "authentication";
+
+               
SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS,
 conf);
+               conf.set(DFS_NAMENODE_USER_NAME_KEY, 
SecureTestEnvironment.getHadoopServicePrincipal());
+               conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, 
SecureTestEnvironment.getTestKeytab());
+               conf.set(DFS_DATANODE_USER_NAME_KEY, 
SecureTestEnvironment.getHadoopServicePrincipal());
+               conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, 
SecureTestEnvironment.getTestKeytab());
+               conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, 
SecureTestEnvironment.getHadoopServicePrincipal());
+
+               conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+
+               conf.set("dfs.data.transfer.protection", 
dataTransferProtection);
+
+               conf.set(DFS_HTTP_POLICY_KEY, 
HttpConfig.Policy.HTTP_ONLY.name());
+
+               conf.set(DFS_ENCRYPT_DATA_TRANSFER_KEY, "false");
+
+               conf.setInt("dfs.datanode.socket.write.timeout", 0);
+
+               /*
+                * We ae setting the port number to privileged port - see 
HDFS-9213
+                * This requires the user to have root privilege to bind to the 
port
+                * Use below command (ubuntu) to set privilege to java process 
for the
+                * bind() to work if the java process is not running as root.
+                * setcap 'cap_net_bind_service=+ep' /path/to/java
+                */
+               conf.set(DFS_DATANODE_ADDRESS_KEY, "localhost:1002");
+               conf.set(DFS_DATANODE_HOST_NAME_KEY, "localhost");
+               conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "localhost:1003");
+       }
+
+       private static void startSecureFlinkClusterWithRecoveryModeEnabled() {
+               try {
+                       LOG.info("Starting Flink and ZK in secure mode");
+
+                       dfs.mkdirs(new Path("/flink/checkpoints"));
+                       dfs.mkdirs(new Path("/flink/recovery"));
+
+                       org.apache.flink.configuration.Configuration config = 
new org.apache.flink.configuration.Configuration();
+
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
DEFAULT_PARALLELISM);
+                       
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
+                       config.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
+                       config.setString(ConfigConstants.STATE_BACKEND, 
"filesystem");
+                       
config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + 
"/flink/checkpoints");
+                       
config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, hdfsURI + 
"/flink/recovery");
+                       config.setString("state.backend.fs.checkpointdir", 
hdfsURI + "/flink/checkpoints");
+
+                       
SecureTestEnvironment.populateFlinkSecureConfigurations(config);
+
+                       cluster = TestBaseUtils.startCluster(config, false);
+                       TestStreamEnvironment.setAsContext(cluster, 
DEFAULT_PARALLELISM);
+
+               } catch (Exception e) {
+                       LOG.error("Exception occured while creating MiniFlink 
cluster. Reason: {}", e);
+                       throw new RuntimeException(e);
+               }
+       }
+
+       /* For secure cluster testing, it is enough to run only one test and 
override below test methods
+        * to keep the overall build time minimal
+        */
+       @Override
+       public void testNonRollingSequenceFileWithoutCompressionWriter() throws 
Exception {}
+
+       @Override
+       public void testNonRollingSequenceFileWithCompressionWriter() throws 
Exception {}
+
+       @Override
+       public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws 
Exception {}
+
+       @Override
+       public void testNonRollingAvroKeyValueWithCompressionWriter() throws 
Exception {}
+
+       @Override
+       public void testDateTimeRollingStringWriter() throws Exception {}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
index fe60d94..5c22851 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
@@ -25,3 +25,5 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] 
%-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
index fc13719..5c951db 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.Test;
 
-
 @SuppressWarnings("serial")
 public class Kafka08ProducerITCase extends KafkaProducerTestBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 864773a..cbf3d06 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -81,6 +81,11 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
+       public Properties getSecureProperties() {
+               return null;
+       }
+
+       @Override
        public String getVersion() {
                return "0.8";
        }
@@ -132,9 +137,14 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                return server.socketServer().brokerId();
        }
 
+       @Override
+       public boolean isSecureRunSupported() {
+               return false;
+       }
+
 
        @Override
-       public void prepare(int numKafkaServers, Properties 
additionalServerProperties) {
+       public void prepare(int numKafkaServers, Properties 
additionalServerProperties, boolean secureMode) {
                this.additionalServerProperties = additionalServerProperties;
                File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
@@ -210,6 +220,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                if (zookeeper != null) {
                        try {
                                zookeeper.stop();
+                               zookeeper.close();
                        }
                        catch (Exception e) {
                                LOG.warn("ZK.stop() failed", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
index 3b31de6..bfcde82 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
@@ -134,6 +134,13 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-minikdc</artifactId>
+                       <version>${minikdc.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
        </dependencies>
 
        <build>
@@ -180,6 +187,17 @@ under the License.
                                        <argLine>-Xms256m -Xmx1000m 
-Dlog4j.configuration=${log4j.configuration} 
-Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
                                </configuration>
                        </plugin>
+                       <!--
+            https://issues.apache.org/jira/browse/DIRSHARED-134
+            Required to pull the Mini-KDC transitive dependency
+            -->
+                       <plugin>
+                               <groupId>org.apache.felix</groupId>
+                               <artifactId>maven-bundle-plugin</artifactId>
+                               <version>3.0.1</version>
+                               <inherited>true</inherited>
+                               <extensions>true</extensions>
+                       </plugin>
                </plugins>
        </build>
        

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 957833d..16ddcdc 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -21,12 +21,12 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.junit.Test;
 
+import java.util.Properties;
 import java.util.UUID;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-
 public class Kafka09ITCase extends KafkaConsumerTestBase {
 
        // 
------------------------------------------------------------------------
@@ -131,11 +131,15 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
        public void testJsonTableSource() throws Exception {
                String topic = UUID.randomUUID().toString();
 
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+
                // Names and types are determined in the actual test method of 
the
                // base test class.
                Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
                                topic,
-                               standardProps,
+                               props,
                                new String[] {
                                                "long",
                                                "string",
@@ -159,11 +163,15 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
        public void testJsonTableSourceWithFailOnMissingField() throws 
Exception {
                String topic = UUID.randomUUID().toString();
 
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+
                // Names and types are determined in the actual test method of 
the
                // base test class.
                Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
                                topic,
-                               standardProps,
+                               props,
                                new String[] {
                                                "long",
                                                "string",

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
index 1288347..ae4f5b2 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.Test;
 
-
 @SuppressWarnings("serial")
 public class Kafka09ProducerITCase extends KafkaProducerTestBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
new file mode 100644
index 0000000..d12ec65
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecureRunITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/*
+ * Kafka Secure Connection (kerberos) IT test case
+ */
+public class Kafka09SecureRunITCase extends KafkaConsumerTestBase {
+
+       protected static final Logger LOG = 
LoggerFactory.getLogger(Kafka09SecureRunITCase.class);
+
+       @BeforeClass
+       public static void prepare() throws IOException, ClassNotFoundException 
{
+               
LOG.info("-------------------------------------------------------------------------");
+               LOG.info("    Starting Kafka09SecureRunITCase ");
+               
LOG.info("-------------------------------------------------------------------------");
+
+               SecureTestEnvironment.prepare(tempFolder);
+               
SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
+
+               startClusters(true);
+       }
+
+       @AfterClass
+       public static void shutDownServices() {
+               shutdownClusters();
+               SecureTestEnvironment.cleanup();
+       }
+
+
+       //timeout interval is large since in Travis, ZK connection timeout 
occurs frequently
+       //The timeout for the test case is 2 times timeout of ZK connection
+       @Test(timeout = 600000)
+       public void testMultipleTopics() throws Exception {
+               runProduceConsumeMultipleTopics();
+       }
+
+}
\ No newline at end of file

Reply via email to