http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 0dbe865..213ba4a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -65,6 +65,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
        private String brokerConnectionString = "";
        private Properties standardProps;
        private Properties additionalServerProperties;
+       private boolean secureMode = false;
+       // 6 seconds is default. Seems to be too small for travis. 30 seconds
+       private String zkTimeout = "30000";
 
        public String getBrokerConnectionString() {
                return brokerConnectionString;
@@ -131,8 +134,22 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
        }
 
        @Override
-       public void prepare(int numKafkaServers, Properties additionalServerProperties) {
+       public boolean isSecureRunSupported() {
+               return true;
+       }
+
+       @Override
+       public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+
+               //increase the timeout since in Travis ZK connection takes long time for secure connection.
+               if(secureMode) {
+                       //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
+                       numKafkaServers = 1;
+                       zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15);
+               }
+
                this.additionalServerProperties = additionalServerProperties;
+               this.secureMode = secureMode;
                File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
                tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
@@ -155,6 +172,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
                        LOG.info("Starting Zookeeper");
                        zookeeper = new TestingServer(-1, tmpZkDir);
                        zookeeperConnectionString = zookeeper.getConnectString();
+                       LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
 
                        LOG.info("Starting KafkaServer");
                        brokers = new ArrayList<>(numKafkaServers);
@@ -163,7 +181,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
                                brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 
                                SocketServer socketServer = brokers.get(i).socketServer();
-                               brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+                               if(secureMode) {
+                                       brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
+                               } else {
+                                       brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+                               }
                        }
 
                        LOG.info("ZK and KafkaServer started.");
@@ -173,15 +195,18 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
                        fail("Test setup failed: " + t.getMessage());
                }
 
+               LOG.info("brokerConnectionString --> {}", brokerConnectionString);
+
                standardProps = new Properties();
                standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
                standardProps.setProperty("bootstrap.servers", brokerConnectionString);
                standardProps.setProperty("group.id", "flink-tests");
                standardProps.setProperty("auto.commit.enable", "false");
-               standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis.
-               standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
+               standardProps.setProperty("zookeeper.session.timeout.ms", zkTimeout);
+               standardProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
                standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value)
                standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+
        }
 
        @Override
@@ -196,6 +221,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
                if (zookeeper != null) {
                        try {
                                zookeeper.stop();
+                               zookeeper.close();
                        }
                        catch (Exception e) {
                                LOG.warn("ZK.stop() failed", e);
@@ -224,6 +250,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
        }
 
        public ZkUtils getZkUtils() {
+               LOG.info("In getZKUtils:: zookeeperConnectionString = {}", zookeeperConnectionString);
                ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
                                Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
                return ZkUtils.apply(creator, false);
@@ -241,23 +268,37 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
                        zkUtils.close();
                }
 
+               LOG.info("Topic {} create request is successfully posted", topic);
+
                // validate that the topic has been created
-               final long deadline = System.currentTimeMillis() + 30000;
+               final long deadline = System.currentTimeMillis() + Integer.parseInt(zkTimeout);
                do {
                        try {
-                               Thread.sleep(100);
+                               if(secureMode) {
+                                       //increase wait time since in Travis ZK timeout occurs frequently
+                                       int wait = Integer.parseInt(zkTimeout) / 100;
+                                       LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
+                                       Thread.sleep(wait);
+                               } else {
+                                       Thread.sleep(100);
+                               }
+
                        } catch (InterruptedException e) {
                                // restore interrupted state
                        }
                        // we could use AdminUtils.topicExists(zkUtils, topic) here, but it's results are
                        // not always correct.
 
+                       LOG.info("Validating if the topic {} has been created or not", topic);
+
                        // create a new ZK utils connection
                        ZkUtils checkZKConn = getZkUtils();
                        if(AdminUtils.topicExists(checkZKConn, topic)) {
+                               LOG.info("topic {} has been created successfully", topic);
                                checkZKConn.close();
                                return;
                        }
+                       LOG.info("topic {} has not been created yet. Will check again...", topic);
                        checkZKConn.close();
                }
                while (System.currentTimeMillis() < deadline);
@@ -296,8 +337,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
                kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
 
                // for CI stability, increase zookeeper session timeout
-               kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
-               kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
+               kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
+               kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
                if(additionalServerProperties != null) {
                        kafkaProperties.putAll(additionalServerProperties);
                }
@@ -307,6 +348,15 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
                for (int i = 1; i <= numTries; i++) {
                        int kafkaPort = NetUtils.getAvailablePort();
                        kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+                       //to support secure kafka cluster
+                       if(secureMode) {
+                               LOG.info("Adding Kafka secure configurations");
+                               kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+                               kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+                               kafkaProperties.putAll(getSecureProperties());
+                       }
+
                        KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
 
                        try {
@@ -329,4 +379,19 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
                throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
        }
 
+       public Properties getSecureProperties() {
+               Properties prop = new Properties();
+               if(secureMode) {
+                       prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+                       prop.put("security.protocol", "SASL_PLAINTEXT");
+                       prop.put("sasl.kerberos.service.name", "kafka");
+
+                       //add special timeout for Travis
+                       prop.setProperty("zookeeper.session.timeout.ms", zkTimeout);
+                       prop.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
+                       prop.setProperty("metadata.fetch.timeout.ms","120000");
+               }
+               return prop;
+       }
+
 }
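
For orientation, the broker started above in secure mode advertises a SASL_PLAINTEXT listener, so a test client has to mirror the settings produced by getSecureProperties(). A minimal client-side sketch, assuming a placeholder bootstrap address and the JAAS/keytab setup being provided by the surrounding test environment:

    import java.util.Properties;

    // Hypothetical client-side mirror of the broker settings applied by
    // getSecureProperties(); the bootstrap address is a placeholder.
    public class SecureClientPropsSketch {
        public static Properties secureClientProperties(String bootstrapServers) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", bootstrapServers);
            // must match the broker's SASL_PLAINTEXT listener configuration
            props.setProperty("security.protocol", "SASL_PLAINTEXT");
            props.setProperty("sasl.kerberos.service.name", "kafka");
            // Kerberos credentials come from the JAAS config / keytab that the
            // MiniKDC-based test environment is expected to set up
            return props;
        }
    }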

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
index fbeb110..4ac1773 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
@@ -28,3 +28,5 @@ log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
 log4j.logger.org.apache.zookeeper=OFF, testlogger
 log4j.logger.state.change.logger=OFF, testlogger
 log4j.logger.kafka=OFF, testlogger
+
+log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
index 49d630f..ef71bde 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
@@ -161,6 +161,14 @@ under the License.
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-minikdc</artifactId>
+                       <version>${minikdc.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
        </dependencies>
 
        <dependencyManagement>
@@ -187,6 +195,17 @@ under the License.
                                        </execution>
                                </executions>
                        </plugin>
+                       <!--
+            https://issues.apache.org/jira/browse/DIRSHARED-134
+            Required to pull the Mini-KDC transitive dependency
+            -->
+                       <plugin>
+                               <groupId>org.apache.felix</groupId>
+                               <artifactId>maven-bundle-plugin</artifactId>
+                               <version>3.0.1</version>
+                               <inherited>true</inherited>
+                               <extensions>true</extensions>
+                       </plugin>
                </plugins>
        </build>
        

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 920f15b..a87ff8a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -181,6 +181,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        properties.setProperty("session.timeout.ms", "2000");
                        properties.setProperty("fetch.max.wait.ms", "2000");
                        properties.setProperty("heartbeat.interval.ms", "1000");
+                       properties.putAll(secureProps);
                        FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("doesntexist", new SimpleStringSchema(), properties);
                        DataStream<String> stream = see.addSource(source);
                        stream.print();
@@ -275,6 +276,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                });
                Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
                producerProperties.setProperty("retries", "3");
+               producerProperties.putAll(secureProps);
                FlinkKafkaProducerBase<Tuple2<Long, String>> prod = kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null);
                stream.addSink(prod);
 
@@ -283,7 +285,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                List<String> topics = new ArrayList<>();
                topics.add(topic);
                topics.add(additionalEmptyTopic);
-               FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, standardProps);
+
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+               FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, props);
 
                DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
 
@@ -371,7 +377,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
                env.getConfig().disableSysoutLogging();
 
-               FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+
+               FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
 
                env
                                .addSource(kafkaSource)
@@ -416,7 +426,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
                env.getConfig().disableSysoutLogging();
 
-               FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+               FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
 
                env
                                .addSource(kafkaSource)
@@ -463,7 +476,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                env.getConfig().disableSysoutLogging();
                env.setBufferTimeout(0);
 
-               FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+               FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
 
                env
                        .addSource(kafkaSource)
@@ -506,7 +522,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                                        env.enableCheckpointing(100);
                                        env.getConfig().disableSysoutLogging();
 
-                                       FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), standardProps);
+                                       Properties props = new Properties();
+                                       props.putAll(standardProps);
+                                       props.putAll(secureProps);
+                                       FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
 
                                        env.addSource(source).addSink(new DiscardingSink<String>());
 
@@ -577,7 +596,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                                        env.enableCheckpointing(100);
                                        env.getConfig().disableSysoutLogging();
 
-                                       FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), standardProps);
+                                       Properties props = new Properties();
+                                       props.putAll(standardProps);
+                                       props.putAll(secureProps);
+                                       FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
 
                                        env.addSource(source).addSink(new DiscardingSink<String>());
 
@@ -629,7 +651,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                env.setParallelism(12); // needs to be more that the mini cluster has slots
                env.getConfig().disableSysoutLogging();
 
-               FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+               FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
 
                env
                                .addSource(kafkaSource)
@@ -700,15 +725,19 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
                Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig());
 
-               stream.addSink(kafkaServer.getProducer("dummy", schema, standardProps, null));
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+
+               stream.addSink(kafkaServer.getProducer("dummy", schema, props, null));
 
                env.execute("Write to topics");
 
                // run second job consuming from multiple topics
                env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
                env.getConfig().disableSysoutLogging();
-               
-               stream = env.addSource(kafkaServer.getConsumer(topics, schema, standardProps));
+
+               stream = env.addSource(kafkaServer.getConsumer(topics, schema, props));
 
                stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
                        Map<String, Integer> countPerTopic = new HashMap<>(NUM_TOPICS);
@@ -787,6 +816,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                // Produce serialized JSON data
                createTestTopic(topic, 1, 1);
 
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+
                StreamExecutionEnvironment env = StreamExecutionEnvironment
                                .createRemoteEnvironment("localhost", flinkPort);
                env.getConfig().disableSysoutLogging();
@@ -805,7 +838,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                }).addSink(kafkaServer.getProducer(
                                topic,
                                new ByteArraySerializationSchema(),
-                               standardProps,
+                               props,
                                null));
 
                // Execute blocks
@@ -940,6 +973,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 14));
                consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 14)); // for the new fetcher
                consumerProps.setProperty("queued.max.message.chunks", "1");
+               consumerProps.putAll(secureProps);
 
                FlinkKafkaConsumerBase<Tuple2<Long, byte[]>> source = kafkaServer.getConsumer(topic, serSchema, consumerProps);
                DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
@@ -969,6 +1003,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                Properties producerProps = new Properties();
                producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 15));
                producerProps.setProperty("retries", "3");
+               producerProps.putAll(secureProps);
                producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
 
                DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
@@ -1047,8 +1082,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                env.setRestartStrategy(RestartStrategies.noRestart());
                env.getConfig().disableSysoutLogging();
 
-
-               FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+               FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
 
                env
                                .addSource(kafkaSource)
@@ -1097,6 +1134,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
                Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
                producerProperties.setProperty("retries", "3");
+               producerProperties.putAll(secureProps);
                kvStream.addSink(kafkaServer.getProducer(topic, schema, producerProperties, null));
                env.execute("Write KV to Kafka");
 
@@ -1110,7 +1148,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
                KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
 
-               DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, standardProps));
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+               DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, props));
                fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() {
                        long counter = 0;
                        @Override
@@ -1178,6 +1219,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
                Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
                producerProperties.setProperty("retries", "3");
+               producerProperties.putAll(secureProps);
+
                kvStream.addSink(kafkaServer.getProducer(topic, schema, producerProperties, null));
 
                env.execute("Write deletes to Kafka");
@@ -1189,7 +1232,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                env.getConfig().disableSysoutLogging();
 
-               DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+               DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, schema, props));
 
                fromKafka.flatMap(new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>() {
                        long counter = 0;
@@ -1226,7 +1272,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                env1.getConfig().disableSysoutLogging();
 
-               DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), standardProps));
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+
+               DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), props));
                fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Void>() {
                        @Override
                        public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
@@ -1262,8 +1312,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                                        env1.getConfig().disableSysoutLogging();
                                        env1.disableOperatorChaining(); // let the source read everything into the network buffers
 
+                                       Properties props = new Properties();
+                                       props.putAll(standardProps);
+                                       props.putAll(secureProps);
+
                                        TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
-                                       DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+                                       DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, props));
                                        fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
                                                @Override
                                                public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
@@ -1288,7 +1342,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                                                }
                                        });
 
-                                       fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
+                                       fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), props, null));
 
                                        env1.execute("Metrics test job");
                                } catch(Throwable t) {
@@ -1403,6 +1457,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
                                new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
 
+               cc.putAll(secureProps);
                // create the consumer
                FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc);
 
@@ -1505,6 +1560,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        // the producer must not produce duplicates
                        Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
                        producerProperties.setProperty("retries", "0");
+                       producerProperties.putAll(secureProps);
                        
                        stream.addSink(kafkaServer.getProducer(
                                                        topicName, serSchema, producerProperties,
@@ -1537,7 +1593,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        
                        Properties readProps = (Properties) standardProps.clone();
                        readProps.setProperty("group.id", "flink-tests-validator");
-                       
+                       readProps.putAll(secureProps);
                        FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps);
 
                        readEnv
@@ -1672,6 +1728,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
                newProps.setProperty("auto.offset.reset", "smallest");
                newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect"));
+               newProps.putAll(secureProps);
 
                ConsumerConfig printerConfig = new ConsumerConfig(newProps);
                printTopic(topicName, printerConfig, deserializer, elements);
@@ -1893,8 +1950,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                TypeInformationSerializationSchema<Tuple2<Long, Integer>> sourceSchema =
                        new TypeInformationSerializationSchema<>(inputTypeInfo, env.getConfig());
 
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
                FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = kafkaServer
-                       .getConsumer(topics, sourceSchema, standardProps)
+                       .getConsumer(topics, sourceSchema, props)
                        .assignTimestampsAndWatermarks(new TestPunctuatedTSExtractor());
 
                DataStreamSource<Tuple2<Long, Integer>> consuming = env.setParallelism(1).addSource(source);

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 14e74f1..5bcf406 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -34,6 +34,7 @@ import org.apache.flink.test.util.SuccessException;
 
 
 import java.io.Serializable;
+import java.util.Properties;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
@@ -102,17 +103,24 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
                                }
                        })
                        .setParallelism(1);
+
+                       Properties props = new Properties();
+                       props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
+                       props.putAll(secureProps);
                        
                        // sink partitions into 
                        stream.addSink(kafkaServer.getProducer(topic,
                                        new KeyedSerializationSchemaWrapper<>(serSchema),
-                                       FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings),
+                                       props,
                                        new CustomPartitioner(parallelism)))
                        .setParallelism(parallelism);
 
                        // ------ consuming topology ---------
-                       
-                       FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, standardProps);
+
+                       Properties consumerProps = new Properties();
+                       consumerProps.putAll(standardProps);
+                       consumerProps.putAll(secureProps);
+                       FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, consumerProps);
                        
                        env.addSource(source).setParallelism(parallelism)
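
The recurring change in the two test bases above is the same small pattern: copy standardProps and overlay secureProps (which stays empty for non-secure runs) before handing the configuration to a consumer or producer. A sketch of that merge, assuming the static fields defined in KafkaTestBase:

    import java.util.Properties;

    // Sketch of the property-merging pattern used throughout the test bases;
    // standardProps and secureProps are assumed to be the KafkaTestBase fields.
    public final class PropsMergeSketch {
        public static Properties merged(Properties standardProps, Properties secureProps) {
            Properties props = new Properties();
            props.putAll(standardProps); // ZK/broker connection and consumer defaults
            props.putAll(secureProps);   // empty unless the secure run is enabled
            return props;
        }
    }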
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index c4949ff..9236e78 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -36,6 +36,8 @@ import org.apache.flink.util.InstantiationUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,29 +62,39 @@ public class KafkaShortRetentionTestBase implements Serializable {
        private static Properties standardProps;
        private static LocalFlinkMiniCluster flink;
 
+       @ClassRule
+       public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+       protected static Properties secureProps = new Properties();
+
        @BeforeClass
        public static void prepare() throws IOException, ClassNotFoundException {
                LOG.info("-------------------------------------------------------------------------");
                LOG.info("    Starting KafkaShortRetentionTestBase ");
                LOG.info("-------------------------------------------------------------------------");
 
+               Configuration flinkConfig = new Configuration();
+
                // dynamically load the implementation for the test
                Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
                kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
 
                LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
 
+               if(kafkaServer.isSecureRunSupported()) {
+                       secureProps = kafkaServer.getSecureProperties();
+               }
+
                Properties specificProperties = new Properties();
                specificProperties.setProperty("log.retention.hours", "0");
                specificProperties.setProperty("log.retention.minutes", "0");
                specificProperties.setProperty("log.retention.ms", "250");
                specificProperties.setProperty("log.retention.check.interval.ms", "100");
-               kafkaServer.prepare(1, specificProperties);
+               kafkaServer.prepare(1, specificProperties, false);
 
                standardProps = kafkaServer.getStandardProperties();
 
                // start also a re-usable Flink mini cluster
-               Configuration flinkConfig = new Configuration();
                flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
                flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
@@ -98,6 +110,8 @@ public class KafkaShortRetentionTestBase implements Serializable {
                        flink.shutdown();
                }
                kafkaServer.shutdown();
+
+               secureProps.clear();
        }
 
        /**
@@ -151,12 +165,17 @@ public class KafkaShortRetentionTestBase implements Serializable {
                                running = false;
                        }
                });
-               stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), standardProps, null));
+
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+
+               stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null));
 
                // ----------- add consumer dataflow ----------
 
                NonContinousOffsetsDeserializationSchema deserSchema = new NonContinousOffsetsDeserializationSchema();
-               FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, deserSchema, standardProps);
+               FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, deserSchema, props);
 
                DataStreamSource<String> consuming = env.addSource(source);
                consuming.addSink(new DiscardingSink<String>());
@@ -224,6 +243,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 
                Properties customProps = new Properties();
                customProps.putAll(standardProps);
+               customProps.putAll(secureProps);
                customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
                FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
 
@@ -255,6 +275,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 
                Properties customProps = new Properties();
                customProps.putAll(standardProps);
+               customProps.putAll(secureProps);
                customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
                
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 771db17..afdd158 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -31,6 +31,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,61 +75,90 @@ public abstract class KafkaTestBase extends TestLogger {
 
        protected static KafkaTestEnvironment kafkaServer;
 
+       @ClassRule
+       public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+       protected static Properties secureProps = new Properties();
+
        // ------------------------------------------------------------------------
        //  Setup and teardown of the mini clusters
        // ------------------------------------------------------------------------
        
        @BeforeClass
        public static void prepare() throws IOException, ClassNotFoundException {
+
                LOG.info("-------------------------------------------------------------------------");
                LOG.info("    Starting KafkaTestBase ");
                LOG.info("-------------------------------------------------------------------------");
-               
 
+               startClusters(false);
 
-               // dynamically load the implementation for the test
-               Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
-               kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
+       }
 
-               LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
+       @AfterClass
+       public static void shutDownServices() {
 
-               kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS);
+               LOG.info("-------------------------------------------------------------------------");
+               LOG.info("    Shut down KafkaTestBase ");
+               LOG.info("-------------------------------------------------------------------------");
 
-               standardProps = kafkaServer.getStandardProperties();
-               brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
+               shutdownClusters();
 
-               // start also a re-usable Flink mini cluster
-               Configuration flinkConfig = new Configuration();
+               LOG.info("-------------------------------------------------------------------------");
+               LOG.info("    KafkaTestBase finished");
+               LOG.info("-------------------------------------------------------------------------");
+       }
+
+       protected static Configuration getFlinkConfiguration() {
+               Configuration flinkConfig = new Configuration();;
                flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
                flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
                flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
                flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
                flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
+               return flinkConfig;
+       }
+
+       protected static void startClusters(boolean secureMode) throws ClassNotFoundException {
+
+               // dynamically load the implementation for the test
+               Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
+               kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
+
+               LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
+
+               kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode);
+
+               standardProps = kafkaServer.getStandardProperties();
 
-               flink = new LocalFlinkMiniCluster(flinkConfig, false);
+               brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
+
+               if(kafkaServer.isSecureRunSupported() && secureMode) {
+                       secureProps = kafkaServer.getSecureProperties();
+               }
+
+               // start also a re-usable Flink mini cluster
+               flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
                flink.start();
 
                flinkPort = flink.getLeaderRPCPort();
-       }
 
-       @AfterClass
-       public static void shutDownServices() {
+       }
 
-               LOG.info("-------------------------------------------------------------------------");
-               LOG.info("    Shut down KafkaTestBase ");
-               LOG.info("-------------------------------------------------------------------------");
+       protected static void shutdownClusters() {
 
                flinkPort = -1;
                if (flink != null) {
                        flink.shutdown();
                }
 
+               if(secureProps != null) {
+                       secureProps.clear();
+               }
+
                kafkaServer.shutdown();
 
-               LOG.info("-------------------------------------------------------------------------");
-               LOG.info("    KafkaTestBase finished");
-               LOG.info("-------------------------------------------------------------------------");
        }
 
 
@@ -164,4 +195,5 @@ public abstract class KafkaTestBase extends TestLogger {
        protected static void deleteTestTopic(String topic) {
                kafkaServer.deleteTestTopic(topic);
        }
+
 }
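
Splitting prepare()/shutDownServices() into startClusters(secureMode) and shutdownClusters() lets a Kerberos-enabled suite drive the same lifecycle explicitly. A hypothetical driver (the class name is illustrative and not part of this commit):

    // Hypothetical driver showing how the refactored lifecycle could be reused
    // for a secure run; it only relies on the protected statics added above.
    public class SecureKafkaRunSketch extends KafkaTestBase {
        public static void main(String[] args) throws Exception {
            startClusters(true);   // SASL_PLAINTEXT broker, secureProps populated
            try {
                // run jobs that merge standardProps with secureProps here
            } finally {
                shutdownClusters();
            }
        }
    }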

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 0b1d51d..6ecde71 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -35,10 +35,10 @@ public abstract class KafkaTestEnvironment {
 
        protected static final String KAFKA_HOST = "localhost";
 
-       public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties);
+       public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode);
 
-       public void prepare(int numberOfKafkaServers) {
-               this.prepare(numberOfKafkaServers, null);
+       public void prepare(int numberOfKafkaServers, boolean secureMode) {
+               this.prepare(numberOfKafkaServers, null, secureMode);
        }
 
        public abstract void shutdown();
@@ -51,9 +51,10 @@ public abstract class KafkaTestEnvironment {
                this.createTestTopic(topic, numberOfPartitions, replicationFactor, new Properties());
        }
 
-
        public abstract Properties getStandardProperties();
 
+       public abstract Properties getSecureProperties();
+
        public abstract String getBrokerConnectionString();
 
        public abstract String getVersion();
@@ -86,4 +87,6 @@ public abstract class KafkaTestEnvironment {
 
        public abstract int getBrokerId(KafkaServer server);
 
+       public abstract boolean isSecureRunSupported();
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 5a38e56..58a5cc3 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -135,11 +135,18 @@ public class DataGenerators {
                                        }
                                });
 
+               Properties props = new Properties();
+               props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()));
+               Properties secureProps = testServer.getSecureProperties();
+               if(secureProps != null) {
+                       props.putAll(testServer.getSecureProperties());
+               }
+
                stream
                                .rebalance()
                                .addSink(testServer.getProducer(topic,
                                                new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
-                                               FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()),
+                                               props,
                                                new KafkaPartitioner<Integer>() {
                                                        @Override
                                                        public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml
index 18ecfde..5c99ef6 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -78,5 +78,30 @@ under the License.
                        <scope>compile</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-minikdc</artifactId>
+                       <version>${minikdc.version}</version>
+               </dependency>
+
        </dependencies>
+
+       <build>
+               <plugins>
+
+                       <!--
+            https://issues.apache.org/jira/browse/DIRSHARED-134
+            Required to pull the Mini-KDC transitive dependency
+            -->
+                       <plugin>
+                               <groupId>org.apache.felix</groupId>
+                               <artifactId>maven-bundle-plugin</artifactId>
+                               <version>3.0.1</version>
+                               <inherited>true</inherited>
+                               <extensions>true</extensions>
+                       </plugin>
+
+               </plugins>
+       </build>
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index a478908..6ec6c2c 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -25,6 +25,8 @@ import org.apache.flink.test.util.TestBaseUtils;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Base class for streaming unit tests that run multiple tests and want to reuse the same
@@ -67,18 +69,22 @@ public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
                super(new Configuration());
        }
 
+       protected static final Logger LOG = LoggerFactory.getLogger(StreamingMultipleProgramsTestBase.class);
+
        // ------------------------------------------------------------------------
        //  Cluster setup & teardown
        // ------------------------------------------------------------------------
 
        @BeforeClass
        public static void setup() throws Exception {
+               LOG.info("In StreamingMultipleProgramsTestBase: Starting 
FlinkMiniCluster ");
                cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, 
false, false, true);
                TestStreamEnvironment.setAsContext(cluster, 
DEFAULT_PARALLELISM);
        }
 
        @AfterClass
        public static void teardown() throws Exception {
+               LOG.info("In StreamingMultipleProgramsTestBase: Closing 
FlinkMiniCluster ");
                TestStreamEnvironment.unsetAsContext();
                stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
new file mode 100644
index 0000000..00b19f1
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.BufferedWriter;
+import java.io.PrintWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Helper {@link SecureTestEnvironment} that handles the MiniKDC lifecycle.
+ * This class can be used to start/stop MiniKDC and to create secure configurations for
+ * MiniDFSCluster and MiniYarn.
+ */
+
+public class SecureTestEnvironment {
+
+       protected static final Logger LOG = 
LoggerFactory.getLogger(SecureTestEnvironment.class);
+
+       private static MiniKdc kdc;
+
+       private static String testKeytab = null;
+
+       private static String testPrincipal = null;
+
+       private static String testZkServerPrincipal = null;
+
+       private static String testZkClientPrincipal = null;
+
+       private static String testKafkaServerPrincipal = null;
+
+       private static String hadoopServicePrincipal = null;
+
+       private static File baseDirForSecureRun = null;
+
+       public static void prepare(TemporaryFolder tempFolder) {
+
+               try {
+                       baseDirForSecureRun = tempFolder.newFolder();
+                       LOG.info("Base Directory for Secure Environment: {}", 
baseDirForSecureRun);
+
+                       String hostName = "localhost";
+                       Properties kdcConf = MiniKdc.createConf();
+                       if(LOG.isDebugEnabled()) {
+                               kdcConf.setProperty(MiniKdc.DEBUG, "true");
+                       }
+                       kdcConf.setProperty(MiniKdc.KDC_BIND_ADDRESS, hostName);
+                       kdc = new MiniKdc(kdcConf, baseDirForSecureRun);
+                       kdc.start();
+                       LOG.info("Started Mini KDC");
+
+                       File keytabFile = new File(baseDirForSecureRun, 
"test-users.keytab");
+                       testKeytab = keytabFile.getAbsolutePath();
+                       testZkServerPrincipal = "zookeeper/127.0.0.1";
+                       testZkClientPrincipal = "zk-client/127.0.0.1";
+                       testKafkaServerPrincipal = "kafka/" + hostName;
+                       hadoopServicePrincipal = "hadoop/" + hostName;
+                       testPrincipal = "client/" + hostName;
+
+                       kdc.createPrincipal(keytabFile, testPrincipal, 
testZkServerPrincipal,
+                                       hadoopServicePrincipal,
+                                       testZkClientPrincipal,
+                                       testKafkaServerPrincipal);
+
+                       testPrincipal = testPrincipal + "@" + kdc.getRealm();
+                       testZkServerPrincipal = testZkServerPrincipal + "@" + 
kdc.getRealm();
+                       testZkClientPrincipal = testZkClientPrincipal + "@" + 
kdc.getRealm();
+                       testKafkaServerPrincipal = testKafkaServerPrincipal + 
"@" + kdc.getRealm();
+                       hadoopServicePrincipal = hadoopServicePrincipal + "@" + 
kdc.getRealm();
+
+                       
LOG.info("-------------------------------------------------------------------");
+                       LOG.info("Test Principal: {}", testPrincipal);
+                       LOG.info("Test ZK Server Principal: {}", 
testZkServerPrincipal);
+                       LOG.info("Test ZK Client Principal: {}", 
testZkClientPrincipal);
+                       LOG.info("Test Kafka Server Principal: {}", 
testKafkaServerPrincipal);
+                       LOG.info("Test Hadoop Service Principal: {}", 
hadoopServicePrincipal);
+                       LOG.info("Test Keytab: {}", testKeytab);
+                       
LOG.info("-------------------------------------------------------------------");
+
+                       //The security context is established to allow non-Hadoop applications that require
+                       //JAAS-based SASL/Kerberos authentication to work. However, for Hadoop-specific applications
+                       //the context can be reinitialized with Hadoop 
configuration by calling
+                       //ctx.setHadoopConfiguration() for the UGI 
implementation to work properly.
+                       //See Yarn test case module for reference
+                       createJaasConfig(baseDirForSecureRun);
+                       SecurityContext.SecurityConfiguration ctx = new 
SecurityContext.SecurityConfiguration();
+                       Configuration flinkConfig = new Configuration();
+                       
flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
+                       
flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
+                       
flinkConfig.setBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, false);
+                       
flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, 
baseDirForSecureRun.getAbsolutePath());
+                       ctx.setFlinkConfiguration(flinkConfig);
+                       TestingSecurityContext.install(ctx, 
getClientSecurityConfigurationMap());
+
+                       populateSystemEnvVariables();
+
+               } catch(Exception e) {
+                       LOG.error("Exception occurred while preparing secure environment", e);
+                       throw new RuntimeException(e);
+               }
+
+       }
+
+       public static void cleanup() {
+
+               LOG.info("Cleaning up Secure Environment");
+
+               if( kdc != null) {
+                       kdc.stop();
+                       LOG.info("Stopped KDC server");
+               }
+
+               resetSystemEnvVariables();
+
+               testKeytab = null;
+               testPrincipal = null;
+               testZkServerPrincipal = null;
+               testZkClientPrincipal = null;
+               testKafkaServerPrincipal = null;
+               hadoopServicePrincipal = null;
+               baseDirForSecureRun = null;
+
+       }
+
+       private static void populateSystemEnvVariables() {
+
+               if(LOG.isDebugEnabled()) {
+                       System.setProperty("FLINK_JAAS_DEBUG", "true");
+                       System.setProperty("sun.security.krb5.debug", "true");
+               }
+
+               System.setProperty("java.security.krb5.conf", 
kdc.getKrb5conf().getAbsolutePath());
+
+               System.setProperty("zookeeper.authProvider.1", 
"org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+               
System.setProperty("zookeeper.kerberos.removeHostFromPrincipal", "true");
+               
System.setProperty("zookeeper.kerberos.removeRealmFromPrincipal", "true");
+       }
+
+       private static void resetSystemEnvVariables() {
+               System.clearProperty("java.security.krb5.conf");
+               System.clearProperty("FLINK_JAAS_DEBUG");
+               System.clearProperty("sun.security.krb5.debug");
+
+               System.clearProperty("zookeeper.authProvider.1");
+               
System.clearProperty("zookeeper.kerberos.removeHostFromPrincipal");
+               
System.clearProperty("zookeeper.kerberos.removeRealmFromPrincipal");
+       }
+
+       public static org.apache.flink.configuration.Configuration 
populateFlinkSecureConfigurations(
+                       @Nullable org.apache.flink.configuration.Configuration 
flinkConf) {
+
+               org.apache.flink.configuration.Configuration conf;
+
+               if(flinkConf == null) {
+                       conf = new 
org.apache.flink.configuration.Configuration();
+               } else {
+                       conf = flinkConf;
+               }
+
+               conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
+               conf.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
+
+               return conf;
+       }
+
+       public static Map<String, 
TestingSecurityContext.ClientSecurityConfiguration> 
getClientSecurityConfigurationMap() {
+
+               Map<String, TestingSecurityContext.ClientSecurityConfiguration> 
clientSecurityConfigurationMap = new HashMap<>();
+
+               if(testZkServerPrincipal != null ) {
+                       TestingSecurityContext.ClientSecurityConfiguration 
zkServer =
+                                       new 
TestingSecurityContext.ClientSecurityConfiguration(testZkServerPrincipal, 
testKeytab,
+                                                       "Server", "zk-server");
+                       clientSecurityConfigurationMap.put("Server",zkServer);
+               }
+
+               if(testZkClientPrincipal != null ) {
+                       TestingSecurityContext.ClientSecurityConfiguration 
zkClient =
+                                       new 
TestingSecurityContext.ClientSecurityConfiguration(testZkClientPrincipal, 
testKeytab,
+                                                       "Client", "zk-client");
+                       clientSecurityConfigurationMap.put("Client",zkClient);
+               }
+
+               if(testKafkaServerPrincipal != null ) {
+                       TestingSecurityContext.ClientSecurityConfiguration 
kafkaServer =
+                                       new 
TestingSecurityContext.ClientSecurityConfiguration(testKafkaServerPrincipal, 
testKeytab,
+                                                       "KafkaServer", 
"kafka-server");
+                       
clientSecurityConfigurationMap.put("KafkaServer",kafkaServer);
+               }
+
+               return clientSecurityConfigurationMap;
+       }
+
+       public static String getTestKeytab() {
+               return testKeytab;
+       }
+
+       public static String getHadoopServicePrincipal() {
+               return hadoopServicePrincipal;
+       }
+
+       /*
+        * Helper method to create a temporary JAAS configuration file to get around the Kafka and ZK SASL
+        * implementations' lookup of java.security.auth.login.config
+        */
+       private static void createJaasConfig(File baseDirForSecureRun) {
+
+               try(FileWriter fw = new FileWriter(new 
File(baseDirForSecureRun,SecurityContext.JAAS_CONF_FILENAME), true);
+                       BufferedWriter bw = new BufferedWriter(fw);
+                       PrintWriter out = new PrintWriter(bw))
+               {
+                       out.println("sample {");
+                       out.println("useKeyTab=false");
+                       out.println("useTicketCache=true;");
+                       out.println("};");
+               } catch (IOException e) {
+                       LOG.error("Exception occurred while trying to create JAAS config. Reason: {}", e.getMessage());
+                       throw new RuntimeException(e);
+               }
+
+       }
+}
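
(Not part of the change above: a hedged sketch of how a JUnit test class is expected
to drive this helper. The class and test names below are hypothetical.)

    import org.apache.flink.test.util.SecureTestEnvironment;
    import org.junit.AfterClass;
    import org.junit.BeforeClass;
    import org.junit.ClassRule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;

    public class ExampleSecuredITCase {

        @ClassRule
        public static TemporaryFolder tempFolder = new TemporaryFolder();

        @BeforeClass
        public static void startSecureEnvironment() {
            // Starts MiniKDC, creates the test principals/keytab and installs the
            // testing security context (JAAS config plus krb5/ZooKeeper system properties).
            SecureTestEnvironment.prepare(tempFolder);
        }

        @AfterClass
        public static void stopSecureEnvironment() {
            // Stops MiniKDC and clears the system properties again.
            SecureTestEnvironment.cleanup();
        }

        @Test
        public void testSomethingSecured() throws Exception {
            // The MiniKDC keytab/principal can be injected into a Flink configuration
            // for whatever secured component the test starts.
            org.apache.flink.configuration.Configuration conf =
                    SecureTestEnvironment.populateFlinkSecureConfigurations(null);
            // ... run the secured test against a mini cluster or connector using 'conf' ...
        }
    }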

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
new file mode 100644
index 0000000..25b2362
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.JaasConfiguration;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link TestingJaasConfiguration} for the integration test cases, which need to manage
+ * the client principal as well as the Hadoop/ZK server principals, which expect the host
+ * name to be populated in a specific way (localhost vs 127.0.0.1). It provides an abstraction
+ * for handling more than one login module, since the default {@link JaasConfiguration} only
+ * supports a single, global principal identifier.
+ */
+
+@Internal
+public class TestingJaasConfiguration extends JaasConfiguration {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(TestingJaasConfiguration.class);
+
+       public Map<String, TestingSecurityContext.ClientSecurityConfiguration> 
clientSecurityConfigurationMap;
+
+       TestingJaasConfiguration(String keytab, String principal, Map<String,
+                       TestingSecurityContext.ClientSecurityConfiguration> 
clientSecurityConfigurationMap) {
+               super(keytab, principal);
+               this.clientSecurityConfigurationMap = 
clientSecurityConfigurationMap;
+       }
+
+       @Override
+       public AppConfigurationEntry[] getAppConfigurationEntry(String 
applicationName) {
+
+               LOG.debug("In TestingJaasConfiguration - Application Requested: 
{}", applicationName);
+
+               AppConfigurationEntry[] appConfigurationEntry = 
super.getAppConfigurationEntry(applicationName);
+
+               if(clientSecurityConfigurationMap != null && 
clientSecurityConfigurationMap.size() > 0) {
+
+                       
if(clientSecurityConfigurationMap.containsKey(applicationName)) {
+
+                               LOG.debug("In TestingJaasConfiguration - 
Application: {} found in the supplied context", applicationName);
+
+                               
TestingSecurityContext.ClientSecurityConfiguration conf = 
clientSecurityConfigurationMap.get(applicationName);
+
+                               if(appConfigurationEntry != null && 
appConfigurationEntry.length > 0) {
+
+                                       for(int count=0; count < 
appConfigurationEntry.length; count++) {
+
+                                               AppConfigurationEntry ace = 
appConfigurationEntry[count];
+
+                                               if 
(ace.getOptions().containsKey("keyTab")) {
+
+                                                       String keyTab = 
conf.getKeytab();
+                                                       String principal = 
conf.getPrincipal();
+
+                                                       LOG.debug("In 
TestingJaasConfiguration - Application: {} from the supplied context will " +
+                                                                       "use 
Client Specific Keytab: {} and Principal: {}", applicationName, keyTab, 
principal);
+
+                                                       Map<String, String> 
newKeytabKerberosOptions = new HashMap<>();
+                                                       
newKeytabKerberosOptions.putAll(getKeytabKerberosOptions());
+
+                                                       
newKeytabKerberosOptions.put("keyTab", keyTab);
+                                                       
newKeytabKerberosOptions.put("principal", principal);
+
+                                                       AppConfigurationEntry 
keytabKerberosAce = new AppConfigurationEntry(
+                                                                       
KerberosUtil.getKrb5LoginModuleName(),
+                                                                       
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                                                                       
newKeytabKerberosOptions);
+                                                       appConfigurationEntry = 
new AppConfigurationEntry[] {keytabKerberosAce};
+
+                                                       LOG.debug("---->Login 
Module is using Keytab based configuration<------");
+                                                       LOG.debug("Login Module 
Name: " + keytabKerberosAce.getLoginModuleName());
+                                                       LOG.debug("Control 
Flag: " + keytabKerberosAce.getControlFlag());
+                                                       LOG.debug("Options: " + 
keytabKerberosAce.getOptions());
+                                               }
+                                       }
+                               }
+                       }
+
+               }
+
+               return appConfigurationEntry;
+       }
+
+}
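
(Not part of the commit: a small, hedged illustration of what a standard JAAS lookup
returns once the testing configuration above is installed with a "KafkaServer" entry.)

    import javax.security.auth.login.AppConfigurationEntry;
    import javax.security.auth.login.Configuration;

    public class JaasLookupExample {
        public static void main(String[] args) {
            // Assumes TestingSecurityContext.install(...) has already been called with a
            // client security configuration map containing a "KafkaServer" entry.
            AppConfigurationEntry[] entries =
                    Configuration.getConfiguration().getAppConfigurationEntry("KafkaServer");
            if (entries != null) {
                for (AppConfigurationEntry entry : entries) {
                    // The options now carry the Kafka-server-specific keyTab and principal
                    // instead of the single global test principal.
                    System.out.println(entry.getLoginModuleName() + " -> " + entry.getOptions());
                }
            }
        }
    }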

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
new file mode 100644
index 0000000..5e84c7e
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.SecurityContext;
+
+import java.util.Map;
+
+/*
+ * Test security context to support handling both client and server principals in MiniKDC.
+ * This class is used only in integration test code for connectors like Kafka, HDFS, etc.
+ */
+@Internal
+public class TestingSecurityContext {
+
+       public static void install(SecurityContext.SecurityConfiguration config,
+                                               Map<String, 
ClientSecurityConfiguration> clientSecurityConfigurationMap)
+                       throws Exception {
+
+               SecurityContext.install(config);
+
+               // establish the JAAS config for Test environment
+               TestingJaasConfiguration jaasConfig = new 
TestingJaasConfiguration(config.getKeytab(),
+                               config.getPrincipal(), 
clientSecurityConfigurationMap);
+               
javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
+       }
+
+       public static class ClientSecurityConfiguration {
+
+               private String principal;
+
+               private String keytab;
+
+               private String moduleName;
+
+               private String jaasServiceName;
+
+               public String getPrincipal() {
+                       return principal;
+               }
+
+               public String getKeytab() {
+                       return keytab;
+               }
+
+               public String getModuleName() {
+                       return moduleName;
+               }
+
+               public String getJaasServiceName() {
+                       return jaasServiceName;
+               }
+
+               public ClientSecurityConfiguration(String principal, String 
keytab, String moduleName, String jaasServiceName) {
+                       this.principal = principal;
+                       this.keytab = keytab;
+                       this.moduleName = moduleName;
+                       this.jaasServiceName = jaasServiceName;
+               }
+
+       }
+
+}
\ No newline at end of file
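
(The YARNSessionFIFOSecuredITCase diff further below shows the full setup flow. As a
compact, hedged sketch, a single entry for the client security configuration map could
be built as follows; the principal realm and keytab path are placeholders.)

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.test.util.TestingSecurityContext;

    public class ClientConfigMapExample {
        public static Map<String, TestingSecurityContext.ClientSecurityConfiguration> build() {
            Map<String, TestingSecurityContext.ClientSecurityConfiguration> map = new HashMap<>();
            // JAAS application name "Client" -> ZooKeeper client credentials (placeholder values).
            map.put("Client", new TestingSecurityContext.ClientSecurityConfiguration(
                    "zk-client/127.0.0.1@EXAMPLE.COM", "/path/to/test-users.keytab",
                    "Client", "zk-client"));
            return map;
        }
    }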

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index ffdca36..68e4752 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -103,6 +103,13 @@ under the License.
                        
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
                        <scope>test</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-minikdc</artifactId>
+                       <version>${minikdc.version}</version>
+               </dependency>
+
        </dependencies>
 
        <build>
@@ -298,6 +305,19 @@ under the License.
                                        <skip>true</skip>
                                </configuration>
                        </plugin>
+
+                       <!--
+            https://issues.apache.org/jira/browse/DIRSHARED-134
+            Required to pull the Mini-KDC transitive dependency
+            -->
+                       <plugin>
+                               <groupId>org.apache.felix</groupId>
+                               <artifactId>maven-bundle-plugin</artifactId>
+                               <version>3.0.1</version>
+                               <inherited>true</inherited>
+                               <extensions>true</extensions>
+                       </plugin>
+                       
                </plugins>
        </build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index d03d9eb..a503115 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -180,7 +180,7 @@ public class FlinkYarnSessionCliTest {
                                Mockito.mock(YarnClient.class),
                                Mockito.mock(ApplicationReport.class),
                                config,
-                               new Path("/tmp"), false);
+                               new Path("/temp"), false);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index a293348..9d6ff85 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -50,14 +50,14 @@ import java.util.concurrent.TimeUnit;
 
 public class YARNHighAvailabilityITCase extends YarnTestBase {
 
-       private static TestingServer zkServer;
+       protected static TestingServer zkServer;
 
-       private static ActorSystem actorSystem;
+       protected static ActorSystem actorSystem;
 
-       private static final int numberApplicationAttempts = 10;
+       protected static final int numberApplicationAttempts = 10;
 
        @Rule
-       public TemporaryFolder tmp = new TemporaryFolder();
+       public TemporaryFolder temp = new TemporaryFolder();
 
        @BeforeClass
        public static void setup() {
@@ -108,7 +108,11 @@ public class YARNHighAvailabilityITCase extends 
YarnTestBase {
                String confDirPath = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
                flinkYarnClient.setConfigurationDirectory(confDirPath);
 
-               String fsStateHandlePath = tmp.getRoot().getPath();
+               String fsStateHandlePath = temp.getRoot().getPath();
+
+               // load the configuration
+               File configDirectory = new File(confDirPath);
+               
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
 
                
flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
                
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum="
 +

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index ddea4dd..650397d 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -23,10 +23,12 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.base.Joiner;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -516,6 +518,27 @@ public class YARNSessionCapacitySchedulerITCase extends 
YarnTestBase {
                } catch(Throwable t) {
                        LOG.warn("Error while detached yarn session was 
running", t);
                        Assert.fail(t.getMessage());
+               } finally {
+
+                       //cleanup the yarn-properties file
+                       String confDirPath = System.getenv("FLINK_CONF_DIR");
+                       File configDirectory = new File(confDirPath);
+                       LOG.info("testDetachedPerJobYarnClusterInternal: Using 
configuration directory " + configDirectory.getAbsolutePath());
+
+                       // load the configuration
+                       LOG.info("testDetachedPerJobYarnClusterInternal: Trying 
to load configuration file");
+                       
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
+
+                       try {
+                               File yarnPropertiesFile = 
FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration());
+                               if(yarnPropertiesFile.exists()) {
+                                       
LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn 
address reference: {}", yarnPropertiesFile.getAbsolutePath());
+                                       yarnPropertiesFile.delete();
+                               }
+                       } catch (Exception e) {
+                               
LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the 
JobManager address file", e);
+                       }
+
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 3caa0ee..ca696f9 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -100,6 +100,9 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                while(getRunningContainers() < 2) {
                        sleep(500);
                }
+
+               //additional sleep for the JM/TM to start and establish 
connection
+               sleep(2000);
                LOG.info("Two containers are running. Killing the application");
 
                // kill application "externally".
@@ -121,6 +124,27 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                } catch(Throwable t) {
                        LOG.warn("Killing failed", t);
                        Assert.fail();
+               } finally {
+
+                       //cleanup the yarn-properties file
+                       String confDirPath = System.getenv("FLINK_CONF_DIR");
+                       File configDirectory = new File(confDirPath);
+                       LOG.info("testDetachedMode: Using configuration directory " + configDirectory.getAbsolutePath());
+
+                       // load the configuration
+                       LOG.info("testDetachedMode: Trying to load configuration file");
+                       
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
+
+                       try {
+                               File yarnPropertiesFile = 
FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration());
+                               if(yarnPropertiesFile.exists()) {
+                                       LOG.info("testDetachedMode: Cleaning up temporary Yarn address reference: {}", yarnPropertiesFile.getAbsolutePath());
+                                       yarnPropertiesFile.delete();
+                               }
+                       } catch (Exception e) {
+                               
LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the 
JobManager address file", e);
+                       }
+
                }
 
                LOG.info("Finished testDetachedMode()");

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
new file mode 100644
index 0000000..0b7c230
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.apache.flink.test.util.TestingSecurityContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
+
+       protected static final Logger LOG = 
LoggerFactory.getLogger(YARNSessionFIFOSecuredITCase.class);
+
+       @BeforeClass
+       public static void setup() {
+
+               LOG.info("starting secure cluster environment for testing");
+
+               yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, 
FifoScheduler.class, ResourceScheduler.class);
+               yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
+               
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 
512);
+               yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, 
"flink-yarn-tests-fifo-secured");
+
+               SecureTestEnvironment.prepare(tmp);
+
+               
populateYarnSecureConfigurations(yarnConfiguration,SecureTestEnvironment.getHadoopServicePrincipal(),
+                               SecureTestEnvironment.getTestKeytab());
+
+               Configuration flinkConfig = new Configuration();
+               flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+                               SecureTestEnvironment.getTestKeytab());
+               flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+                               
SecureTestEnvironment.getHadoopServicePrincipal());
+
+               SecurityContext.SecurityConfiguration ctx = new 
SecurityContext.SecurityConfiguration();
+               ctx.setFlinkConfiguration(flinkConfig);
+               ctx.setHadoopConfiguration(yarnConfiguration);
+               try {
+                       TestingSecurityContext.install(ctx, 
SecureTestEnvironment.getClientSecurityConfigurationMap());
+
+                       SecurityContext.getInstalled().runSecured(new 
SecurityContext.FlinkSecuredRunner<Integer>() {
+                               @Override
+                               public Integer run() {
+                                       startYARNSecureMode(yarnConfiguration, 
SecureTestEnvironment.getHadoopServicePrincipal(),
+                                                       
SecureTestEnvironment.getTestKeytab());
+                                       return null;
+                               }
+                       });
+
+               } catch(Exception e) {
+                       throw new RuntimeException("Exception occurred while setting up secure test context", e);
+               }
+
+       }
+
+       @AfterClass
+       public static void teardownSecureCluster() throws Exception {
+               LOG.info("tearing down secure cluster environment");
+               SecureTestEnvironment.cleanup();
+       }
+
+       /* For secure cluster testing, it is enough to run only one test; the test methods below
+        * are overridden to keep the overall build time minimal.
+        */
+       @Override
+       public void testQueryCluster() {}
+
+       @Override
+       public void testNonexistingQueue() {}
+
+       @Override
+       public void testResourceComputation() {}
+
+       @Override
+       public void testfullAlloc() {}
+
+       @Override
+       public void testJavaAPI() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 6270010..605aa44 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import akka.actor.Identify;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.program.ClusterClient;
@@ -29,6 +30,8 @@ import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -62,6 +65,8 @@ import java.io.FileWriter;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.io.BufferedWriter;
+import java.io.PrintWriter;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -123,6 +128,11 @@ public abstract class YarnTestBase extends TestLogger {
         */
        protected static File flinkLibFolder;
 
+       /**
+        * Temporary folder where Flink configurations will be kept for secure 
run
+        */
+       protected static File tempConfPathForSecureRun = null;
+
        static {
                yarnConfiguration = new YarnConfiguration();
                
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 
512);
@@ -140,6 +150,23 @@ public abstract class YarnTestBase extends TestLogger {
        }
 
 
+       public static void populateYarnSecureConfigurations(Configuration conf, 
String principal, String keytab) {
+
+               
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, 
"kerberos");
+               
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+
+               conf.set(YarnConfiguration.RM_KEYTAB, keytab);
+               conf.set(YarnConfiguration.RM_PRINCIPAL, principal);
+               conf.set(YarnConfiguration.NM_KEYTAB, keytab);
+               conf.set(YarnConfiguration.NM_PRINCIPAL, principal);
+
+               conf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, 
principal);
+               
conf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,keytab);
+               conf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, 
principal);
+               
conf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,keytab);
+
+               conf.set("hadoop.security.auth_to_local","RULE:[1:$1] 
RULE:[2:$1]");
+       }
 
        /**
         * Sleep a bit between the tests (we are re-using the YARN cluster for 
the tests)
@@ -336,8 +363,16 @@ public abstract class YarnTestBase extends TestLogger {
                return count;
        }
 
+       public static void startYARNSecureMode(Configuration conf, String 
principal, String keytab) {
+               start(conf, principal, keytab);
+       }
+
        public static void startYARNWithConfig(Configuration conf) {
-               // set the home directory to a tmp directory. Flink on YARN is 
using the home dir to distribute the file
+               start(conf,null,null);
+       }
+
+       private static void start(Configuration conf, String principal, String 
keytab) {
+               // set the home directory to a temp directory. Flink on YARN is 
using the home dir to distribute the file
                File homeDir = null;
                try {
                        homeDir = tmp.newFolder();
@@ -374,7 +409,39 @@ public abstract class YarnTestBase extends TestLogger {
                        File flinkConfDirPath = findFile(flinkDistRootDir, new 
ContainsName(new String[]{"flink-conf.yaml"}));
                        Assert.assertNotNull(flinkConfDirPath);
 
-                       map.put(ConfigConstants.ENV_FLINK_CONF_DIR, 
flinkConfDirPath.getParent());
+                       if(!StringUtils.isBlank(principal) && 
!StringUtils.isBlank(keytab)) {
+                               //copy conf dir to test temporary workspace 
location
+                               tempConfPathForSecureRun = 
tmp.newFolder("conf");
+
+                               String confDirPath = 
flinkConfDirPath.getParentFile().getAbsolutePath();
+                               FileUtils.copyDirectory(new File(confDirPath), 
tempConfPathForSecureRun);
+
+                               try(FileWriter fw = new FileWriter(new 
File(tempConfPathForSecureRun,"flink-conf.yaml"), true);
+                                       BufferedWriter bw = new 
BufferedWriter(fw);
+                                       PrintWriter out = new PrintWriter(bw))
+                               {
+                                       LOG.info("writing keytab: " + keytab + 
" and principal: " + principal + " to config file");
+                                       out.println("");
+                                       out.println("#Security Configurations 
Auto Populated ");
+                                       
out.println(ConfigConstants.SECURITY_KEYTAB_KEY + ": " + keytab);
+                                       
out.println(ConfigConstants.SECURITY_PRINCIPAL_KEY + ": " + principal);
+                                       out.println("");
+                               } catch (IOException e) {
+                                       LOG.error("Exception occurred while trying to append the security configurations. Reason: {}", e.getMessage());
+                                       throw new RuntimeException(e);
+                               }
+
+                               String configDir = 
tempConfPathForSecureRun.getAbsolutePath();
+
+                               LOG.info("Temporary Flink configuration 
directory to be used for secure test: {}", configDir);
+
+                               Assert.assertNotNull(configDir);
+
+                               map.put(ConfigConstants.ENV_FLINK_CONF_DIR, 
configDir);
+
+                       } else {
+                               map.put(ConfigConstants.ENV_FLINK_CONF_DIR, 
flinkConfDirPath.getParent());
+                       }
 
                        File yarnConfFile = writeYarnSiteConfigXML(conf);
                        map.put("YARN_CONF_DIR", 
yarnConfFile.getParentFile().getAbsolutePath());
@@ -392,6 +459,7 @@ public abstract class YarnTestBase extends TestLogger {
                        LOG.error("setup failure", ex);
                        Assert.fail();
                }
+
        }
 
        /**
@@ -421,7 +489,6 @@ public abstract class YarnTestBase extends TestLogger {
                System.setOut(new PrintStream(outContent));
                System.setErr(new PrintStream(errContent));
 
-
                final int START_TIMEOUT_SECONDS = 60;
 
                Runner runner = new Runner(args, type);
@@ -624,12 +691,23 @@ public abstract class YarnTestBase extends TestLogger {
 
        @AfterClass
        public static void teardown() throws Exception {
+
+               if (yarnCluster != null) {
+                       LOG.info("Stopping MiniYarn Cluster");
+                       yarnCluster.stop();
+               }
+
                // Unset FLINK_CONF_DIR, as it might change the behavior of 
other tests
                Map<String, String> map = new HashMap<>(System.getenv());
                map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
+               map.remove("YARN_CONF_DIR");
+               map.remove("IN_TESTS");
                TestBaseUtils.setEnv(map);
 
-               // When we are on travis, we copy the tmp files of JUnit 
(containing the MiniYARNCluster log files)
+               if(tempConfPathForSecureRun != null) {
+                       FileUtil.fullyDelete(tempConfPathForSecureRun);
+                       tempConfPathForSecureRun = null;
+               }
+
+               // When we are on travis, we copy the temp files of JUnit 
(containing the MiniYARNCluster log files)
                // to <flinkRoot>/target/flink-yarn-tests-*.
                // The files from there are picked up by the 
./tools/travis_watchdog.sh script
                // to upload them to Amazon S3.
@@ -646,6 +724,7 @@ public abstract class YarnTestBase extends TestLogger {
                                LOG.warn("Error copying the final files from {} 
to {}: msg: {}", src.getAbsolutePath(), target.getAbsolutePath(), 
e.getMessage(), e);
                        }
                }
+
        }
 
        public static boolean isOnTravis() {

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties 
b/flink-yarn-tests/src/test/resources/log4j-test.properties
index e94ca26..8f56c1f 100644
--- a/flink-yarn-tests/src/test/resources/log4j-test.properties
+++ b/flink-yarn-tests/src/test/resources/log4j-test.properties
@@ -34,3 +34,8 @@ log4j.logger.org.apache.hadoop=OFF
 log4j.logger.org.apache.flink.runtime.leaderelection=INFO
 log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO
 
+log4j.logger.org.apache.directory=OFF
+log4j.logger.org.mortbay.log=OFF, testlogger
+log4j.logger.net.sf.ehcache=OFF
+log4j.logger.org.apache.hadoop.metrics2=OFF
+log4j.logger.org.apache.hadoop.conf.Configuration=OFF
