Repository: flink
Updated Branches:
  refs/heads/master dffde7efb -> 069de27df


http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
new file mode 100644
index 0000000..fc38b5d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.net;
+
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.File;
+import java.io.FileInputStream;
+import java.security.KeyStore;
+
+/**
+ * Common utilities to manage SSL transport settings
+ */
+public class SSLUtils {
+       private static final Logger LOG = 
LoggerFactory.getLogger(SSLUtils.class);
+
+       /**
+        * Retrieves the global ssl flag from configuration
+        *
+        * @param sslConfig
+        *        The application configuration
+        * @return true if global ssl flag is set
+        */
+       public static boolean getSSLEnabled(Configuration sslConfig) {
+
+               Preconditions.checkNotNull(sslConfig);
+
+               return sslConfig.getBoolean( 
ConfigConstants.SECURITY_SSL_ENABLED,
+                       ConfigConstants.DEFAULT_SECURITY_SSL_ENABLED);
+       }
+
+       /**
+        * Sets SSL options to verify peer's hostname in the certificate
+        *
+        * @param sslConfig
+        *        The application configuration
+        * @param sslParams
+        *        The SSL parameters that need to be updated
+        */
+       public static void setSSLVerifyHostname(Configuration sslConfig, 
SSLParameters sslParams) {
+
+               Preconditions.checkNotNull(sslConfig);
+               Preconditions.checkNotNull(sslParams);
+
+               boolean verifyHostname = 
sslConfig.getBoolean(ConfigConstants.SECURITY_SSL_VERIFY_HOSTNAME,
+                       ConfigConstants.DEFAULT_SECURITY_SSL_VERIFY_HOSTNAME);
+               if (verifyHostname) {
+                       sslParams.setEndpointIdentificationAlgorithm("HTTPS");
+               }
+       }
+
+       /**
+        * Creates the SSL Context for the client if SSL is configured
+        *
+        * @param sslConfig
+        *        The application configuration
+        * @return The SSLContext object which can be used by the ssl transport 
client
+        *             Returns null if SSL is disabled
+        * @throws Exception
+        *         Thrown if there is any misconfiguration
+        */
+       public static SSLContext createSSLClientContext(Configuration 
sslConfig) throws Exception {
+
+               Preconditions.checkNotNull(sslConfig);
+               SSLContext clientSSLContext = null;
+
+               if (getSSLEnabled(sslConfig)) {
+                       LOG.debug("Creating client SSL context from 
configuration");
+
+                       String trustStoreFilePath = sslConfig.getString(
+                               ConfigConstants.SECURITY_SSL_TRUSTSTORE,
+                               null);
+                       String trustStorePassword = sslConfig.getString(
+                               
ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD,
+                               null);
+                       String sslProtocolVersion = sslConfig.getString(
+                               ConfigConstants.SECURITY_SSL_PROTOCOL,
+                               ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL);
+
+                       KeyStore trustStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
+
+                       FileInputStream trustStoreFile = null;
+                       try {
+                               trustStoreFile = new FileInputStream(new 
File(trustStoreFilePath));
+                               trustStore.load(trustStoreFile, 
trustStorePassword.toCharArray());
+                       } finally {
+                               if (trustStoreFile != null) {
+                                       trustStoreFile.close();
+                               }
+                       }
+
+                       TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(
+                               TrustManagerFactory.getDefaultAlgorithm());
+                       trustManagerFactory.init(trustStore);
+
+                       clientSSLContext = 
SSLContext.getInstance(sslProtocolVersion);
+                       clientSSLContext.init(null, 
trustManagerFactory.getTrustManagers(), null);
+               }
+
+               return clientSSLContext;
+       }
+
+       /**
+        * Creates the SSL Context for the server if SSL is configured
+        *
+        * @param sslConfig
+        *        The application configuration
+        * @return The SSLContext object which can be used by the ssl transport 
server
+        *             Returns null if SSL is disabled
+        * @throws Exception
+        *         Thrown if there is any misconfiguration
+        */
+       public static SSLContext createSSLServerContext(Configuration 
sslConfig) throws Exception {
+
+               Preconditions.checkNotNull(sslConfig);
+               SSLContext serverSSLContext = null;
+
+               if (getSSLEnabled(sslConfig)) {
+                       LOG.debug("Creating server SSL context from 
configuration");
+
+                       String keystoreFilePath = sslConfig.getString(
+                               ConfigConstants.SECURITY_SSL_KEYSTORE,
+                               null);
+
+                       String keystorePassword = sslConfig.getString(
+                               ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD,
+                               null);
+
+                       String certPassword = sslConfig.getString(
+                               ConfigConstants.SECURITY_SSL_KEY_PASSWORD,
+                               null);
+
+                       String sslProtocolVersion = sslConfig.getString(
+                               ConfigConstants.SECURITY_SSL_PROTOCOL,
+                               ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL);
+
+                       KeyStore ks = 
KeyStore.getInstance(KeyStore.getDefaultType());
+                       FileInputStream keyStoreFile = null;
+                       try {
+                               keyStoreFile = new FileInputStream(new 
File(keystoreFilePath));
+                               ks.load(keyStoreFile, 
keystorePassword.toCharArray());
+                       } finally {
+                               if (keyStoreFile != null) {
+                                       keyStoreFile.close();
+                               }
+                       }
+
+                       // Set up key manager factory to use the server key 
store
+                       KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                                       
KeyManagerFactory.getDefaultAlgorithm());
+                       kmf.init(ks, certPassword.toCharArray());
+
+                       // Initialize the SSLContext
+                       serverSSLContext = 
SSLContext.getInstance(sslProtocolVersion);
+                       serverSSLContext.init(kmf.getKeyManagers(), null, null);
+               }
+
+               return serverSSLContext;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
index 460f10e..8bc1ad1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import scala.Option;
-import scala.Tuple2;
+import scala.Tuple3;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -65,10 +65,11 @@ public final class StandaloneUtils {
                throws UnknownHostException {
 
 
-               Tuple2<String, Object> stringIntPair = 
TaskManager.getAndCheckJobManagerAddress(configuration);
+               Tuple3<String, String, Object> stringIntPair = 
TaskManager.getAndCheckJobManagerAddress(configuration);
 
-               String jobManagerHostname = stringIntPair._1();
-               int jobManagerPort = (Integer) stringIntPair._2();
+               String protocol = stringIntPair._1();
+               String jobManagerHostname = stringIntPair._2();
+               int jobManagerPort = (Integer) stringIntPair._3();
                InetSocketAddress hostPort;
 
                try {
@@ -81,6 +82,7 @@ public final class StandaloneUtils {
                }
 
                String jobManagerAkkaUrl = 
JobManager.getRemoteJobManagerAkkaURL(
+                               protocol,
                                hostPort,
                                Option.apply(jobManagerName));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index bd3af33..80bdb73 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -24,8 +24,9 @@ import java.util.concurrent.{TimeUnit, Callable}
 
 import akka.actor._
 import akka.pattern.{ask => akkaAsk}
-import com.typesafe.config.{Config, ConfigFactory}
+import com.typesafe.config.{ConfigValueFactory, ConfigParseOptions, Config, 
ConfigFactory}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.net.SSLUtils
 import org.apache.flink.util.NetUtils
 import org.jboss.netty.logging.{Slf4JLoggerFactory, InternalLoggerFactory}
 import org.slf4j.LoggerFactory
@@ -265,6 +266,41 @@ object AkkaUtils {
 
     val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
 
+    val akkaEnableSSLConfig = 
configuration.getBoolean(ConfigConstants.AKKA_SSL_ENABLED,
+        ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
+          SSLUtils.getSSLEnabled(configuration)
+
+    val akkaEnableSSL = if (akkaEnableSSLConfig) "on" else "off"
+
+    val akkaSSLKeyStore = configuration.getString(
+      ConfigConstants.SECURITY_SSL_KEYSTORE,
+      null)
+
+    val akkaSSLKeyStorePassword = configuration.getString(
+      ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD,
+      null)
+
+    val akkaSSLKeyPassword = configuration.getString(
+      ConfigConstants.SECURITY_SSL_KEY_PASSWORD,
+      null)
+
+    val akkaSSLTrustStore = configuration.getString(
+      ConfigConstants.SECURITY_SSL_TRUSTSTORE,
+      null)
+
+    val akkaSSLTrustStorePassword = configuration.getString(
+      ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD,
+      null)
+
+    val akkaSSLProtocol = configuration.getString(
+      ConfigConstants.SECURITY_SSL_PROTOCOL,
+      ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL)
+
+    val akkaSSLAlgorithmsString = configuration.getString(
+      ConfigConstants.SECURITY_SSL_ALGORITHMS,
+      ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS)
+    val akkaSSLAlgorithms = 
akkaSSLAlgorithmsString.split(",").toList.mkString("[", ",", "]")
+
     val configString =
       s"""
          |akka {
@@ -320,7 +356,40 @@ object AkkaUtils {
         ""
       }
 
-    ConfigFactory.parseString(configString + hostnameConfigString)
+    val sslConfigString = if (akkaEnableSSLConfig) {
+      s"""
+         |akka {
+         |  remote {
+         |
+         |    enabled-transports = ["akka.remote.netty.ssl"]
+         |
+         |    netty {
+         |
+         |      ssl = $${akka.remote.netty.tcp}
+         |
+         |      ssl {
+         |
+         |        enable-ssl = $akkaEnableSSL
+         |        security {
+         |          key-store = "$akkaSSLKeyStore"
+         |          key-store-password = "$akkaSSLKeyStorePassword"
+         |          key-password = "$akkaSSLKeyPassword"
+         |          trust-store = "$akkaSSLTrustStore"
+         |          trust-store-password = "$akkaSSLTrustStorePassword"
+         |          protocol = $akkaSSLProtocol
+         |          enabled-algorithms = $akkaSSLAlgorithms
+         |          random-number-generator = ""
+         |        }
+         |      }
+         |    }
+         |  }
+         |}
+       """.stripMargin
+    }else{
+      ""
+    }
+
+    ConfigFactory.parseString(configString + hostnameConfigString + 
sslConfigString).resolve()
   }
 
   def getLogLevel: String = {
@@ -577,4 +646,18 @@ object AkkaUtils {
         throw new Exception(s"Could not retrieve InetSocketAddress from Akka 
URL $akkaURL")
     }
   }
+
+  /** Returns the protocol field for the URL of the remote actor system given 
the user configuration
+    *
+    * @param config instance containing the user provided configuration values
+    * @return the remote url's protocol field
+    */
+  def getAkkaProtocol(config: Configuration): String = {
+    val sslEnabled = config.getBoolean(ConfigConstants.AKKA_SSL_ENABLED,
+        ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
+      SSLUtils.getSSLEnabled(config)
+    if (sslEnabled) "akka.ssl.tcp" else "akka.tcp"
+  }
+
 }
+

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8f3b82a..5dc9e24 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2746,16 +2746,22 @@ object JobManager {
    * Builds the akka actor path for the JobManager actor, given the socket 
address
    * where the JobManager's actor system runs.
    *
+   * @param protocol The protocol to be used to connect to the remote 
JobManager's actor system.
    * @param address The address of the JobManager's actor system.
    * @return The akka URL of the JobManager actor.
    */
   def getRemoteJobManagerAkkaURL(
+      protocol: String,
       address: InetSocketAddress,
       name: Option[String] = None)
     : String = {
+
+    require(protocol == "akka.tcp" || protocol == "akka.ssl.tcp",
+        "protocol field should be either akka.tcp or akka.ssl.tcp")
+
     val hostPort = NetUtils.socketAddressToUrlString(address)
 
-    getJobManagerAkkaURLHelper(s"akka.tcp://flink@$hostPort", name)
+    getJobManagerAkkaURLHelper(s"$protocol://flink@$hostPort", name)
   }
 
   /**
@@ -2765,7 +2771,7 @@ object JobManager {
    * @return JobManager actor remote Akka URL
    */
   def getRemoteJobManagerAkkaURL(config: Configuration) : String = {
-    val (hostname, port) = TaskManager.getAndCheckJobManagerAddress(config)
+    val (protocol, hostname, port) = 
TaskManager.getAndCheckJobManagerAddress(config)
 
     var hostPort: InetSocketAddress = null
 
@@ -2779,7 +2785,7 @@ object JobManager {
           s"specified in the configuration")
     }
 
-    JobManager.getRemoteJobManagerAkkaURL(hostPort, Option.empty)
+    JobManager.getRemoteJobManagerAkkaURL(protocol, hostPort, Option.empty)
   }
 
   /**
@@ -2801,11 +2807,12 @@ object JobManager {
   }
 
   def getJobManagerActorRefFuture(
+      protocol: String,
       address: InetSocketAddress,
       system: ActorSystem,
       timeout: FiniteDuration)
     : Future[ActorRef] = {
-    AkkaUtils.getActorRefFuture(getRemoteJobManagerAkkaURL(address), system, 
timeout)
+    AkkaUtils.getActorRefFuture(getRemoteJobManagerAkkaURL(protocol, address), 
system, timeout)
   }
 
   /**
@@ -2829,6 +2836,7 @@ object JobManager {
   /**
    * Resolves the JobManager actor reference in a blocking fashion.
    *
+   * @param protocol The protocol to be used to connect to the remote 
JobManager's actor system.
    * @param address The socket address of the JobManager's actor system.
    * @param system The local actor system that should perform the lookup.
    * @param timeout The maximum time to wait until the lookup fails.
@@ -2837,12 +2845,13 @@ object JobManager {
    */
   @throws(classOf[IOException])
   def getJobManagerActorRef(
+      protocol: String,
       address: InetSocketAddress,
       system: ActorSystem,
       timeout: FiniteDuration)
     : ActorRef = {
 
-    val jmAddress = getRemoteJobManagerAkkaURL(address)
+    val jmAddress = getRemoteJobManagerAkkaURL(protocol, address)
     getJobManagerActorRef(jmAddress, system, timeout)
   }
 
@@ -2863,6 +2872,7 @@ object JobManager {
     : ActorRef = {
 
     val timeout = AkkaUtils.getLookupTimeout(config)
-    getJobManagerActorRef(address, system, timeout)
+    val protocol = AkkaUtils.getAkkaProtocol(config)
+    getJobManagerActorRef(protocol, address, system, timeout)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index a263f66..048b013 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -513,6 +513,7 @@ abstract class FlinkMiniCluster(
      try {
      JobClient.submitJobAndWait(
        clientActorSystem,
+       configuration,
        leaderRetrievalService,
        jobGraph,
        timeout,
@@ -541,6 +542,7 @@ abstract class FlinkMiniCluster(
     }
 
     JobClient.submitJobDetached(jobManagerGateway,
+      configuration,
       jobGraph,
       timeout,
       this.getClass.getClassLoader())

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index af2b38f..f8f333e 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2373,13 +2373,15 @@ object TaskManager {
   }
 
   /**
-   * Gets the hostname and port of the JobManager from the configuration. Also 
checks that
+   * Gets the protocol, hostname and port of the JobManager from the 
configuration. Also checks that
    * the hostname is not null and the port non-negative.
    *
    * @param configuration The configuration to read the config values from.
-   * @return A 2-tuple (hostname, port).
+   * @return A 3-tuple (protocol, hostname, port).
    */
-  def getAndCheckJobManagerAddress(configuration: Configuration) : (String, 
Int) = {
+  def getAndCheckJobManagerAddress(configuration: Configuration) : (String, 
String, Int) = {
+
+    val protocol = AkkaUtils.getAkkaProtocol(configuration)
 
     val hostname = 
configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
 
@@ -2397,7 +2399,7 @@ object TaskManager {
         ".  it must be great than 0 and less than 65536.")
     }
 
-    (hostname, port)
+    (protocol, hostname, port)
   }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index aba0aff..4aa9a21 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -54,7 +54,7 @@ public class BlobCacheRetriesTest {
                        BlobClient blobClient = null;
                        BlobKey key;
                        try {
-                               blobClient = new BlobClient(serverAddress);
+                               blobClient = new BlobClient(serverAddress, 
config);
 
                                key = blobClient.put(data);
                        }
@@ -113,7 +113,7 @@ public class BlobCacheRetriesTest {
                        BlobClient blobClient = null;
                        BlobKey key;
                        try {
-                               blobClient = new BlobClient(serverAddress);
+                               blobClient = new BlobClient(serverAddress, 
config);
 
                                key = blobClient.put(data);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index 5c3ecf3..7ba5a8a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -50,14 +50,15 @@ public class BlobCacheSuccessTest {
                try {
 
                        // Start the BLOB server
-                       blobServer = new BlobServer(new Configuration());
+                       Configuration config = new Configuration();
+                       blobServer = new BlobServer(config);
                        final InetSocketAddress serverAddress = new 
InetSocketAddress(blobServer.getPort());
 
                        // Upload BLOBs
                        BlobClient blobClient = null;
                        try {
 
-                               blobClient = new BlobClient(serverAddress);
+                               blobClient = new BlobClient(serverAddress, 
config);
 
                                blobKeys.add(blobClient.put(buf));
                                buf[0] = 1; // Make sure the BLOB key changes

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
new file mode 100644
index 0000000..5054107
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.security.MessageDigest;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This class contains unit tests for the {@link BlobClient} with ssl enabled.
+ */
+public class BlobClientSslTest {
+
+       /** The buffer size used during the tests in bytes. */
+       private static final int TEST_BUFFER_SIZE = 17 * 1000;
+
+       /** The instance of the SSL BLOB server used during the tests. */
+       private static BlobServer BLOB_SSL_SERVER;
+
+       /** The SSL blob service client configuration */
+       private static Configuration sslClientConfig;
+
+       /** The instance of the non-SSL BLOB server used during the tests. */
+       private static BlobServer BLOB_SERVER;
+
+       /** The non-ssl blob service client configuration */
+       private static Configuration clientConfig;
+
+       /**
+        * Starts the SSL enabled BLOB server.
+        */
+       @BeforeClass
+       public static void startSSLServer() {
+               try {
+                       Configuration config = new Configuration();
+                       config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
true);
+                       config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, 
"src/test/resources/local127.keystore");
+                       
config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+                       
config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+                       BLOB_SSL_SERVER = new BlobServer(config);
+               }
+               catch (IOException e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+
+               sslClientConfig = new Configuration();
+               
sslClientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+               
sslClientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
+               
sslClientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, 
"password");
+       }
+
+       /**
+        * Starts the SSL disabled BLOB server.
+        */
+       @BeforeClass
+       public static void startNonSSLServer() {
+               try {
+                       Configuration config = new Configuration();
+                       config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
true);
+                       
config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
+                       config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, 
"src/test/resources/local127.keystore");
+                       
config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+                       
config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+                       BLOB_SERVER = new BlobServer(config);
+               }
+               catch (IOException e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+
+               clientConfig = new Configuration();
+               clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
true);
+               
clientConfig.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
+               clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
+               
clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, 
"password");
+       }
+
+       /**
+        * Shuts the BLOB server down.
+        */
+       @AfterClass
+       public static void stopServers() {
+               if (BLOB_SSL_SERVER != null) {
+                       BLOB_SSL_SERVER.shutdown();
+               }
+
+               if (BLOB_SERVER != null) {
+                       BLOB_SERVER.shutdown();
+               }
+       }
+
+       /**
+        * Prepares a test file for the unit tests, i.e. the methods fills the 
file with a particular byte patterns and
+        * computes the file's BLOB key.
+        *
+        * @param file
+        *        the file to prepare for the unit tests
+        * @return the BLOB key of the prepared file
+        * @throws IOException
+        *         thrown if an I/O error occurs while writing to the test file
+        */
+       private static BlobKey prepareTestFile(File file) throws IOException {
+
+               MessageDigest md = BlobUtils.createMessageDigest();
+
+               final byte[] buf = new byte[TEST_BUFFER_SIZE];
+               for (int i = 0; i < buf.length; ++i) {
+                       buf[i] = (byte) (i % 128);
+               }
+
+               FileOutputStream fos = null;
+               try {
+                       fos = new FileOutputStream(file);
+
+                       for (int i = 0; i < 20; ++i) {
+                               fos.write(buf);
+                               md.update(buf);
+                       }
+
+               } finally {
+                       if (fos != null) {
+                               fos.close();
+                       }
+               }
+
+               return new BlobKey(md.digest());
+       }
+
+       /**
+        * Validates the result of a GET operation by comparing the data from 
the retrieved input stream to the content of
+        * the specified file.
+        *
+        * @param inputStream
+        *        the input stream returned from the GET operation
+        * @param file
+        *        the file to compare the input stream's data to
+        * @throws IOException
+        *         thrown if an I/O error occurs while reading the input stream 
or the file
+        */
+       private static void validateGet(final InputStream inputStream, final 
File file) throws IOException {
+
+               InputStream inputStream2 = null;
+               try {
+
+                       inputStream2 = new FileInputStream(file);
+
+                       while (true) {
+
+                               final int r1 = inputStream.read();
+                               final int r2 = inputStream2.read();
+
+                               assertEquals(r2, r1);
+
+                               if (r1 < 0) {
+                                       break;
+                               }
+                       }
+
+               } finally {
+                       if (inputStream2 != null) {
+                               inputStream2.close();
+                       }
+               }
+
+       }
+
+       /**
+        * Tests the PUT/GET operations for content-addressable streams.
+        */
+       @Test
+       public void testContentAddressableStream() {
+
+               BlobClient client = null;
+               InputStream is = null;
+
+               try {
+                       File testFile = File.createTempFile("testfile", ".dat");
+                       testFile.deleteOnExit();
+
+                       BlobKey origKey = prepareTestFile(testFile);
+
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SSL_SERVER.getPort());
+                       client = new BlobClient(serverAddress, sslClientConfig);
+
+                       // Store the data
+                       is = new FileInputStream(testFile);
+                       BlobKey receivedKey = client.put(is);
+                       assertEquals(origKey, receivedKey);
+
+                       is.close();
+                       is = null;
+
+                       // Retrieve the data
+                       is = client.get(receivedKey);
+                       validateGet(is, testFile);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (is != null) {
+                               try {
+                                       is.close();
+                               } catch (Throwable t) {}
+                       }
+                       if (client != null) {
+                               try {
+                                       client.close();
+                               } catch (Throwable t) {}
+                       }
+               }
+       }
+
+       /**
+        * Tests the PUT/GET operations for regular (non-content-addressable) 
streams.
+        */
+       @Test
+       public void testRegularStream() {
+
+               final JobID jobID = JobID.generate();
+               final String key = "testkey3";
+
+               try {
+                       final File testFile = File.createTempFile("testfile", 
".dat");
+                       testFile.deleteOnExit();
+                       prepareTestFile(testFile);
+
+                       BlobClient client = null;
+                       InputStream is = null;
+                       try {
+
+                               final InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SSL_SERVER.getPort());
+                               client = new BlobClient(serverAddress, 
sslClientConfig);
+
+                               // Store the data
+                               is = new FileInputStream(testFile);
+                               client.put(jobID, key, is);
+
+                               is.close();
+                               is = null;
+
+                               // Retrieve the data
+                               is = client.get(jobID, key);
+                               validateGet(is, testFile);
+
+                       }
+                       finally {
+                               if (is != null) {
+                                       is.close();
+                               }
+                               if (client != null) {
+                                       client.close();
+                               }
+                       }
+
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, List)} helper.
+        */
+       private void uploadJarFile(BlobServer blobServer, Configuration 
blobClientConfig) throws Exception {
+               final File testFile = File.createTempFile("testfile", ".dat");
+               testFile.deleteOnExit();
+               prepareTestFile(testFile);
+
+               InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", blobServer.getPort());
+
+               List<BlobKey> blobKeys = 
BlobClient.uploadJarFiles(serverAddress, blobClientConfig,
+                       Collections.singletonList(new Path(testFile.toURI())));
+
+               assertEquals(1, blobKeys.size());
+
+               try (BlobClient blobClient = new BlobClient(serverAddress, 
blobClientConfig)) {
+                       InputStream is = blobClient.get(blobKeys.get(0));
+                       validateGet(is, testFile);
+               }
+       }
+
+       /**
+        * Verify ssl client to ssl server upload
+        */
+       @Test
+       public void testUploadJarFilesHelper() throws Exception {
+               uploadJarFile(BLOB_SSL_SERVER, sslClientConfig);
+       }
+
+       /**
+        * Verify ssl client to non-ssl server failure
+        */
+       @Test
+       public void testSSLClientFailure() throws Exception {
+               try {
+                       uploadJarFile(BLOB_SERVER, sslClientConfig);
+                       fail("SSL client connected to non-ssl server");
+               } catch (Exception e) {
+                       // Exception expected
+               }
+       }
+
+       /**
+        * Verify non-ssl client to ssl server failure
+        */
+       @Test
+       public void testSSLServerFailure() throws Exception {
+               try {
+                       uploadJarFile(BLOB_SSL_SERVER, clientConfig);
+                       fail("Non-SSL client connected to ssl server");
+               } catch (Exception e) {
+                       // Exception expected
+               }
+       }
+
+       /**
+        * Verify non-ssl connection sanity
+        */
+       @Test
+       public void testNonSSLConnection() throws Exception {
+               uploadJarFile(BLOB_SERVER, clientConfig);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index ccdd3a1..8f8f8c5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -50,13 +50,17 @@ public class BlobClientTest {
        /** The instance of the BLOB server used during the tests. */
        private static BlobServer BLOB_SERVER;
 
+       /** The blob service client and server configuration */
+       private static Configuration blobServiceConfig;
+
        /**
         * Starts the BLOB server.
         */
        @BeforeClass
        public static void startServer() {
                try {
-                       BLOB_SERVER = new BlobServer(new Configuration());
+                       blobServiceConfig = new Configuration();
+                       BLOB_SERVER = new BlobServer(blobServiceConfig);
                }
                catch (IOException e) {
                        e.printStackTrace();
@@ -207,7 +211,7 @@ public class BlobClientTest {
                        BlobKey origKey = new BlobKey(md.digest());
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SERVER.getPort());
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, 
blobServiceConfig);
 
                        // Store the data
                        BlobKey receivedKey = client.put(testBuffer);
@@ -255,7 +259,7 @@ public class BlobClientTest {
                        BlobKey origKey = prepareTestFile(testFile);
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SERVER.getPort());
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, 
blobServiceConfig);
 
                        // Store the data
                        is = new FileInputStream(testFile);
@@ -301,7 +305,7 @@ public class BlobClientTest {
                        BlobClient client = null;
                        try {
                                final InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SERVER.getPort());
-                               client = new BlobClient(serverAddress);
+                               client = new BlobClient(serverAddress, 
blobServiceConfig);
 
                                // Store the data
                                client.put(jobID, key, testBuffer);
@@ -353,7 +357,7 @@ public class BlobClientTest {
                        try {
 
                                final InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SERVER.getPort());
-                               client = new BlobClient(serverAddress);
+                               client = new BlobClient(serverAddress, 
blobServiceConfig);
 
                                // Store the data
                                is = new FileInputStream(testFile);
@@ -384,7 +388,7 @@ public class BlobClientTest {
        }
 
        /**
-        * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
List)} helper.
+        * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, List)} helper.
         */
        @Test
        public void testUploadJarFilesHelper() throws Exception {
@@ -394,11 +398,12 @@ public class BlobClientTest {
 
                InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SERVER.getPort());
 
-               List<BlobKey> blobKeys = 
BlobClient.uploadJarFiles(serverAddress, Collections.singletonList(new 
Path(testFile.toURI())));
+               List<BlobKey> blobKeys = 
BlobClient.uploadJarFiles(serverAddress, blobServiceConfig,
+                       Collections.singletonList(new Path(testFile.toURI())));
 
                assertEquals(1, blobKeys.size());
 
-               try (BlobClient blobClient = new BlobClient(serverAddress)) {
+               try (BlobClient blobClient = new BlobClient(serverAddress, 
blobServiceConfig)) {
                        InputStream is = blobClient.get(blobKeys.get(0));
                        validateGet(is, testFile);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 8ba20c9..3fe207e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -78,7 +78,7 @@ public class BlobRecoveryITCase {
                                serverAddress[i] = new 
InetSocketAddress("localhost", server[i].getPort());
                        }
 
-                       client = new BlobClient(serverAddress[0]);
+                       client = new BlobClient(serverAddress[0], config);
 
                        // Random data
                        byte[] expected = new byte[1024];
@@ -98,7 +98,7 @@ public class BlobRecoveryITCase {
 
                        // Close the client and connect to the other server
                        client.close();
-                       client = new BlobClient(serverAddress[1]);
+                       client = new BlobClient(serverAddress[1], config);
 
                        // Verify request 1
                        try (InputStream is = client.get(keys[0])) {

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index cef9ad3..53e1d73 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -50,7 +50,7 @@ public class BlobServerDeleteTest {
                        server = new BlobServer(config);
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
 
                        byte[] data = new byte[2000000];
                        rnd.nextBytes(data);
@@ -63,7 +63,7 @@ public class BlobServerDeleteTest {
                        client.delete(key);
                        client.close();
 
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
                        try {
                                client.get(key);
                                fail("BLOB should have been deleted");
@@ -108,7 +108,7 @@ public class BlobServerDeleteTest {
                        server = new BlobServer(config);
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
 
                        byte[] data = new byte[2000000];
                        rnd.nextBytes(data);
@@ -126,7 +126,7 @@ public class BlobServerDeleteTest {
                        client.deleteAll(jobID);
                        client.close();
 
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
                        try {
                                client.get(jobID, name1);
                                fail("BLOB should have been deleted");
@@ -143,7 +143,7 @@ public class BlobServerDeleteTest {
                                // expected
                        }
 
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
                        try {
                                client.get(jobID, name2);
                                fail("BLOB should have been deleted");
@@ -180,7 +180,7 @@ public class BlobServerDeleteTest {
                        server = new BlobServer(config);
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
 
                        byte[] data = new byte[2000000];
                        rnd.nextBytes(data);
@@ -228,7 +228,7 @@ public class BlobServerDeleteTest {
                        server = new BlobServer(config);
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
 
                        byte[] data = new byte[2000000];
                        rnd.nextBytes(data);
@@ -279,7 +279,7 @@ public class BlobServerDeleteTest {
                        server = new BlobServer(config);
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
 
                        byte[] data = new byte[2000000];
                        rnd.nextBytes(data);

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 2853e26..59a62e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -49,7 +49,7 @@ public class BlobServerGetTest {
                        server = new BlobServer(config);
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
 
                        byte[] data = new byte[2000000];
                        rnd.nextBytes(data);
@@ -99,7 +99,7 @@ public class BlobServerGetTest {
                        server = new BlobServer(config);
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
 
                        byte[] data = new byte[5000000];
                        rnd.nextBytes(data);

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index dc18787..c4d6d1c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -51,7 +51,7 @@ public class BlobServerPutTest {
                        server = new BlobServer(config);
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
 
                        byte[] data = new byte[2000000];
                        rnd.nextBytes(data);
@@ -82,7 +82,7 @@ public class BlobServerPutTest {
 
                        // close the client and create a new one for the 
remaining requests
                        client.close();
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
 
                        InputStream is2 = client.get(key1);
                        byte[] result2 = new byte[data.length];
@@ -125,7 +125,7 @@ public class BlobServerPutTest {
                        server = new BlobServer(config);
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
 
                        byte[] data = new byte[2000000];
                        rnd.nextBytes(data);
@@ -172,7 +172,7 @@ public class BlobServerPutTest {
                        server = new BlobServer(config);
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
 
                        byte[] data = new byte[2000000];
                        rnd.nextBytes(data);
@@ -228,7 +228,7 @@ public class BlobServerPutTest {
                        assertTrue(tempFileDir.setWritable(false, false));
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
 
                        byte[] data = new byte[2000000];
                        rnd.nextBytes(data);
@@ -292,7 +292,7 @@ public class BlobServerPutTest {
                        assertTrue(tempFileDir.setWritable(false, false));
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
-                       client = new BlobClient(serverAddress);
+                       client = new BlobClient(serverAddress, config);
 
                        byte[] data = new byte[2000000];
                        rnd.nextBytes(data);

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
index 2adf7eb..5792c9a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
@@ -51,10 +51,12 @@ public class JobClientActorTest extends TestLogger {
 
        private static ActorSystem system;
        private static JobGraph testJobGraph = new JobGraph("Test Job");
+       private static Configuration clientConfig;
 
        @BeforeClass
        public static void setup() {
-               system = AkkaUtils.createLocalActorSystem(new Configuration());
+               clientConfig = new Configuration();
+               system = AkkaUtils.createLocalActorSystem(clientConfig);
        }
 
        @AfterClass
@@ -89,7 +91,8 @@ public class JobClientActorTest extends TestLogger {
                Props jobClientActorProps = 
JobSubmissionClientActor.createActorProps(
                        testingLeaderRetrievalService,
                        jobClientActorTimeout,
-                       false);
+                       false,
+                       clientConfig);
 
                ActorRef jobClientActor = system.actorOf(jobClientActorProps);
 
@@ -154,7 +157,8 @@ public class JobClientActorTest extends TestLogger {
                Props jobClientActorProps = 
JobSubmissionClientActor.createActorProps(
                        testingLeaderRetrievalService,
                        jobClientActorTimeout,
-                       false);
+                       false,
+                       clientConfig);
 
                ActorRef jobClientActor = system.actorOf(jobClientActorProps);
 
@@ -217,7 +221,8 @@ public class JobClientActorTest extends TestLogger {
                Props jobClientActorProps = 
JobSubmissionClientActor.createActorProps(
                        testingLeaderRetrievalService,
                        jobClientActorTimeout,
-                       false);
+                       false,
+                       clientConfig);
 
                ActorRef jobClientActor = system.actorOf(jobClientActorProps);
 
@@ -299,6 +304,7 @@ public class JobClientActorTest extends TestLogger {
                JobListeningContext jobListeningContext =
                        JobClient.submitJob(
                                system,
+                               clientConfig,
                                testingLeaderRetrievalService,
                                testJobGraph,
                                timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 3e6702a..5d9ade3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -55,9 +55,10 @@ public class BlobLibraryCacheManagerTest {
                final byte[] buf = new byte[128];
 
                try {
-                       server = new BlobServer(new Configuration());
+                       Configuration config = new Configuration();
+                       server = new BlobServer(config);
                        InetSocketAddress blobSocketAddress = new 
InetSocketAddress(server.getPort());
-                       BlobClient bc = new BlobClient(blobSocketAddress);
+                       BlobClient bc = new BlobClient(blobSocketAddress, 
config);
 
                        keys.add(bc.put(buf));
                        buf[0] += 1;
@@ -143,7 +144,7 @@ public class BlobLibraryCacheManagerTest {
                        cache = new BlobCache(serverAddress, config);
 
                        // upload some meaningless data to the server
-                       BlobClient uploader = new BlobClient(serverAddress);
+                       BlobClient uploader = new BlobClient(serverAddress, 
config);
                        BlobKey dataKey1 = uploader.put(new byte[]{1, 2, 3, 4, 
5, 6, 7, 8});
                        BlobKey dataKey2 = uploader.put(new byte[]{11, 12, 13, 
14, 15, 16, 17, 18});
                        uploader.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index f6cdf09..8fabdf6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -81,7 +81,7 @@ public class BlobLibraryCacheRecoveryITCase {
                        List<BlobKey> keys = new ArrayList<>(2);
 
                        // Upload some data (libraries)
-                       try (BlobClient client = new 
BlobClient(serverAddress[0])) {
+                       try (BlobClient client = new 
BlobClient(serverAddress[0], config)) {
                                keys.add(client.put(expected)); // Request 1
                                keys.add(client.put(expected, 32, 256)); // 
Request 2
                        }
@@ -139,7 +139,7 @@ public class BlobLibraryCacheRecoveryITCase {
                        }
 
                        // Remove blobs again
-                       try (BlobClient client = new 
BlobClient(serverAddress[1])) {
+                       try (BlobClient client = new 
BlobClient(serverAddress[1], config)) {
                                client.delete(keys.get(0));
                                client.delete(keys.get(1));
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
new file mode 100644
index 0000000..da678bd
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.NetUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.InetAddress;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class NettyClientServerSslTest {
+
+       /**
+        * Verify valid ssl configuration and connection
+        *
+        */
+       @Test
+       public void testValidSslConnection() throws Exception {
+               NettyProtocol protocol = new NettyProtocol() {
+                       @Override
+                       public ChannelHandler[] getServerChannelHandlers() {
+                               return new ChannelHandler[0];
+                       }
+
+                       @Override
+                       public ChannelHandler[] getClientChannelHandlers() { 
return new ChannelHandler[0]; }
+               };
+
+               NettyConfig nettyConfig = new NettyConfig(
+                       InetAddress.getLoopbackAddress(),
+                       NetUtils.getAvailablePort(),
+                       NettyTestUtil.DEFAULT_SEGMENT_SIZE,
+                       1,
+                       createSslConfig());
+
+               NettyTestUtil.NettyServerAndClient serverAndClient = 
NettyTestUtil.initServerAndClient(protocol, nettyConfig);
+
+               Channel ch = NettyTestUtil.connect(serverAndClient);
+
+               // should be able to send text data
+               ch.pipeline().addLast(new StringDecoder()).addLast(new 
StringEncoder());
+               assertTrue(ch.writeAndFlush("test").await().isSuccess());
+
+               NettyTestUtil.shutdown(serverAndClient);
+       }
+
+       /**
+        * Verify failure on invalid ssl configuration
+        *
+        */
+       @Test
+       public void testInvalidSslConfiguration() throws Exception {
+               NettyProtocol protocol = new NettyProtocol() {
+                       @Override
+                       public ChannelHandler[] getServerChannelHandlers() {
+                               return new ChannelHandler[0];
+                       }
+
+                       @Override
+                       public ChannelHandler[] getClientChannelHandlers() { 
return new ChannelHandler[0]; }
+               };
+
+               Configuration config = createSslConfig();
+               // Modify the keystore password to an incorrect one
+               
config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, 
"invalidpassword");
+
+               NettyConfig nettyConfig = new NettyConfig(
+                       InetAddress.getLoopbackAddress(),
+                       NetUtils.getAvailablePort(),
+                       NettyTestUtil.DEFAULT_SEGMENT_SIZE,
+                       1,
+                       config);
+
+               NettyTestUtil.NettyServerAndClient serverAndClient = null;
+               try {
+                       serverAndClient = 
NettyTestUtil.initServerAndClient(protocol, nettyConfig);
+                       Assert.fail("Created server and client from invalid 
configuration");
+               } catch (Exception e) {
+                       // Exception should be thrown as expected
+               }
+
+               NettyTestUtil.shutdown(serverAndClient);
+       }
+
+       /**
+        * Verify SSL handshake error when untrusted server certificate is used
+        *
+        */
+       @Test
+       public void testSslHandshakeError() throws Exception {
+               NettyProtocol protocol = new NettyProtocol() {
+                       @Override
+                       public ChannelHandler[] getServerChannelHandlers() {
+                               return new ChannelHandler[0];
+                       }
+
+                       @Override
+                       public ChannelHandler[] getClientChannelHandlers() { 
return new ChannelHandler[0]; }
+               };
+
+               Configuration config = createSslConfig();
+
+               // Use a server certificate which is not present in the 
truststore
+               config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, 
"src/test/resources/untrusted.keystore");
+
+               NettyConfig nettyConfig = new NettyConfig(
+                       InetAddress.getLoopbackAddress(),
+                       NetUtils.getAvailablePort(),
+                       NettyTestUtil.DEFAULT_SEGMENT_SIZE,
+                       1,
+                       config);
+
+               NettyTestUtil.NettyServerAndClient serverAndClient = 
NettyTestUtil.initServerAndClient(protocol, nettyConfig);
+
+               Channel ch = NettyTestUtil.connect(serverAndClient);
+               ch.pipeline().addLast(new StringDecoder()).addLast(new 
StringEncoder());
+
+               // Attempting to write data over ssl should fail
+               assertFalse(ch.writeAndFlush("test").await().isSuccess());
+
+               NettyTestUtil.shutdown(serverAndClient);
+       }
+
+       private Configuration createSslConfig() throws Exception {
+
+               Configuration flinkConfig = new Configuration();
+               flinkConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
true);
+               flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, 
"src/test/resources/local127.keystore");
+               
flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, 
"password");
+               
flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+               flinkConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
+               
flinkConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, 
"password");
+               return flinkConfig;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index 063e4c2..fbe6e8f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -118,6 +118,7 @@ public class JobManagerProcessReapingTest {
                        if (jobManagerPort != -1) {
                                try {
                                        jobManagerRef = 
JobManager.getJobManagerActorRef(
+                                               "akka.tcp",
                                                new 
InetSocketAddress("localhost", jobManagerPort),
                                                localSystem, new 
FiniteDuration(25, TimeUnit.SECONDS));
                                } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 959b9a7..b4f1d3d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -63,28 +63,29 @@ public class JobSubmitTest {
 
        private static ActorSystem jobManagerSystem;
        private static ActorGateway jmGateway;
+       private static Configuration jmConfig;
 
        @BeforeClass
        public static void setupJobManager() {
-               Configuration config = new Configuration();
+               jmConfig = new Configuration();
 
                int port = NetUtils.getAvailablePort();
 
-               config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
"localhost");
-               config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
port);
+               jmConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
"localhost");
+               jmConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
port);
 
                scala.Option<Tuple2<String, Object>> listeningAddress = 
scala.Option.apply(new Tuple2<String, Object>("localhost", port));
-               jobManagerSystem = AkkaUtils.createActorSystem(config, 
listeningAddress);
+               jobManagerSystem = AkkaUtils.createActorSystem(jmConfig, 
listeningAddress);
 
                // only start JobManager (no ResourceManager)
                JobManager.startJobManagerActors(
-                               config,
+                               jmConfig,
                                jobManagerSystem,
                                JobManager.class,
                                MemoryArchivist.class)._1();
 
                try {
-                       LeaderRetrievalService lrs = 
LeaderRetrievalUtils.createLeaderRetrievalService(config);
+                       LeaderRetrievalService lrs = 
LeaderRetrievalUtils.createLeaderRetrievalService(jmConfig);
 
                        jmGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
                                        lrs,
@@ -117,7 +118,7 @@ public class JobSubmitTest {
 
                        // upload two dummy bytes and add their keys to the job 
graph as dependencies
                        BlobKey key1, key2;
-                       BlobClient bc = new BlobClient(new 
InetSocketAddress("localhost", blobPort));
+                       BlobClient bc = new BlobClient(new 
InetSocketAddress("localhost", blobPort), jmConfig);
                        try {
                                key1 = bc.put(new byte[10]);
                                key2 = bc.put(new byte[10]);

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 70b1da0..8b8987b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -104,7 +105,8 @@ public class ZooKeeperLeaderRetrievalTest extends 
TestLogger{
 
                        InetSocketAddress wrongInetSocketAddress = new 
InetSocketAddress(InetAddress.getByName("1.1.1.1"), 1234);
 
-                       String wrongAddress = 
JobManager.getRemoteJobManagerAkkaURL(wrongInetSocketAddress, 
Option.<String>empty());
+                       String wrongAddress = 
JobManager.getRemoteJobManagerAkkaURL(AkkaUtils.getAkkaProtocol(config),
+                                       wrongInetSocketAddress, 
Option.<String>empty());
 
                        try {
                                localHost = InetAddress.getLocalHost();
@@ -122,7 +124,8 @@ public class ZooKeeperLeaderRetrievalTest extends 
TestLogger{
 
                        InetSocketAddress correctInetSocketAddress = new 
InetSocketAddress(localHost, serverSocket.getLocalPort());
 
-                       String correctAddress = 
JobManager.getRemoteJobManagerAkkaURL(correctInetSocketAddress, 
Option.<String>empty());
+                       String correctAddress = 
JobManager.getRemoteJobManagerAkkaURL(AkkaUtils.getAkkaProtocol(config),
+                                       correctInetSocketAddress, 
Option.<String>empty());
 
                        faultyLeaderElectionService = 
ZooKeeperUtils.createLeaderElectionService(client[0], config);
                        TestingContender wrongLeaderAddressContender = new 
TestingContender(wrongAddress, faultyLeaderElectionService);

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
new file mode 100644
index 0000000..1137341
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.net;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.net.ssl.SSLContext;
+
+/*
+ * Tests for the SSL utilities
+ */
+public class SSLUtilsTest {
+
+       /**
+        * Tests if SSL Client Context is created given a valid SSL 
configuration
+        */
+       @Test
+       public void testCreateSSLClientContext() throws Exception {
+
+               Configuration clientConfig = new Configuration();
+               clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
true);
+               clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
+               
clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, 
"password");
+
+               SSLContext clientContext = 
SSLUtils.createSSLClientContext(clientConfig);
+               Assert.assertNotNull(clientContext);
+       }
+
+       /**
+        * Tests if SSL Client Context is not created if SSL is not configured
+        */
+       @Test
+       public void testCreateSSLClientContextWithSSLDisabled() throws 
Exception {
+
+               Configuration clientConfig = new Configuration();
+               clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
false);
+
+               SSLContext clientContext = 
SSLUtils.createSSLClientContext(clientConfig);
+               Assert.assertNull(clientContext);
+       }
+
+       /**
+        * Tests if SSL Client Context creation fails with bad SSL configuration
+        */
+       @Test
+       public void testCreateSSLClientContextMisconfiguration() {
+
+               Configuration clientConfig = new Configuration();
+               clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
true);
+               clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
+               
clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, 
"badpassword");
+
+               try {
+                       SSLContext clientContext = 
SSLUtils.createSSLClientContext(clientConfig);
+                       Assert.fail("SSL client context created even with bad 
SSL configuration ");
+               } catch (Exception e) {
+                       // Exception here is valid
+               }
+       }
+
+       /**
+        * Tests if SSL Server Context is created given a valid SSL 
configuration
+        */
+       @Test
+       public void testCreateSSLServerContext() throws Exception {
+
+               Configuration serverConfig = new Configuration();
+               serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
true);
+               serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, 
"src/test/resources/local127.keystore");
+               
serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, 
"password");
+               
serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+
+               SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
+               Assert.assertNotNull(serverContext);
+       }
+
+       /**
+        * Tests if SSL Server Context is not created if SSL is disabled
+        */
+       @Test
+       public void testCreateSSLServerContextWithSSLDisabled() throws 
Exception {
+
+               Configuration serverConfig = new Configuration();
+               serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
false);
+
+               SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
+               Assert.assertNull(serverContext);
+       }
+
+       /**
+        * Tests if SSL Server Context creation fails with bad SSL configuration
+        */
+       @Test
+       public void testCreateSSLServerContextMisconfiguration() {
+
+               Configuration serverConfig = new Configuration();
+               serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
true);
+               serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, 
"src/test/resources/local127.keystore");
+               
serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, 
"badpassword");
+               
serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, 
"badpassword");
+
+               try {
+                       SSLContext serverContext = 
SSLUtils.createSSLServerContext(serverConfig);
+                       Assert.fail("SSL server context created even with bad 
SSL configuration ");
+               } catch (Exception e) {
+                       // Exception here is valid
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
index e8981a0..387b0fd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
@@ -147,6 +147,7 @@ public class JobManagerProcess extends TestJvmProcess {
                int port = getJobManagerPort(timeout);
 
                return JobManager.getRemoteJobManagerAkkaURL(
+                               AkkaUtils.getAkkaProtocol(config),
                                new InetSocketAddress("localhost", port),
                                Option.<String>empty());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/resources/local127.keystore
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/local127.keystore 
b/flink-runtime/src/test/resources/local127.keystore
new file mode 100644
index 0000000..1b3ca36
Binary files /dev/null and b/flink-runtime/src/test/resources/local127.keystore 
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/resources/local127.truststore
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/local127.truststore 
b/flink-runtime/src/test/resources/local127.truststore
new file mode 100644
index 0000000..4a1da38
Binary files /dev/null and 
b/flink-runtime/src/test/resources/local127.truststore differ

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/resources/untrusted.keystore
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/untrusted.keystore 
b/flink-runtime/src/test/resources/untrusted.keystore
new file mode 100644
index 0000000..6610360
Binary files /dev/null and 
b/flink-runtime/src/test/resources/untrusted.keystore differ

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
new file mode 100644
index 0000000..0f6509c
--- /dev/null
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka
+
+import akka.actor.ActorSystem
+import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.testingUtils.{TestingCluster, TestingUtils, 
ScalaTestingUtils}
+import org.junit.runner.RunWith
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import org.scalatest.junit.JUnitRunner
+
+/**
+  * Testing the flink cluster using SSL transport for akka remoting
+  */
+@RunWith(classOf[JUnitRunner])
+class AkkaSslITCase(_system: ActorSystem)
+  extends TestKit(_system)
+    with ImplicitSender
+    with WordSpecLike
+    with Matchers
+    with BeforeAndAfterAll
+    with ScalaTestingUtils {
+
+  def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  "The flink Cluster" must {
+
+    "start with akka ssl enabled" in {
+
+      val config = new Configuration()
+      config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
"127.0.0.1")
+      config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
+      config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+      config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
+
+      config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
+      config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE,
+        getClass.getResource("/local127.keystore").getPath)
+      config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, 
"password")
+      config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password")
+      config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE,
+        getClass.getResource("/local127.truststore").getPath)
+
+      config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, 
"password")
+
+      val cluster = new TestingCluster(config, false)
+
+      cluster.start(true)
+
+      assert(cluster.running)
+    }
+
+    "start with akka ssl disabled" in {
+
+      val config = new Configuration()
+      config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+      config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
+      config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, false)
+
+      val cluster = new TestingCluster(config, false)
+
+      cluster.start(true)
+
+      assert(cluster.running)
+    }
+
+    "fail to start with invalid ssl keystore configured" in {
+
+      an[Exception] should be thrownBy {
+
+        val config = new Configuration()
+        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
+        config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "2 s")
+
+        config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
+        config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, 
"invalid.keystore")
+        config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, 
"password")
+        config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password")
+        config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, 
"invalid.keystore")
+        config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, 
"password")
+
+        val cluster = new TestingCluster(config, false)
+
+        cluster.start(true)
+      }
+    }
+
+    "fail to start with missing mandatory ssl configuration" in {
+
+      an[Exception] should be thrownBy {
+
+        val config = new Configuration()
+        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
+        config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "2 s")
+
+        config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
+
+        val cluster = new TestingCluster(config, false)
+
+        cluster.start(true)
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index 4e08857..a18024f 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -38,6 +38,7 @@ class AkkaUtilsTest
     val address = new InetSocketAddress(host, port)
 
     val remoteAkkaURL = JobManager.getRemoteJobManagerAkkaURL(
+      "akka.tcp",
       address,
       Some("actor"))
 
@@ -73,6 +74,15 @@ class AkkaUtilsTest
     result should equal(expected)
   }
 
+  test("getHostFromAkkaURL should handle 'akka.ssl.tcp' as protocol") {
+    val url = "akka.ssl.tcp://flink@localhost:1234/user/jobmanager"
+    val expected = new InetSocketAddress("localhost", 1234)
+
+    val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+    result should equal(expected)
+  }
+
   test("getHostFromAkkaURL should properly handle IPv4 addresses in URLs") {
     val IPv4AddressString = "192.168.0.1"
     val port = 1234
@@ -108,4 +118,16 @@ class AkkaUtilsTest
 
     result should equal(address)
   }
+
+  test("getHostFromAkkaURL should properly handle IPv6 addresses in 
'akka.ssl.tcp' URLs") {
+    val IPv6AddressString = "2001:db8:10:11:12:ff00:42:8329"
+    val port = 1234
+    val address = new InetSocketAddress(IPv6AddressString, port)
+
+    val url = 
s"akka.ssl.tcp://flink@[$IPv6AddressString]:$port/user/jobmanager"
+
+    val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+    result should equal(address)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 0abdd46..8d92b1c 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -471,7 +471,8 @@ object TestingUtils {
   def submitJobAndWait(
       actorSystem: ActorSystem,
       jobManager: ActorGateway,
-      jobGraph: JobGraph)
+      jobGraph: JobGraph,
+      config: Configuration)
     : JobExecutionResult = {
 
     val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager.actor)
@@ -479,6 +480,7 @@ object TestingUtils {
 
     JobClient.submitJobAndWait(
       actorSystem,
+      config,
       leaderRetrievalService,
       jobGraph,
       TESTING_DURATION,

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 9aaf116..c7050e5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -316,7 +316,8 @@ public class ClassLoaderITCase extends TestLogger {
 
                // Upload JAR
                LOG.info("Uploading JAR " + CUSTOM_KV_STATE_JAR_PATH + " for 
savepoint disposal.");
-               List<BlobKey> blobKeys = BlobClient.uploadJarFiles(jm, 
deadline.timeLeft(), Collections.singletonList(new 
Path(CUSTOM_KV_STATE_JAR_PATH)));
+               List<BlobKey> blobKeys = BlobClient.uploadJarFiles(jm, 
deadline.timeLeft(), testCluster.userConfiguration(),
+                               Collections.singletonList(new 
Path(CUSTOM_KV_STATE_JAR_PATH)));
 
                // Dispose savepoint
                LOG.info("Disposing savepoint at " + savepointPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 2738d22..eacdeb4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -277,6 +277,7 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
 
                                JobExecutionResult result = 
JobClient.submitJobAndWait(
                                                clientActorSystem,
+                                               cluster.configuration(),
                                                lrService,
                                                graph,
                                                timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 10e229e..8e3418c 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameter
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -331,8 +332,14 @@ public class YarnApplicationMasterRunner {
                        LOG.debug("Starting Web Frontend");
 
                        webMonitor = 
BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, 
LOG);
+
+                       String protocol = "http://";;
+                       if 
(config.getBoolean(ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED,
+                               
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) && 
SSLUtils.getSSLEnabled(config)) {
+                               protocol = "https://";;
+                       }
                        final String webMonitorURL = webMonitor == null ? null :
-                               "http://"; + appMasterHostname + ":" + 
webMonitor.getServerPort();
+                               protocol + appMasterHostname + ":" + 
webMonitor.getServerPort();
 
                        // 3: Flink's Yarn ResourceManager
                        LOG.debug("Starting YARN Flink Resource Manager");

Reply via email to