Repository: flink Updated Branches: refs/heads/master dffde7efb -> 069de27df
http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java new file mode 100644 index 0000000..fc38b5d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.net; + + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.TrustManagerFactory; +import java.io.File; +import java.io.FileInputStream; +import java.security.KeyStore; + +/** + * Common utilities to manage SSL transport settings + */ +public class SSLUtils { + private static final Logger LOG = LoggerFactory.getLogger(SSLUtils.class); + + /** + * Retrieves the global ssl flag from configuration + * + * @param sslConfig + * The application configuration + * @return true if global ssl flag is set + */ + public static boolean getSSLEnabled(Configuration sslConfig) { + + Preconditions.checkNotNull(sslConfig); + + return sslConfig.getBoolean( ConfigConstants.SECURITY_SSL_ENABLED, + ConfigConstants.DEFAULT_SECURITY_SSL_ENABLED); + } + + /** + * Sets SSL options to verify peer's hostname in the certificate + * + * @param sslConfig + * The application configuration + * @param sslParams + * The SSL parameters that need to be updated + */ + public static void setSSLVerifyHostname(Configuration sslConfig, SSLParameters sslParams) { + + Preconditions.checkNotNull(sslConfig); + Preconditions.checkNotNull(sslParams); + + boolean verifyHostname = sslConfig.getBoolean(ConfigConstants.SECURITY_SSL_VERIFY_HOSTNAME, + ConfigConstants.DEFAULT_SECURITY_SSL_VERIFY_HOSTNAME); + if (verifyHostname) { + sslParams.setEndpointIdentificationAlgorithm("HTTPS"); + } + } + + /** + * Creates the SSL Context for the client if SSL is configured + * + * @param sslConfig + * The application configuration + * @return The SSLContext object which can be used by the ssl transport client + * Returns null if SSL is disabled + * @throws Exception + * Thrown if there is any 
misconfiguration + */ + public static SSLContext createSSLClientContext(Configuration sslConfig) throws Exception { + + Preconditions.checkNotNull(sslConfig); + SSLContext clientSSLContext = null; + + if (getSSLEnabled(sslConfig)) { + LOG.debug("Creating client SSL context from configuration"); + + String trustStoreFilePath = sslConfig.getString( + ConfigConstants.SECURITY_SSL_TRUSTSTORE, + null); + String trustStorePassword = sslConfig.getString( + ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, + null); + String sslProtocolVersion = sslConfig.getString( + ConfigConstants.SECURITY_SSL_PROTOCOL, + ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL); + + KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); + + FileInputStream trustStoreFile = null; + try { + trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); + trustStore.load(trustStoreFile, trustStorePassword.toCharArray()); + } finally { + if (trustStoreFile != null) { + trustStoreFile.close(); + } + } + + TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( + TrustManagerFactory.getDefaultAlgorithm()); + trustManagerFactory.init(trustStore); + + clientSSLContext = SSLContext.getInstance(sslProtocolVersion); + clientSSLContext.init(null, trustManagerFactory.getTrustManagers(), null); + } + + return clientSSLContext; + } + + /** + * Creates the SSL Context for the server if SSL is configured + * + * @param sslConfig + * The application configuration + * @return The SSLContext object which can be used by the ssl transport server + * Returns null if SSL is disabled + * @throws Exception + * Thrown if there is any misconfiguration + */ + public static SSLContext createSSLServerContext(Configuration sslConfig) throws Exception { + + Preconditions.checkNotNull(sslConfig); + SSLContext serverSSLContext = null; + + if (getSSLEnabled(sslConfig)) { + LOG.debug("Creating server SSL context from configuration"); + + String keystoreFilePath = sslConfig.getString( + 
ConfigConstants.SECURITY_SSL_KEYSTORE, + null); + + String keystorePassword = sslConfig.getString( + ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, + null); + + String certPassword = sslConfig.getString( + ConfigConstants.SECURITY_SSL_KEY_PASSWORD, + null); + + String sslProtocolVersion = sslConfig.getString( + ConfigConstants.SECURITY_SSL_PROTOCOL, + ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL); + + KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + FileInputStream keyStoreFile = null; + try { + keyStoreFile = new FileInputStream(new File(keystoreFilePath)); + ks.load(keyStoreFile, keystorePassword.toCharArray()); + } finally { + if (keyStoreFile != null) { + keyStoreFile.close(); + } + } + + // Set up key manager factory to use the server key store + KeyManagerFactory kmf = KeyManagerFactory.getInstance( + KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(ks, certPassword.toCharArray()); + + // Initialize the SSLContext + serverSSLContext = SSLContext.getInstance(sslProtocolVersion); + serverSSLContext.init(kmf.getKeyManagers(), null, null); + } + + return serverSSLContext; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java index 460f10e..8bc1ad1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; import org.apache.flink.runtime.taskmanager.TaskManager; import scala.Option; -import scala.Tuple2; +import scala.Tuple3; import 
java.net.InetAddress; import java.net.InetSocketAddress; @@ -65,10 +65,11 @@ public final class StandaloneUtils { throws UnknownHostException { - Tuple2<String, Object> stringIntPair = TaskManager.getAndCheckJobManagerAddress(configuration); + Tuple3<String, String, Object> stringIntPair = TaskManager.getAndCheckJobManagerAddress(configuration); - String jobManagerHostname = stringIntPair._1(); - int jobManagerPort = (Integer) stringIntPair._2(); + String protocol = stringIntPair._1(); + String jobManagerHostname = stringIntPair._2(); + int jobManagerPort = (Integer) stringIntPair._3(); InetSocketAddress hostPort; try { @@ -81,6 +82,7 @@ public final class StandaloneUtils { } String jobManagerAkkaUrl = JobManager.getRemoteJobManagerAkkaURL( + protocol, hostPort, Option.apply(jobManagerName)); http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index bd3af33..80bdb73 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -24,8 +24,9 @@ import java.util.concurrent.{TimeUnit, Callable} import akka.actor._ import akka.pattern.{ask => akkaAsk} -import com.typesafe.config.{Config, ConfigFactory} +import com.typesafe.config.{ConfigValueFactory, ConfigParseOptions, Config, ConfigFactory} import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.runtime.net.SSLUtils import org.apache.flink.util.NetUtils import org.jboss.netty.logging.{Slf4JLoggerFactory, InternalLoggerFactory} import org.slf4j.LoggerFactory @@ -265,6 +266,41 @@ object AkkaUtils { val logLifecycleEvents = if (lifecycleEvents) "on" else "off" + 
val akkaEnableSSLConfig = configuration.getBoolean(ConfigConstants.AKKA_SSL_ENABLED, + ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) && + SSLUtils.getSSLEnabled(configuration) + + val akkaEnableSSL = if (akkaEnableSSLConfig) "on" else "off" + + val akkaSSLKeyStore = configuration.getString( + ConfigConstants.SECURITY_SSL_KEYSTORE, + null) + + val akkaSSLKeyStorePassword = configuration.getString( + ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, + null) + + val akkaSSLKeyPassword = configuration.getString( + ConfigConstants.SECURITY_SSL_KEY_PASSWORD, + null) + + val akkaSSLTrustStore = configuration.getString( + ConfigConstants.SECURITY_SSL_TRUSTSTORE, + null) + + val akkaSSLTrustStorePassword = configuration.getString( + ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, + null) + + val akkaSSLProtocol = configuration.getString( + ConfigConstants.SECURITY_SSL_PROTOCOL, + ConfigConstants.DEFAULT_SECURITY_SSL_PROTOCOL) + + val akkaSSLAlgorithmsString = configuration.getString( + ConfigConstants.SECURITY_SSL_ALGORITHMS, + ConfigConstants.DEFAULT_SECURITY_SSL_ALGORITHMS) + val akkaSSLAlgorithms = akkaSSLAlgorithmsString.split(",").toList.mkString("[", ",", "]") + val configString = s""" |akka { @@ -320,7 +356,40 @@ object AkkaUtils { "" } - ConfigFactory.parseString(configString + hostnameConfigString) + val sslConfigString = if (akkaEnableSSLConfig) { + s""" + |akka { + | remote { + | + | enabled-transports = ["akka.remote.netty.ssl"] + | + | netty { + | + | ssl = $${akka.remote.netty.tcp} + | + | ssl { + | + | enable-ssl = $akkaEnableSSL + | security { + | key-store = "$akkaSSLKeyStore" + | key-store-password = "$akkaSSLKeyStorePassword" + | key-password = "$akkaSSLKeyPassword" + | trust-store = "$akkaSSLTrustStore" + | trust-store-password = "$akkaSSLTrustStorePassword" + | protocol = $akkaSSLProtocol + | enabled-algorithms = $akkaSSLAlgorithms + | random-number-generator = "" + | } + | } + | } + | } + |} + """.stripMargin + }else{ + "" + } + + 
ConfigFactory.parseString(configString + hostnameConfigString + sslConfigString).resolve() } def getLogLevel: String = { @@ -577,4 +646,18 @@ object AkkaUtils { throw new Exception(s"Could not retrieve InetSocketAddress from Akka URL $akkaURL") } } + + /** Returns the protocol field for the URL of the remote actor system given the user configuration + * + * @param config instance containing the user provided configuration values + * @return the remote url's protocol field + */ + def getAkkaProtocol(config: Configuration): String = { + val sslEnabled = config.getBoolean(ConfigConstants.AKKA_SSL_ENABLED, + ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) && + SSLUtils.getSSLEnabled(config) + if (sslEnabled) "akka.ssl.tcp" else "akka.tcp" + } + } + http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 8f3b82a..5dc9e24 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -2746,16 +2746,22 @@ object JobManager { * Builds the akka actor path for the JobManager actor, given the socket address * where the JobManager's actor system runs. * + * @param protocol The protocol to be used to connect to the remote JobManager's actor system. * @param address The address of the JobManager's actor system. * @return The akka URL of the JobManager actor. 
*/ def getRemoteJobManagerAkkaURL( + protocol: String, address: InetSocketAddress, name: Option[String] = None) : String = { + + require(protocol == "akka.tcp" || protocol == "akka.ssl.tcp", + "protocol field should be either akka.tcp or akka.ssl.tcp") + val hostPort = NetUtils.socketAddressToUrlString(address) - getJobManagerAkkaURLHelper(s"akka.tcp://flink@$hostPort", name) + getJobManagerAkkaURLHelper(s"$protocol://flink@$hostPort", name) } /** @@ -2765,7 +2771,7 @@ object JobManager { * @return JobManager actor remote Akka URL */ def getRemoteJobManagerAkkaURL(config: Configuration) : String = { - val (hostname, port) = TaskManager.getAndCheckJobManagerAddress(config) + val (protocol, hostname, port) = TaskManager.getAndCheckJobManagerAddress(config) var hostPort: InetSocketAddress = null @@ -2779,7 +2785,7 @@ object JobManager { s"specified in the configuration") } - JobManager.getRemoteJobManagerAkkaURL(hostPort, Option.empty) + JobManager.getRemoteJobManagerAkkaURL(protocol, hostPort, Option.empty) } /** @@ -2801,11 +2807,12 @@ object JobManager { } def getJobManagerActorRefFuture( + protocol: String, address: InetSocketAddress, system: ActorSystem, timeout: FiniteDuration) : Future[ActorRef] = { - AkkaUtils.getActorRefFuture(getRemoteJobManagerAkkaURL(address), system, timeout) + AkkaUtils.getActorRefFuture(getRemoteJobManagerAkkaURL(protocol, address), system, timeout) } /** @@ -2829,6 +2836,7 @@ object JobManager { /** * Resolves the JobManager actor reference in a blocking fashion. * + * @param protocol The protocol to be used to connect to the remote JobManager's actor system. * @param address The socket address of the JobManager's actor system. * @param system The local actor system that should perform the lookup. * @param timeout The maximum time to wait until the lookup fails. 
@@ -2837,12 +2845,13 @@ object JobManager { */ @throws(classOf[IOException]) def getJobManagerActorRef( + protocol: String, address: InetSocketAddress, system: ActorSystem, timeout: FiniteDuration) : ActorRef = { - val jmAddress = getRemoteJobManagerAkkaURL(address) + val jmAddress = getRemoteJobManagerAkkaURL(protocol, address) getJobManagerActorRef(jmAddress, system, timeout) } @@ -2863,6 +2872,7 @@ object JobManager { : ActorRef = { val timeout = AkkaUtils.getLookupTimeout(config) - getJobManagerActorRef(address, system, timeout) + val protocol = AkkaUtils.getAkkaProtocol(config) + getJobManagerActorRef(protocol, address, system, timeout) } } http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index a263f66..048b013 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -513,6 +513,7 @@ abstract class FlinkMiniCluster( try { JobClient.submitJobAndWait( clientActorSystem, + configuration, leaderRetrievalService, jobGraph, timeout, @@ -541,6 +542,7 @@ abstract class FlinkMiniCluster( } JobClient.submitJobDetached(jobManagerGateway, + configuration, jobGraph, timeout, this.getClass.getClassLoader()) http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index af2b38f..f8f333e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -2373,13 +2373,15 @@ object TaskManager { } /** - * Gets the hostname and port of the JobManager from the configuration. Also checks that + * Gets the protocol, hostname and port of the JobManager from the configuration. Also checks that * the hostname is not null and the port non-negative. * * @param configuration The configuration to read the config values from. - * @return A 2-tuple (hostname, port). + * @return A 3-tuple (protocol, hostname, port). */ - def getAndCheckJobManagerAddress(configuration: Configuration) : (String, Int) = { + def getAndCheckJobManagerAddress(configuration: Configuration) : (String, String, Int) = { + + val protocol = AkkaUtils.getAkkaProtocol(configuration) val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) @@ -2397,7 +2399,7 @@ object TaskManager { ". 
it must be great than 0 and less than 65536.") } - (hostname, port) + (protocol, hostname, port) } http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java index aba0aff..4aa9a21 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java @@ -54,7 +54,7 @@ public class BlobCacheRetriesTest { BlobClient blobClient = null; BlobKey key; try { - blobClient = new BlobClient(serverAddress); + blobClient = new BlobClient(serverAddress, config); key = blobClient.put(data); } @@ -113,7 +113,7 @@ public class BlobCacheRetriesTest { BlobClient blobClient = null; BlobKey key; try { - blobClient = new BlobClient(serverAddress); + blobClient = new BlobClient(serverAddress, config); key = blobClient.put(data); } http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java index 5c3ecf3..7ba5a8a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java @@ -50,14 +50,15 @@ public class BlobCacheSuccessTest { try { // Start the BLOB server - blobServer = new BlobServer(new Configuration()); + Configuration config = new Configuration(); + blobServer = new 
BlobServer(config); final InetSocketAddress serverAddress = new InetSocketAddress(blobServer.getPort()); // Upload BLOBs BlobClient blobClient = null; try { - blobClient = new BlobClient(serverAddress); + blobClient = new BlobClient(serverAddress, config); blobKeys.add(blobClient.put(buf)); buf[0] = 1; // Make sure the BLOB key changes http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java new file mode 100644 index 0000000..5054107 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.blob; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.security.MessageDigest; +import java.util.Collections; +import java.util.List; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.Path; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * This class contains unit tests for the {@link BlobClient} with ssl enabled. + */ +public class BlobClientSslTest { + + /** The buffer size used during the tests in bytes. */ + private static final int TEST_BUFFER_SIZE = 17 * 1000; + + /** The instance of the SSL BLOB server used during the tests. */ + private static BlobServer BLOB_SSL_SERVER; + + /** The SSL blob service client configuration */ + private static Configuration sslClientConfig; + + /** The instance of the non-SSL BLOB server used during the tests. */ + private static BlobServer BLOB_SERVER; + + /** The non-ssl blob service client configuration */ + private static Configuration clientConfig; + + /** + * Starts the SSL enabled BLOB server. 
+ */ + @BeforeClass + public static void startSSLServer() { + try { + Configuration config = new Configuration(); + config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + BLOB_SSL_SERVER = new BlobServer(config); + } + catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + sslClientConfig = new Configuration(); + sslClientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + sslClientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); + sslClientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); + } + + /** + * Starts the SSL disabled BLOB server. + */ + @BeforeClass + public static void startNonSSLServer() { + try { + Configuration config = new Configuration(); + config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + BLOB_SERVER = new BlobServer(config); + } + catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + clientConfig = new Configuration(); + clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + clientConfig.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false); + clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); + clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); + } + + /** + * Shuts the BLOB server down. 
+ */ + @AfterClass + public static void stopServers() { + if (BLOB_SSL_SERVER != null) { + BLOB_SSL_SERVER.shutdown(); + } + + if (BLOB_SERVER != null) { + BLOB_SERVER.shutdown(); + } + } + + /** + * Prepares a test file for the unit tests, i.e. the methods fills the file with a particular byte patterns and + * computes the file's BLOB key. + * + * @param file + * the file to prepare for the unit tests + * @return the BLOB key of the prepared file + * @throws IOException + * thrown if an I/O error occurs while writing to the test file + */ + private static BlobKey prepareTestFile(File file) throws IOException { + + MessageDigest md = BlobUtils.createMessageDigest(); + + final byte[] buf = new byte[TEST_BUFFER_SIZE]; + for (int i = 0; i < buf.length; ++i) { + buf[i] = (byte) (i % 128); + } + + FileOutputStream fos = null; + try { + fos = new FileOutputStream(file); + + for (int i = 0; i < 20; ++i) { + fos.write(buf); + md.update(buf); + } + + } finally { + if (fos != null) { + fos.close(); + } + } + + return new BlobKey(md.digest()); + } + + /** + * Validates the result of a GET operation by comparing the data from the retrieved input stream to the content of + * the specified file. + * + * @param inputStream + * the input stream returned from the GET operation + * @param file + * the file to compare the input stream's data to + * @throws IOException + * thrown if an I/O error occurs while reading the input stream or the file + */ + private static void validateGet(final InputStream inputStream, final File file) throws IOException { + + InputStream inputStream2 = null; + try { + + inputStream2 = new FileInputStream(file); + + while (true) { + + final int r1 = inputStream.read(); + final int r2 = inputStream2.read(); + + assertEquals(r2, r1); + + if (r1 < 0) { + break; + } + } + + } finally { + if (inputStream2 != null) { + inputStream2.close(); + } + } + + } + + /** + * Tests the PUT/GET operations for content-addressable streams. 
+ */ + @Test + public void testContentAddressableStream() { + + BlobClient client = null; + InputStream is = null; + + try { + File testFile = File.createTempFile("testfile", ".dat"); + testFile.deleteOnExit(); + + BlobKey origKey = prepareTestFile(testFile); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SSL_SERVER.getPort()); + client = new BlobClient(serverAddress, sslClientConfig); + + // Store the data + is = new FileInputStream(testFile); + BlobKey receivedKey = client.put(is); + assertEquals(origKey, receivedKey); + + is.close(); + is = null; + + // Retrieve the data + is = client.get(receivedKey); + validateGet(is, testFile); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (is != null) { + try { + is.close(); + } catch (Throwable t) {} + } + if (client != null) { + try { + client.close(); + } catch (Throwable t) {} + } + } + } + + /** + * Tests the PUT/GET operations for regular (non-content-addressable) streams. 
+ */ + @Test + public void testRegularStream() { + + final JobID jobID = JobID.generate(); + final String key = "testkey3"; + + try { + final File testFile = File.createTempFile("testfile", ".dat"); + testFile.deleteOnExit(); + prepareTestFile(testFile); + + BlobClient client = null; + InputStream is = null; + try { + + final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SSL_SERVER.getPort()); + client = new BlobClient(serverAddress, sslClientConfig); + + // Store the data + is = new FileInputStream(testFile); + client.put(jobID, key, is); + + is.close(); + is = null; + + // Retrieve the data + is = client.get(jobID, key); + validateGet(is, testFile); + + } + finally { + if (is != null) { + is.close(); + } + if (client != null) { + client.close(); + } + } + + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + /** + * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, List)} helper. + */ + private void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception { + final File testFile = File.createTempFile("testfile", ".dat"); + testFile.deleteOnExit(); + prepareTestFile(testFile); + + InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort()); + + List<BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig, + Collections.singletonList(new Path(testFile.toURI()))); + + assertEquals(1, blobKeys.size()); + + try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) { + InputStream is = blobClient.get(blobKeys.get(0)); + validateGet(is, testFile); + } + } + + /** + * Verify ssl client to ssl server upload + */ + @Test + public void testUploadJarFilesHelper() throws Exception { + uploadJarFile(BLOB_SSL_SERVER, sslClientConfig); + } + + /** + * Verify ssl client to non-ssl server failure + */ + @Test + public void testSSLClientFailure() throws Exception { + try { + 
uploadJarFile(BLOB_SERVER, sslClientConfig); + fail("SSL client connected to non-ssl server"); + } catch (Exception e) { + // Exception expected + } + } + + /** + * Verify non-ssl client to ssl server failure + */ + @Test + public void testSSLServerFailure() throws Exception { + try { + uploadJarFile(BLOB_SSL_SERVER, clientConfig); + fail("Non-SSL client connected to ssl server"); + } catch (Exception e) { + // Exception expected + } + } + + /** + * Verify non-ssl connection sanity + */ + @Test + public void testNonSSLConnection() throws Exception { + uploadJarFile(BLOB_SERVER, clientConfig); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index ccdd3a1..8f8f8c5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -50,13 +50,17 @@ public class BlobClientTest { /** The instance of the BLOB server used during the tests. */ private static BlobServer BLOB_SERVER; + /** The blob service client and server configuration */ + private static Configuration blobServiceConfig; + /** * Starts the BLOB server. 
*/ @BeforeClass public static void startServer() { try { - BLOB_SERVER = new BlobServer(new Configuration()); + blobServiceConfig = new Configuration(); + BLOB_SERVER = new BlobServer(blobServiceConfig); } catch (IOException e) { e.printStackTrace(); @@ -207,7 +211,7 @@ public class BlobClientTest { BlobKey origKey = new BlobKey(md.digest()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, blobServiceConfig); // Store the data BlobKey receivedKey = client.put(testBuffer); @@ -255,7 +259,7 @@ public class BlobClientTest { BlobKey origKey = prepareTestFile(testFile); InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, blobServiceConfig); // Store the data is = new FileInputStream(testFile); @@ -301,7 +305,7 @@ public class BlobClientTest { BlobClient client = null; try { final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, blobServiceConfig); // Store the data client.put(jobID, key, testBuffer); @@ -353,7 +357,7 @@ public class BlobClientTest { try { final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, blobServiceConfig); // Store the data is = new FileInputStream(testFile); @@ -384,7 +388,7 @@ public class BlobClientTest { } /** - * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, List)} helper. + * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, List)} helper. 
*/ @Test public void testUploadJarFilesHelper() throws Exception { @@ -394,11 +398,12 @@ public class BlobClientTest { InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort()); - List<BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, Collections.singletonList(new Path(testFile.toURI()))); + List<BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobServiceConfig, + Collections.singletonList(new Path(testFile.toURI()))); assertEquals(1, blobKeys.size()); - try (BlobClient blobClient = new BlobClient(serverAddress)) { + try (BlobClient blobClient = new BlobClient(serverAddress, blobServiceConfig)) { InputStream is = blobClient.get(blobKeys.get(0)); validateGet(is, testFile); } http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java index 8ba20c9..3fe207e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java @@ -78,7 +78,7 @@ public class BlobRecoveryITCase { serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); } - client = new BlobClient(serverAddress[0]); + client = new BlobClient(serverAddress[0], config); // Random data byte[] expected = new byte[1024]; @@ -98,7 +98,7 @@ public class BlobRecoveryITCase { // Close the client and connect to the other server client.close(); - client = new BlobClient(serverAddress[1]); + client = new BlobClient(serverAddress[1], config); // Verify request 1 try (InputStream is = client.get(keys[0])) { 
http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index cef9ad3..53e1d73 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -50,7 +50,7 @@ public class BlobServerDeleteTest { server = new BlobServer(config); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); byte[] data = new byte[2000000]; rnd.nextBytes(data); @@ -63,7 +63,7 @@ public class BlobServerDeleteTest { client.delete(key); client.close(); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); try { client.get(key); fail("BLOB should have been deleted"); @@ -108,7 +108,7 @@ public class BlobServerDeleteTest { server = new BlobServer(config); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); byte[] data = new byte[2000000]; rnd.nextBytes(data); @@ -126,7 +126,7 @@ public class BlobServerDeleteTest { client.deleteAll(jobID); client.close(); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); try { client.get(jobID, name1); fail("BLOB should have been deleted"); @@ -143,7 +143,7 @@ public class BlobServerDeleteTest { // expected } - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); try { client.get(jobID, name2); fail("BLOB should have been deleted"); @@ -180,7 
+180,7 @@ public class BlobServerDeleteTest { server = new BlobServer(config); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); byte[] data = new byte[2000000]; rnd.nextBytes(data); @@ -228,7 +228,7 @@ public class BlobServerDeleteTest { server = new BlobServer(config); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); byte[] data = new byte[2000000]; rnd.nextBytes(data); @@ -279,7 +279,7 @@ public class BlobServerDeleteTest { server = new BlobServer(config); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); byte[] data = new byte[2000000]; rnd.nextBytes(data); http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java index 2853e26..59a62e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java @@ -49,7 +49,7 @@ public class BlobServerGetTest { server = new BlobServer(config); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); byte[] data = new byte[2000000]; rnd.nextBytes(data); @@ -99,7 +99,7 @@ public class BlobServerGetTest { server = new BlobServer(config); InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); byte[] data = new byte[5000000]; rnd.nextBytes(data); http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java index dc18787..c4d6d1c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java @@ -51,7 +51,7 @@ public class BlobServerPutTest { server = new BlobServer(config); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); byte[] data = new byte[2000000]; rnd.nextBytes(data); @@ -82,7 +82,7 @@ public class BlobServerPutTest { // close the client and create a new one for the remaining requests client.close(); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); InputStream is2 = client.get(key1); byte[] result2 = new byte[data.length]; @@ -125,7 +125,7 @@ public class BlobServerPutTest { server = new BlobServer(config); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); byte[] data = new byte[2000000]; rnd.nextBytes(data); @@ -172,7 +172,7 @@ public class BlobServerPutTest { server = new BlobServer(config); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); 
byte[] data = new byte[2000000]; rnd.nextBytes(data); @@ -228,7 +228,7 @@ public class BlobServerPutTest { assertTrue(tempFileDir.setWritable(false, false)); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); byte[] data = new byte[2000000]; rnd.nextBytes(data); @@ -292,7 +292,7 @@ public class BlobServerPutTest { assertTrue(tempFileDir.setWritable(false, false)); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); - client = new BlobClient(serverAddress); + client = new BlobClient(serverAddress, config); byte[] data = new byte[2000000]; rnd.nextBytes(data); http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java index 2adf7eb..5792c9a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java @@ -51,10 +51,12 @@ public class JobClientActorTest extends TestLogger { private static ActorSystem system; private static JobGraph testJobGraph = new JobGraph("Test Job"); + private static Configuration clientConfig; @BeforeClass public static void setup() { - system = AkkaUtils.createLocalActorSystem(new Configuration()); + clientConfig = new Configuration(); + system = AkkaUtils.createLocalActorSystem(clientConfig); } @AfterClass @@ -89,7 +91,8 @@ public class JobClientActorTest extends TestLogger { Props jobClientActorProps = JobSubmissionClientActor.createActorProps( testingLeaderRetrievalService, jobClientActorTimeout, - false); + false, + 
clientConfig); ActorRef jobClientActor = system.actorOf(jobClientActorProps); @@ -154,7 +157,8 @@ public class JobClientActorTest extends TestLogger { Props jobClientActorProps = JobSubmissionClientActor.createActorProps( testingLeaderRetrievalService, jobClientActorTimeout, - false); + false, + clientConfig); ActorRef jobClientActor = system.actorOf(jobClientActorProps); @@ -217,7 +221,8 @@ public class JobClientActorTest extends TestLogger { Props jobClientActorProps = JobSubmissionClientActor.createActorProps( testingLeaderRetrievalService, jobClientActorTimeout, - false); + false, + clientConfig); ActorRef jobClientActor = system.actorOf(jobClientActorProps); @@ -299,6 +304,7 @@ public class JobClientActorTest extends TestLogger { JobListeningContext jobListeningContext = JobClient.submitJob( system, + clientConfig, testingLeaderRetrievalService, testJobGraph, timeout, http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index 3e6702a..5d9ade3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -55,9 +55,10 @@ public class BlobLibraryCacheManagerTest { final byte[] buf = new byte[128]; try { - server = new BlobServer(new Configuration()); + Configuration config = new Configuration(); + server = new BlobServer(config); InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort()); - BlobClient bc = new BlobClient(blobSocketAddress); + BlobClient bc = 
new BlobClient(blobSocketAddress, config); keys.add(bc.put(buf)); buf[0] += 1; @@ -143,7 +144,7 @@ public class BlobLibraryCacheManagerTest { cache = new BlobCache(serverAddress, config); // upload some meaningless data to the server - BlobClient uploader = new BlobClient(serverAddress); + BlobClient uploader = new BlobClient(serverAddress, config); BlobKey dataKey1 = uploader.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 8}); BlobKey dataKey2 = uploader.put(new byte[]{11, 12, 13, 14, 15, 16, 17, 18}); uploader.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index f6cdf09..8fabdf6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -81,7 +81,7 @@ public class BlobLibraryCacheRecoveryITCase { List<BlobKey> keys = new ArrayList<>(2); // Upload some data (libraries) - try (BlobClient client = new BlobClient(serverAddress[0])) { + try (BlobClient client = new BlobClient(serverAddress[0], config)) { keys.add(client.put(expected)); // Request 1 keys.add(client.put(expected, 32, 256)); // Request 2 } @@ -139,7 +139,7 @@ public class BlobLibraryCacheRecoveryITCase { } // Remove blobs again - try (BlobClient client = new BlobClient(serverAddress[1])) { + try (BlobClient client = new BlobClient(serverAddress[1], config)) { client.delete(keys.get(0)); client.delete(keys.get(1)); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java new file mode 100644 index 0000000..da678bd --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.NetUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.net.InetAddress; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class NettyClientServerSslTest { + + /** + * Verify valid ssl configuration and connection + * + */ + @Test + public void testValidSslConnection() throws Exception { + NettyProtocol protocol = new NettyProtocol() { + @Override + public ChannelHandler[] getServerChannelHandlers() { + return new ChannelHandler[0]; + } + + @Override + public ChannelHandler[] getClientChannelHandlers() { return new ChannelHandler[0]; } + }; + + NettyConfig nettyConfig = new NettyConfig( + InetAddress.getLoopbackAddress(), + NetUtils.getAvailablePort(), + NettyTestUtil.DEFAULT_SEGMENT_SIZE, + 1, + createSslConfig()); + + NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig); + + Channel ch = NettyTestUtil.connect(serverAndClient); + + // should be able to send text data + ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder()); + assertTrue(ch.writeAndFlush("test").await().isSuccess()); + + NettyTestUtil.shutdown(serverAndClient); + } + + /** + * Verify failure on invalid ssl configuration + * + */ + @Test + public void testInvalidSslConfiguration() throws Exception { + NettyProtocol protocol = new NettyProtocol() { + @Override + public ChannelHandler[] getServerChannelHandlers() { + return new ChannelHandler[0]; + } + + @Override + public ChannelHandler[] getClientChannelHandlers() { return new ChannelHandler[0]; } + }; + + Configuration config = 
createSslConfig(); + // Modify the keystore password to an incorrect one + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "invalidpassword"); + + NettyConfig nettyConfig = new NettyConfig( + InetAddress.getLoopbackAddress(), + NetUtils.getAvailablePort(), + NettyTestUtil.DEFAULT_SEGMENT_SIZE, + 1, + config); + + NettyTestUtil.NettyServerAndClient serverAndClient = null; + try { + serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig); + Assert.fail("Created server and client from invalid configuration"); + } catch (Exception e) { + // Exception should be thrown as expected + } + + NettyTestUtil.shutdown(serverAndClient); + } + + /** + * Verify SSL handshake error when untrusted server certificate is used + * + */ + @Test + public void testSslHandshakeError() throws Exception { + NettyProtocol protocol = new NettyProtocol() { + @Override + public ChannelHandler[] getServerChannelHandlers() { + return new ChannelHandler[0]; + } + + @Override + public ChannelHandler[] getClientChannelHandlers() { return new ChannelHandler[0]; } + }; + + Configuration config = createSslConfig(); + + // Use a server certificate which is not present in the truststore + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/untrusted.keystore"); + + NettyConfig nettyConfig = new NettyConfig( + InetAddress.getLoopbackAddress(), + NetUtils.getAvailablePort(), + NettyTestUtil.DEFAULT_SEGMENT_SIZE, + 1, + config); + + NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig); + + Channel ch = NettyTestUtil.connect(serverAndClient); + ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder()); + + // Attempting to write data over ssl should fail + assertFalse(ch.writeAndFlush("test").await().isSuccess()); + + NettyTestUtil.shutdown(serverAndClient); + } + + private Configuration createSslConfig() throws Exception { + + Configuration flinkConfig = new Configuration(); + 
flinkConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + flinkConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + flinkConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); + flinkConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); + return flinkConfig; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java index 063e4c2..fbe6e8f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java @@ -118,6 +118,7 @@ public class JobManagerProcessReapingTest { if (jobManagerPort != -1) { try { jobManagerRef = JobManager.getJobManagerActorRef( + "akka.tcp", new InetSocketAddress("localhost", jobManagerPort), localSystem, new FiniteDuration(25, TimeUnit.SECONDS)); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index 959b9a7..b4f1d3d 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -63,28 +63,29 @@ public class JobSubmitTest { private static ActorSystem jobManagerSystem; private static ActorGateway jmGateway; + private static Configuration jmConfig; @BeforeClass public static void setupJobManager() { - Configuration config = new Configuration(); + jmConfig = new Configuration(); int port = NetUtils.getAvailablePort(); - config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); - config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port); + jmConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); + jmConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port); scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.apply(new Tuple2<String, Object>("localhost", port)); - jobManagerSystem = AkkaUtils.createActorSystem(config, listeningAddress); + jobManagerSystem = AkkaUtils.createActorSystem(jmConfig, listeningAddress); // only start JobManager (no ResourceManager) JobManager.startJobManagerActors( - config, + jmConfig, jobManagerSystem, JobManager.class, MemoryArchivist.class)._1(); try { - LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config); + LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(jmConfig); jmGateway = LeaderRetrievalUtils.retrieveLeaderGateway( lrs, @@ -117,7 +118,7 @@ public class JobSubmitTest { // upload two dummy bytes and add their keys to the job graph as dependencies BlobKey key1, key2; - BlobClient bc = new BlobClient(new InetSocketAddress("localhost", blobPort)); + BlobClient bc = new BlobClient(new InetSocketAddress("localhost", blobPort), jmConfig); try { key1 = bc.put(new byte[10]); key2 = bc.put(new byte[10]); 
http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java index 70b1da0..8b8987b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java @@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.test.TestingServer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -104,7 +105,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{ InetSocketAddress wrongInetSocketAddress = new InetSocketAddress(InetAddress.getByName("1.1.1.1"), 1234); - String wrongAddress = JobManager.getRemoteJobManagerAkkaURL(wrongInetSocketAddress, Option.<String>empty()); + String wrongAddress = JobManager.getRemoteJobManagerAkkaURL(AkkaUtils.getAkkaProtocol(config), + wrongInetSocketAddress, Option.<String>empty()); try { localHost = InetAddress.getLocalHost(); @@ -122,7 +124,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{ InetSocketAddress correctInetSocketAddress = new InetSocketAddress(localHost, serverSocket.getLocalPort()); - String correctAddress = JobManager.getRemoteJobManagerAkkaURL(correctInetSocketAddress, Option.<String>empty()); + String correctAddress = 
JobManager.getRemoteJobManagerAkkaURL(AkkaUtils.getAkkaProtocol(config), + correctInetSocketAddress, Option.<String>empty()); faultyLeaderElectionService = ZooKeeperUtils.createLeaderElectionService(client[0], config); TestingContender wrongLeaderAddressContender = new TestingContender(wrongAddress, faultyLeaderElectionService); http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java new file mode 100644 index 0000000..1137341 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.net; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.junit.Assert; +import org.junit.Test; + +import javax.net.ssl.SSLContext; + +/* + * Tests for the SSL utilities + */ +public class SSLUtilsTest { + + /** + * Tests if SSL Client Context is created given a valid SSL configuration + */ + @Test + public void testCreateSSLClientContext() throws Exception { + + Configuration clientConfig = new Configuration(); + clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); + clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); + + SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig); + Assert.assertNotNull(clientContext); + } + + /** + * Tests if SSL Client Context is not created if SSL is not configured + */ + @Test + public void testCreateSSLClientContextWithSSLDisabled() throws Exception { + + Configuration clientConfig = new Configuration(); + clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, false); + + SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig); + Assert.assertNull(clientContext); + } + + /** + * Tests if SSL Client Context creation fails with bad SSL configuration + */ + @Test + public void testCreateSSLClientContextMisconfiguration() { + + Configuration clientConfig = new Configuration(); + clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); + clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "badpassword"); + + try { + SSLContext clientContext = SSLUtils.createSSLClientContext(clientConfig); + Assert.fail("SSL client context created even with bad SSL configuration "); + } catch (Exception e) { 
+ // Exception here is valid + } + } + + /** + * Tests if SSL Server Context is created given a valid SSL configuration + */ + @Test + public void testCreateSSLServerContext() throws Exception { + + Configuration serverConfig = new Configuration(); + serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + + SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); + Assert.assertNotNull(serverContext); + } + + /** + * Tests if SSL Server Context is not created if SSL is disabled + */ + @Test + public void testCreateSSLServerContextWithSSLDisabled() throws Exception { + + Configuration serverConfig = new Configuration(); + serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, false); + + SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); + Assert.assertNull(serverContext); + } + + /** + * Tests if SSL Server Context creation fails with bad SSL configuration + */ + @Test + public void testCreateSSLServerContextMisconfiguration() { + + Configuration serverConfig = new Configuration(); + serverConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "badpassword"); + serverConfig.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "badpassword"); + + try { + SSLContext serverContext = SSLUtils.createSSLServerContext(serverConfig); + Assert.fail("SSL server context created even with bad SSL configuration "); + } catch (Exception e) { + // Exception here is valid + } + } + +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java index e8981a0..387b0fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java @@ -147,6 +147,7 @@ public class JobManagerProcess extends TestJvmProcess { int port = getJobManagerPort(timeout); return JobManager.getRemoteJobManagerAkkaURL( + AkkaUtils.getAkkaProtocol(config), new InetSocketAddress("localhost", port), Option.<String>empty()); } http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/resources/local127.keystore ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/resources/local127.keystore b/flink-runtime/src/test/resources/local127.keystore new file mode 100644 index 0000000..1b3ca36 Binary files /dev/null and b/flink-runtime/src/test/resources/local127.keystore differ http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/resources/local127.truststore ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/resources/local127.truststore b/flink-runtime/src/test/resources/local127.truststore new file mode 100644 index 0000000..4a1da38 Binary files /dev/null and b/flink-runtime/src/test/resources/local127.truststore differ http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/resources/untrusted.keystore ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/resources/untrusted.keystore 
b/flink-runtime/src/test/resources/untrusted.keystore new file mode 100644 index 0000000..6610360 Binary files /dev/null and b/flink-runtime/src/test/resources/untrusted.keystore differ http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala new file mode 100644 index 0000000..0f6509c --- /dev/null +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.runtime.akka
+
+import akka.actor.ActorSystem
+import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.testingUtils.{TestingCluster, TestingUtils, ScalaTestingUtils}
+import org.junit.runner.RunWith
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import org.scalatest.junit.JUnitRunner
+
+/**
+  * Testing the flink cluster using SSL transport for akka remoting.
+  *
+  * Each case builds its own [[Configuration]], starts a [[TestingCluster]] from it
+  * and always stops that cluster again, so that a cluster started by one case is
+  * not left running while the next case starts.
+  */
+@RunWith(classOf[JUnitRunner])
+class AkkaSslITCase(_system: ActorSystem)
+  extends TestKit(_system)
+  with ImplicitSender
+  with WordSpecLike
+  with Matchers
+  with BeforeAndAfterAll
+  with ScalaTestingUtils {
+
+  def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
+
+  /** Shuts down the TestKit actor system once all cases have run. */
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  "The flink Cluster" must {
+
+    // SSL enabled with the test keystore and truststore: the cluster must come up.
+    "start with akka ssl enabled" in {
+
+      val config = new Configuration()
+      config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1")
+      config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
+      config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+      config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
+
+      config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
+      config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE,
+        getClass.getResource("/local127.keystore").getPath)
+      config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password")
+      config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password")
+      config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE,
+        getClass.getResource("/local127.truststore").getPath)
+
+      config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password")
+
+      val cluster = new TestingCluster(config, false)
+
+      try {
+        cluster.start(true)
+
+        assert(cluster.running)
+      } finally {
+        // Release the cluster even when the assertion fails.
+        cluster.stop()
+      }
+    }
+
+    // SSL explicitly disabled: the cluster must come up without any SSL settings.
+    "start with akka ssl disabled" in {
+
+      val config = new Configuration()
+      config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+      config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
+      config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, false)
+
+      val cluster = new TestingCluster(config, false)
+
+      try {
+        cluster.start(true)
+
+        assert(cluster.running)
+      } finally {
+        cluster.stop()
+      }
+    }
+
+    // SSL enabled but keystore/truststore point at a file name that is not one of
+    // the test resources: starting the cluster must throw.
+    "fail to start with invalid ssl keystore configured" in {
+
+      an[Exception] should be thrownBy {
+
+        val config = new Configuration()
+        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
+        // NOTE(review): the short ask timeout presumably keeps the failing start
+        // from blocking for long -- confirm.
+        config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "2 s")
+
+        config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
+        config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "invalid.keystore")
+        config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password")
+        config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password")
+        config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "invalid.keystore")
+        config.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password")
+
+        val cluster = new TestingCluster(config, false)
+
+        try {
+          cluster.start(true)
+        } finally {
+          // The start failure still propagates to `thrownBy`; this only cleans up
+          // whatever was started before the failure.
+          cluster.stop()
+        }
+      }
+    }
+
+    // SSL enabled with no keystore/truststore settings at all: starting must throw.
+    "fail to start with missing mandatory ssl configuration" in {
+
+      an[Exception] should be thrownBy {
+
+        val config = new Configuration()
+        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
+        config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "2 s")
+
+        config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
+
+        val cluster = new TestingCluster(config, false)
+
+        try {
+          cluster.start(true)
+        } finally {
+          cluster.stop()
+        }
+      }
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index 4e08857..a18024f 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -38,6 +38,7 @@ class AkkaUtilsTest
     val address = new InetSocketAddress(host, port)

     val remoteAkkaURL = JobManager.getRemoteJobManagerAkkaURL(
+      "akka.tcp",
       address,
       Some("actor"))

@@ -73,6 +74,15 @@ class AkkaUtilsTest
     result should equal(expected)
   }

+  // A URL using the 'akka.ssl.tcp' protocol with a host name must yield the
+  // same host/port pair that is embedded in the URL.
+  test("getHostFromAkkaURL should handle 'akka.ssl.tcp' as protocol") {
+    val url = "akka.ssl.tcp://flink@localhost:1234/user/jobmanager"
+    val expected = new InetSocketAddress("localhost", 1234)
+
+    val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+    result should equal(expected)
+  }
+
   test("getHostFromAkkaURL should properly handle IPv4 addresses in URLs") {
     val IPv4AddressString = "192.168.0.1"
     val port = 1234
@@ -108,4 +118,16 @@ class AkkaUtilsTest

     result should equal(address)
   }
+
+  // Same check for an 'akka.ssl.tcp' URL whose host is an IPv6 literal; the
+  // literal is wrapped in square brackets inside the URL.
+  test("getHostFromAkkaURL should properly handle IPv6 addresses in 'akka.ssl.tcp' URLs") {
+    val IPv6AddressString = "2001:db8:10:11:12:ff00:42:8329"
+    val port = 1234
+    val address = new InetSocketAddress(IPv6AddressString, port)
+
+    val url = s"akka.ssl.tcp://flink@[$IPv6AddressString]:$port/user/jobmanager"
+
+    val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+    result should equal(address)
+  }
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 0abdd46..8d92b1c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -471,7 +471,8 @@ object TestingUtils { def submitJobAndWait( actorSystem: ActorSystem, jobManager: ActorGateway, - jobGraph: JobGraph) + jobGraph: JobGraph, + config: Configuration) : JobExecutionResult = { val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager.actor) @@ -479,6 +480,7 @@ object TestingUtils { JobClient.submitJobAndWait( actorSystem, + config, leaderRetrievalService, jobGraph, TESTING_DURATION, http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java index 9aaf116..c7050e5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java @@ -316,7 +316,8 @@ public class ClassLoaderITCase extends TestLogger { // Upload JAR LOG.info("Uploading JAR " + CUSTOM_KV_STATE_JAR_PATH + " for savepoint disposal."); - List<BlobKey> blobKeys = BlobClient.uploadJarFiles(jm, deadline.timeLeft(), Collections.singletonList(new Path(CUSTOM_KV_STATE_JAR_PATH))); + List<BlobKey> blobKeys = BlobClient.uploadJarFiles(jm, deadline.timeLeft(), testCluster.userConfiguration(), + Collections.singletonList(new Path(CUSTOM_KV_STATE_JAR_PATH))); // Dispose savepoint LOG.info("Disposing savepoint at " + savepointPath); http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java index 2738d22..eacdeb4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java @@ -277,6 +277,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { JobExecutionResult result = JobClient.submitJobAndWait( clientActorSystem, + cluster.configuration(), lrService, graph, timeout, http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 10e229e..8e3418c 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameter import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.process.ProcessReaper; import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.taskmanager.TaskManager; @@ -331,8 +332,14 @@ public class YarnApplicationMasterRunner { LOG.debug("Starting Web Frontend"); webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, LOG); + + String protocol = 
"http://"; + if (config.getBoolean(ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + protocol = "https://"; + } final String webMonitorURL = webMonitor == null ? null : - "http://" + appMasterHostname + ":" + webMonitor.getServerPort(); + protocol + appMasterHostname + ":" + webMonitor.getServerPort(); // 3: Flink's Yarn ResourceManager LOG.debug("Starting YARN Flink Resource Manager");
