This is an automated email from the ASF dual-hosted git repository. chengpan pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit b14254be9a40791da160202a5745820943c9ba4a Author: Mridul Muralidharan <[email protected]> AuthorDate: Wed Mar 27 17:54:24 2024 +0800 [CELEBORN-1349] Add SSL related configs and support for ReloadingX509TrustManager Add SSL related configs and support for `ReloadingX509TrustManager`, required for enabling SSL support. Please see #2416 for the consolidated PR with all the changes for reference. Introduces SSL related configs for enabling and configuring use of TLS. Yes, introduces configs to control behavior of SSL The overall PR #2411 (and this PR as well) passes all tests, this is specifically pulling out the `ReloadingX509TrustManager` and config related changes Closes #2419 from mridulm/config-for-ssl. Authored-by: Mridul Muralidharan <mridulatgmail.com> Signed-off-by: zky.zhoukeyong <[email protected]> --- LICENSE | 3 + .../network/ssl/ReloadingX509TrustManager.java | 218 ++++++++++++++ .../common/network/util/TransportConf.java | 73 +++++ .../org/apache/celeborn/common/CelebornConf.scala | 194 ++++++++++++- .../common/internal/config/ConfigEntry.scala | 2 +- .../apache/celeborn/common/network/TestHelper.java | 33 +++ .../ssl/ReloadingX509TrustManagerSuiteJ.java | 315 +++++++++++++++++++++ .../common/network/ssl/SslSampleConfigs.java | 211 ++++++++++++++ .../common/network/util/TransportConfSuiteJ.java | 109 +++++++ docs/configuration/network.md | 9 + 10 files changed, 1165 insertions(+), 2 deletions(-) diff --git a/LICENSE b/LICENSE index 30b295e64..95c7a9c72 100644 --- a/LICENSE +++ b/LICENSE @@ -212,9 +212,12 @@ Apache License 2.0 Apache Spark ./client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java ./client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java +./common/src/main/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManager.java ./common/src/main/java/org/apache/celeborn/common/unsafe/Platform.java ./common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java 
./common/src/main/scala/org/apache/celeborn/common/util/SignalUtils.scala +./common/src/test/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManagerSuiteJ.java +./common/src/test/java/org/apache/celeborn/common/network/ssl/SslSampleConfigs.java ./worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DB.java ./worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java ./worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDB.java diff --git a/common/src/main/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManager.java b/common/src/main/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManager.java new file mode 100644 index 000000000..82f69f4f7 --- /dev/null +++ b/common/src/main/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManager.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.celeborn.common.network.ssl; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.KeyStore; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.concurrent.atomic.AtomicReference; + +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509TrustManager; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link TrustManager} implementation that reloads its configuration when the truststore file on + * disk changes. This implementation is based on the + * org.apache.hadoop.security.ssl.ReloadingX509TrustManager class in the Apache Hadoop Encrypted + * Shuffle implementation. + * + * @see <a + * href="https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html">Hadoop + * MapReduce Next Generation - Encrypted Shuffle</a> + */ +public class ReloadingX509TrustManager implements X509TrustManager, Runnable { + + private static final Logger logger = LoggerFactory.getLogger(ReloadingX509TrustManager.class); + + private final String type; + private final File file; + // Canonical path of `file` at the last load; differs from `file` when it is a symlink + private String canonicalPath; + private final String password; + private long lastLoaded; + private final long reloadInterval; + @VisibleForTesting protected volatile int reloadCount; + @VisibleForTesting protected volatile int needsReloadCheckCounts; + private final AtomicReference<X509TrustManager> trustManagerRef; + + private Thread reloader; + + /** + * Creates a reloadable trust manager. The trust manager reloads itself if the underlying + * truststore file has changed. + * + * @param type type of truststore file, typically 'jks'. + * @param trustStore the truststore file.
+ * @param password password of the truststore file. + * @param reloadInterval interval to check if the truststore file has changed, in milliseconds. + * @throws IOException thrown if the truststore could not be initialized due to an IO error. + * @throws GeneralSecurityException thrown if the truststore could not be initialized due to a + * security error. + */ + public ReloadingX509TrustManager( + String type, File trustStore, String password, long reloadInterval) + throws IOException, GeneralSecurityException { + this.type = type; + this.file = trustStore; + this.canonicalPath = this.file.getCanonicalPath(); + this.password = password; + this.trustManagerRef = new AtomicReference<X509TrustManager>(); + this.trustManagerRef.set(loadTrustManager()); + this.reloadInterval = reloadInterval; + this.reloadCount = 0; + this.needsReloadCheckCounts = 0; + } + + /** Starts the reloader thread. */ + public void init() { + reloader = new Thread(this, "Truststore reloader thread"); + reloader.setDaemon(true); + reloader.start(); + } + + /** + * Stops the reloader thread and waits for it to exit. Does nothing if {@link #init()} was never + * called, so it is safe to invoke from cleanup paths. + */ + public void destroy() throws InterruptedException { + if (reloader == null) { + return; + } + reloader.interrupt(); + reloader.join(); + } + + /** + * Returns the reload check interval. + * + * @return the reload check interval, in milliseconds. + */ + public long getReloadInterval() { + return reloadInterval; + } + + @Override + public void checkClientTrusted(X509Certificate[] chain, String authType) + throws CertificateException { + X509TrustManager tm = trustManagerRef.get(); + if (tm != null) { + tm.checkClientTrusted(chain, authType); + } else { + throw new CertificateException( + "Unknown client chain certificate: " + + chain[0].toString() + + ".
Please ensure the correct trust store is specified in the config"); + } + } + + @Override + public void checkServerTrusted(X509Certificate[] chain, String authType) + throws CertificateException { + X509TrustManager tm = trustManagerRef.get(); + if (tm != null) { + tm.checkServerTrusted(chain, authType); + } else { + throw new CertificateException( + "Unknown server chain certificate: " + + chain[0].toString() + + ". Please ensure the correct trust store is specified in the config"); + } + } + + private static final X509Certificate[] EMPTY = new X509Certificate[0]; + + @Override + public X509Certificate[] getAcceptedIssuers() { + X509Certificate[] issuers = EMPTY; + X509TrustManager tm = trustManagerRef.get(); + if (tm != null) { + issuers = tm.getAcceptedIssuers(); + } + return issuers; + } + + boolean needsReload() throws IOException { + boolean reload = true; + File latestCanonicalFile = file.getCanonicalFile(); + if (file.exists() && latestCanonicalFile.exists()) { + // `file` can be a symbolic link. We need to reload if it points to another file, + // or if the file has been modified + if (latestCanonicalFile.getPath().equals(canonicalPath) + && latestCanonicalFile.lastModified() == lastLoaded) { + reload = false; + } + } else { + lastLoaded = 0; + } + return reload; + } + + X509TrustManager loadTrustManager() throws IOException, GeneralSecurityException { + X509TrustManager trustManager = null; + KeyStore ks = KeyStore.getInstance(type); + File latestCanonicalFile = file.getCanonicalFile(); + canonicalPath = latestCanonicalFile.getPath(); + lastLoaded = latestCanonicalFile.lastModified(); + try (FileInputStream in = new FileInputStream(latestCanonicalFile)) { + char[] passwordCharacters = password != null ? 
password.toCharArray() : null; + ks.load(in, passwordCharacters); + logger.debug("Loaded truststore '{}'", file); + } + + TrustManagerFactory trustManagerFactory = + TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + trustManagerFactory.init(ks); + TrustManager[] trustManagers = trustManagerFactory.getTrustManagers(); + for (TrustManager trustManager1 : trustManagers) { + if (trustManager1 instanceof X509TrustManager) { + trustManager = (X509TrustManager) trustManager1; + break; + } + } + return trustManager; + } + + @Override + public void run() { + boolean running = true; + while (running) { + try { + Thread.sleep(reloadInterval); + } catch (InterruptedException e) { + // Interrupted (e.g. by destroy()): stop looping and preserve the interrupt status. + Thread.currentThread().interrupt(); + running = false; + } + try { + if (running && needsReload()) { + try { + trustManagerRef.set(loadTrustManager()); + this.reloadCount += 1; + } catch (Exception ex) { + logger.warn( + "Could not load truststore (keep using existing one) : " + ex.toString(), ex); + } + } + } catch (IOException ex) { + logger.warn("Could not check whether truststore needs reloading: " + ex.toString(), ex); + } + needsReloadCheckCounts++; + } + } +} diff --git a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java index 417e93319..a1286b285 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java +++ b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java @@ -17,6 +17,8 @@ package org.apache.celeborn.common.network.util; +import java.io.File; + import org.apache.celeborn.common.CelebornConf; /** A central location that tracks all the settings we expose to users. 
*/ @@ -163,4 +165,75 @@ public class TransportConf { public boolean authEnabled() { return celebornConf.authEnabled(); } + + /** Whether Secure (SSL/TLS) wire communication is enabled.
*/ + public boolean sslEnabled() { + return celebornConf.sslEnabled(module); + } + + /** SSL protocol (remember that SSLv3 was compromised) supported by Java */ + public String sslProtocol() { + return celebornConf.sslProtocol(module); + } + + /** A comma separated list of ciphers */ + public String[] sslRequestedCiphers() { + return celebornConf.sslRequestedCiphers(module); + } + + /** The key-store file; can be relative to the current directory */ + public File sslKeyStore() { + return celebornConf.sslKeyStore(module); + } + + /** The password to the key-store file */ + public String sslKeyStorePassword() { + return celebornConf.sslKeyStorePassword(module); + } + + /** The trust-store file; can be relative to the current directory */ + public File sslTrustStore() { + return celebornConf.sslTrustStore(module); + } + + /** The password to the trust-store file */ + public String sslTrustStorePassword() { + return celebornConf.sslTrustStorePassword(module); + } + + /** + * Whether a trust-store that reloads its configuration is enabled. If true, when the + * trust-store file on disk changes, it will be reloaded. + */ + public boolean sslTrustStoreReloadingEnabled() { + return celebornConf.sslTrustStoreReloadingEnabled(module); + } + + /** The interval, in milliseconds, at which the trust-store reloads its configuration */ + public int sslTrustStoreReloadIntervalMs() { + return celebornConf.sslTrustStoreReloadIntervalMs(module); + } + + /** Internal config: the max size when chunking the stream with SSL */ + public int maxSslEncryptedBlockSize() { + return celebornConf.maxSslEncryptedBlockSize(module); + } + + // suppressing to ensure clarity of code. + @SuppressWarnings("RedundantIfStatement") + public boolean sslEnabledAndKeysAreValid() { + if (!sslEnabled()) { + return false; + } + // It is not required to have a keyStore for client side connections - only server side + // connectivity ... so transport confs without a keystore can be used in + // client mode only.
+ // In case it is specified, we check for its validity + File keyStore = sslKeyStore(); + if (keyStore != null && !keyStore.exists()) { + return false; + } + // It's fine for the trust store to be missing, we would default to trusting all. + return true; + } } diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 9ccbb207b..905cfad57 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -17,7 +17,7 @@ package org.apache.celeborn.common -import java.io.IOException +import java.io.{File, IOException} import java.util.{Collection => JCollection, Collections, HashMap => JHashMap, Locale, Map => JMap} import java.util.concurrent.TimeUnit @@ -1139,6 +1139,97 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def hdfsStorageKerberosPrincipal = get(HDFS_STORAGE_KERBEROS_PRINCIPAL) def hdfsStorageKerberosKeytab = get(HDFS_STORAGE_KERBEROS_KEYTAB) + // ////////////////////////////////////////////////////// + // TLS // + // ////////////////////////////////////////////////////// + private def getSslConfig[V](config: ConfigEntry[V], module: String): V = { + // For ssl, we look at the module specific value - and then fallback to without the + // module for global defaults, before falling back on what is in code + val moduleKey = config.key.replace("<module>", module) + // replace the module wildcard and check for global value + val globalKey = config.key.replace(".<module>.", ".") + + val defaultValue = if (config.defaultValue.isDefined) config.defaultValueString else null + + config.valueConverter(getOption(moduleKey).getOrElse(get(globalKey, defaultValue))) + } + + private def asFileOrNull(fileName: Option[String]): File = { + fileName.map(new File(_)).orNull + } + + /** + * Whether Secure (SSL/TLS) wire communication is enabled. 
+ */ + def sslEnabled(module: String): Boolean = { + getSslConfig(SSL_ENABLED, module) + } + + /** + * SSL protocol (remember that SSLv3 was compromised) supported by Java + */ + def sslProtocol(module: String): String = { + getSslConfig(SSL_PROTOCOL, module) + } + + /** + * A comma separated list of ciphers. Entries are trimmed and empty entries are dropped; + * returns null (use the JRE default cipher suites) when unset or when no entries remain. + */ + def sslRequestedCiphers(module: String): Array[String] = { + getSslConfig(SSL_ENABLED_CIPHERS, module) + .map(_.split(",").map(_.trim).filter(_.nonEmpty)) + .filter(_.nonEmpty) + .orNull + } + + /** + * The key-store file; can be relative to the current directory + */ + def sslKeyStore(module: String): File = { + val keyStore = getSslConfig(SSL_KEY_STORE, module) + asFileOrNull(keyStore) + } + + /** + * The password to the key-store file + */ + def sslKeyStorePassword(module: String): String = { + getSslConfig(SSL_KEY_STORE_PASSWORD, module).orNull + } + + /** + * The trust-store file; can be relative to the current directory + */ + def sslTrustStore(module: String): File = { + asFileOrNull(getSslConfig(SSL_TRUST_STORE, module)) + } + + /** + * The password to the trust-store file + */ + def sslTrustStorePassword(module: String): String = { + getSslConfig(SSL_TRUST_STORE_PASSWORD, module).orNull + } + + /** + * Whether a trust-store that reloads its configuration is enabled.
If true, + * when the trust-store file on disk changes, it will be reloaded + */ + def sslTrustStoreReloadingEnabled(module: String): Boolean = { + getSslConfig(SSL_TRUST_STORE_RELOADING_ENABLED, module) + } + + /** + * The interval, in milliseconds, the trust-store will reload its configuration + */ + def sslTrustStoreReloadIntervalMs(module: String): Int = { + getSslConfig(SSL_TRUST_STORE_RELOAD_INTERVAL_MS, module).toInt + } + + /** + * Internal config: the max size when chunking the stream with SSL + */ + def maxSslEncryptedBlockSize(module: String): Int = { + getSslConfig(MAX_SSL_ENCRYPTED_BLOCK_SIZE, module).toInt + } + // ////////////////////////////////////////////////////// // Authentication // // ////////////////////////////////////////////////////// @@ -4712,4 +4803,105 @@ object CelebornConf extends Logging { .version("0.5.0") .intConf .createWithDefault(10000) + + // SSL Configs + + val SSL_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.ssl.<module>.enabled") + .categories("network", "ssl") + .version("0.5.0") + .doc("Enables SSL for securing wire traffic.") + .booleanConf + .createWithDefault(false) + + val SSL_PROTOCOL: ConfigEntry[String] = + buildConf("celeborn.ssl.<module>.protocol") + .categories("network", "ssl") + .version("0.5.0") + .doc("SSL protocol to use") + .stringConf + // TLSv1.3 requires specific java version, defaulting to v1.2 + .createWithDefault("TLSv1.2") + + val SSL_ENABLED_CIPHERS: OptionalConfigEntry[String] = + buildConf("celeborn.ssl.<module>.enabledAlgorithms") + .categories("network", "ssl") + .version("0.5.0") + .doc("A comma-separated list of ciphers. The specified ciphers must be supported by JVM. " + + "The reference list of protocols can be found in the \"JSSE Cipher Suite Names\" section " + + "of the Java security guide. The list for Java 17 can be found at " + + "https://docs.oracle.com/en/java/javase/17/docs/specs/security/standard-names.html#jsse-cipher-suite-names " + + ". 
Note: If not set, the default cipher suite for the JRE will be used.") + .stringConf + .createOptional + + val SSL_KEY_STORE: OptionalConfigEntry[String] = + buildConf("celeborn.ssl.<module>.keyStore") + .categories("network", "ssl") + .version("0.5.0") + .doc("Path to the key store file. The path can be absolute or relative to the directory in which the " + + "process is started.") + .stringConf + .createOptional + + val SSL_KEY_STORE_PASSWORD: OptionalConfigEntry[String] = + buildConf("celeborn.ssl.<module>.keyStorePassword") + .categories("network", "ssl") + .version("0.5.0") + .doc("Password to the key store.") + .stringConf + .createOptional + + val SSL_TRUST_STORE: OptionalConfigEntry[String] = + buildConf("celeborn.ssl.<module>.trustStore") + .categories("network", "ssl") + .version("0.5.0") + .doc("Path to the trust store file. The path can be absolute or relative to the directory " + + "in which the process is started.") + .stringConf + .createOptional + + val SSL_TRUST_STORE_PASSWORD: OptionalConfigEntry[String] = + buildConf("celeborn.ssl.<module>.trustStorePassword") + .categories("network", "ssl") + .version("0.5.0") + .doc("Password for the trust store.") + .stringConf + .createOptional + + val SSL_TRUST_STORE_RELOADING_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.ssl.<module>.trustStoreReloadingEnabled") + .categories("network", "ssl") + .version("0.5.0") + .doc("Whether the trust store should be reloaded periodically. This setting is mostly only " + + "useful for server components, not applications.") + .booleanConf + .createWithDefault(false) + + val SSL_TRUST_STORE_RELOAD_INTERVAL_MS: ConfigEntry[Long] = + buildConf("celeborn.ssl.<module>.trustStoreReloadIntervalMs") + .categories("network", "ssl") + .version("0.5.0") + .doc("The interval at which the trust store should be reloaded (in milliseconds). 
This " + "setting is mostly only useful for server components, not applications.") + .timeConf(TimeUnit.MILLISECONDS) + // We treat this as an int, so validate + .checkValue( + p => p > 0 && p <= Int.MaxValue, + s"Invalid trustStoreReloadIntervalMs, must be a positive number up to ${Int.MaxValue}") + .createWithDefaultString("10s") + + val MAX_SSL_ENCRYPTED_BLOCK_SIZE: ConfigEntry[Long] = + buildConf("celeborn.ssl.<module>.maxEncryptedBlockSize") + .categories("network", "ssl") + .version("0.5.0") + .internal + .doc("The max size when chunking the stream with SSL") + .bytesConf(ByteUnit.BYTE) + // We treat this as an int, so validate + .checkValue( + p => p > 0 && p <= Int.MaxValue, + s"Invalid maxEncryptedBlockSize, must be a positive number up to ${Int.MaxValue}") + .createWithDefaultString("64k") + } diff --git a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala index 344bd1aff..bfd9cca76 100644 --- a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala +++ b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala @@ -240,7 +240,7 @@ class OptionalConfigEntry[T]( prependedKey, prependSeparator, alternatives, - s => Some(rawValueConverter(s)), + s => Option(rawValueConverter(s)), v => v.map(rawStringConverter).orNull, doc, isPublic, diff --git a/common/src/test/java/org/apache/celeborn/common/network/TestHelper.java b/common/src/test/java/org/apache/celeborn/common/network/TestHelper.java new file mode 100644 index 000000000..1e9c1cb48 --- /dev/null +++ b/common/src/test/java/org/apache/celeborn/common/network/TestHelper.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.network; + +import java.util.Map; + +import org.apache.celeborn.common.CelebornConf; + +/** A few helper utilities to reduce duplication within test code. */ +public class TestHelper { + + public static CelebornConf updateCelebornConfWithMap(CelebornConf conf, Map<String, String> map) { + for (Map.Entry<String, String> entry : map.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + return conf; + } +} diff --git a/common/src/test/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManagerSuiteJ.java b/common/src/test/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManagerSuiteJ.java new file mode 100644 index 000000000..9b79662d3 --- /dev/null +++ b/common/src/test/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManagerSuiteJ.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.celeborn.common.network.ssl; + +import static org.apache.celeborn.common.network.ssl.SslSampleConfigs.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.security.KeyPair; +import java.security.KeyStore; +import java.security.cert.X509Certificate; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +public class ReloadingX509TrustManagerSuiteJ { + + /** + * Waits until reload count hits the requested value, sleeping 100ms at a time. 
If the maximum + * number of attempts is hit, throws a RuntimeException + * + * @param tm the trust manager to wait for + * @param count The count to wait for + * @param attempts The number of attempts to wait for + */ + private void waitForReloadCount(ReloadingX509TrustManager tm, int count, int attempts) + throws InterruptedException { + if (tm.reloadCount > count) { + throw new IllegalStateException( + "Passed invalid count " + + count + + " to waitForReloadCount, already have " + + tm.reloadCount); + } + for (int i = 0; i < attempts; i++) { + if (tm.reloadCount >= count) { + return; + } + // Adapted from SystemClock.waitTillTime + long startTime = System.currentTimeMillis(); + long targetTime = startTime + 100; + long currentTime = startTime; + while (currentTime < targetTime) { + long sleepTime = Math.min(10, targetTime - currentTime); + Thread.sleep(sleepTime); + currentTime = System.currentTimeMillis(); + } + } + throw new IllegalStateException("Trust store not reloaded after " + attempts + " attempts!"); + } + + /** + * Waits until we make some number of attempts to reload, and verifies that the actual reload + * count did not change + * + * @param tm the trust manager to wait for + * @param attempts The number of attempts to wait for + */ + private void waitForNoReload(ReloadingX509TrustManager tm, int attempts) + throws InterruptedException { + int oldReloadCount = tm.reloadCount; + int checkCount = tm.needsReloadCheckCounts; + int target = checkCount + attempts; + while (checkCount < target) { + Thread.sleep(100); + checkCount = tm.needsReloadCheckCounts; + } + assertEquals(oldReloadCount, tm.reloadCount); + } + + /** + * Tests to ensure that loading a missing trust-store fails + * + * @throws Exception + */ + @Test + public void testLoadMissingTrustStore() throws Exception { + File trustStore = new File("testmissing.jks"); + assertFalse(trustStore.exists()); + + assertThrows( + IOException.class, + () -> { + ReloadingX509TrustManager tm = + new 
ReloadingX509TrustManager(KeyStore.getDefaultType(), trustStore, "password", 10); + try { + tm.init(); + } finally { + tm.destroy(); + } + }); + } + + /** + * Tests to ensure that loading a corrupt trust-store fails + * + * @throws Exception + */ + @Test + public void testLoadCorruptTrustStore() throws Exception { + File corruptStore = File.createTempFile("truststore-corrupt", "jks"); + corruptStore.deleteOnExit(); + OutputStream os = new FileOutputStream(corruptStore); + os.write(1); + os.close(); + + assertThrows( + IOException.class, + () -> { + ReloadingX509TrustManager tm = + new ReloadingX509TrustManager( + KeyStore.getDefaultType(), corruptStore, "password", 10); + try { + tm.init(); + } finally { + tm.destroy(); + corruptStore.delete(); + } + }); + } + + /** + * Tests that we successfully reload when a file is updated + * + * @throws Exception + */ + @Test + public void testReload() throws Exception { + KeyPair kp = generateKeyPair("RSA"); + X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA"); + X509Certificate cert2 = generateCertificate("CN=Cert2", kp, 30, "SHA1withRSA"); + File trustStore = File.createTempFile("testreload", "jks"); + trustStore.deleteOnExit(); + createTrustStore(trustStore, "password", "cert1", cert1); + + ReloadingX509TrustManager tm = new ReloadingX509TrustManager("jks", trustStore, "password", 1); + assertEquals(1, tm.getReloadInterval()); + assertEquals(0, tm.reloadCount); + try { + tm.init(); + assertEquals(1, tm.getAcceptedIssuers().length); + // At this point we haven't reloaded, just the initial load + assertEquals(0, tm.reloadCount); + + // Wait so that the file modification time is different + Thread.sleep((tm.getReloadInterval() + 1000)); + + // Add another cert + Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>(); + certs.put("cert1", cert1); + certs.put("cert2", cert2); + createTrustStore(trustStore, "password", certs); + + // Wait up to 10s until we reload + 
waitForReloadCount(tm, 1, 100);
+
+      assertEquals(2, tm.getAcceptedIssuers().length);
+    } finally {
+      tm.destroy();
+      trustStore.delete();
+    }
+  }
+
+  /**
+   * Tests that we keep old certs if the trust store goes missing
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testReloadMissingTrustStore() throws Exception {
+    KeyPair kp = generateKeyPair("RSA");
+    X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
+    File trustStore = new File("testmissing.jks");
+    trustStore.deleteOnExit();
+    assertFalse(trustStore.exists());
+    createTrustStore(trustStore, "password", "cert1", cert1);
+
+    ReloadingX509TrustManager tm = new ReloadingX509TrustManager("jks", trustStore, "password", 1);
+    assertEquals(0, tm.reloadCount);
+    try {
+      tm.init();
+      assertEquals(1, tm.getAcceptedIssuers().length);
+      X509Certificate cert = tm.getAcceptedIssuers()[0];
+      trustStore.delete();
+
+      // Wait for up to 5s - we should *not* reload
+      waitForNoReload(tm, 50);
+
+      assertEquals(1, tm.getAcceptedIssuers().length);
+      assertEquals(cert, tm.getAcceptedIssuers()[0]);
+    } finally {
+      tm.destroy();
+    }
+  }
+
+  /**
+   * Tests that we keep old certs if the new truststore is corrupt
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testReloadCorruptTrustStore() throws Exception {
+    KeyPair kp = generateKeyPair("RSA");
+    X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
+    File corruptStore = File.createTempFile("truststore-corrupt", "jks");
+    corruptStore.deleteOnExit();
+    createTrustStore(corruptStore, "password", "cert1", cert1);
+
+    ReloadingX509TrustManager tm =
+        new ReloadingX509TrustManager("jks", corruptStore, "password", 1);
+    assertEquals(0, tm.reloadCount);
+    try {
+      tm.init();
+      assertEquals(1, tm.getAcceptedIssuers().length);
+      X509Certificate cert = tm.getAcceptedIssuers()[0];
+
+      try (OutputStream os = new FileOutputStream(corruptStore)) {
+        os.write(1); // overwrite the store with garbage; stream is closed even if write fails
+      }
+      corruptStore.setLastModified(System.currentTimeMillis() - 1000);
+
+      // Wait for up to 5s - we should *not* reload
+      waitForNoReload(tm, 50);
+
+      assertEquals(1, tm.getAcceptedIssuers().length);
+      assertEquals(cert, tm.getAcceptedIssuers()[0]);
+    } finally {
+      tm.destroy();
+      corruptStore.delete();
+    }
+  }
+
+  /**
+   * Tests that we successfully reload when the trust store is a symlink and we update the contents
+   * of the pointed-to file or we update the file it points to.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testReloadSymlink() throws Exception {
+    KeyPair kp = generateKeyPair("RSA");
+    X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
+    X509Certificate cert2 = generateCertificate("CN=Cert2", kp, 30, "SHA1withRSA");
+    X509Certificate cert3 = generateCertificate("CN=Cert3", kp, 30, "SHA1withRSA");
+
+    File trustStore1 = File.createTempFile("testreload", "jks");
+    trustStore1.deleteOnExit();
+    createTrustStore(trustStore1, "password", "cert1", cert1);
+
+    File trustStore2 = File.createTempFile("testreload", "jks");
+    Map<String, X509Certificate> certs = new HashMap<>();
+    certs.put("cert1", cert1);
+    certs.put("cert2", cert2);
+    createTrustStore(trustStore2, "password", certs);
+
+    File trustStoreSymlink = File.createTempFile("testreloadsymlink", "jks");
+    trustStoreSymlink.delete();
+    Files.createSymbolicLink(trustStoreSymlink.toPath(), trustStore1.toPath());
+
+    ReloadingX509TrustManager tm =
+        new ReloadingX509TrustManager("jks", trustStoreSymlink, "password", 1);
+    assertEquals(1, tm.getReloadInterval());
+    assertEquals(0, tm.reloadCount);
+    try {
+      tm.init();
+      assertEquals(1, tm.getAcceptedIssuers().length);
+      // At this point we haven't reloaded, just the initial load
+      assertEquals(0, tm.reloadCount);
+
+      // Repoint to trustStore2, which has another cert
+      trustStoreSymlink.delete();
+      Files.createSymbolicLink(trustStoreSymlink.toPath(), trustStore2.toPath());
+
+      // Wait up to 10s until we reload
+      waitForReloadCount(tm, 1, 100);
+
+      assertEquals(2, tm.getAcceptedIssuers().length);
+
+      // Add another cert
+      certs.put("cert3", cert3);
+      createTrustStore(trustStore2, "password", certs);
+
+      // Wait up to 10s until we reload
+      waitForReloadCount(tm, 2, 100);
+
+      assertEquals(3, tm.getAcceptedIssuers().length);
+    } finally {
+      tm.destroy();
+      trustStore1.delete();
+      trustStore2.delete();
+      trustStoreSymlink.delete();
+    }
+  }
+}
diff --git a/common/src/test/java/org/apache/celeborn/common/network/ssl/SslSampleConfigs.java b/common/src/test/java/org/apache/celeborn/common/network/ssl/SslSampleConfigs.java
new file mode 100644
index 000000000..653709a31
--- /dev/null
+++ b/common/src/test/java/org/apache/celeborn/common/network/ssl/SslSampleConfigs.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.ssl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.security.*;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateEncodingException;
+import java.security.cert.X509Certificate;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.x500.X500Principal;
+
+import org.apache.commons.io.FileUtils;
+import org.bouncycastle.x509.X509V1CertificateGenerator;
+
+public class SslSampleConfigs {
+
+  public static final String DEFAULT_KEY_STORE_PATH = getResourceAsAbsolutePath("/ssl/server.jks");
+  public static final String SECOND_KEY_STORE_PATH =
+      getResourceAsAbsolutePath("/ssl/server_another.jks");
+
+  // trust store has ca's for both keys.
+  public static final String TRUST_STORE_PATH = getResourceAsAbsolutePath("/ssl/truststore.jks");
+
+  // this is a trust store which does not have either the primary or second cert's ca
+  public static final String TRUST_STORE_WITHOUT_CA =
+      getResourceAsAbsolutePath("/ssl/truststore-without-ca.jks");
+
+  public static Map<String, String> createDefaultConfigMapForModule(String module) {
+    return createConfigMapForModule(module, true);
+  }
+
+  public static Map<String, String> createAnotherConfigMapForModule(String module) {
+    return createConfigMapForModule(module, false);
+  }
+
+  private static Map<String, String> createConfigMapForModule(String module, boolean forDefault) {
+    Map<String, String> confMap = new HashMap<>();
+    confMap.put("celeborn.ssl." + module + ".enabled", "true");
+    confMap.put("celeborn.ssl." + module + ".trustStoreReloadingEnabled", "false");
+    confMap.put("celeborn.ssl." + module + ".openSslEnabled", "false");
+    confMap.put("celeborn.ssl." + module + ".trustStoreReloadIntervalMs", "10000");
+    if (forDefault) {
+      confMap.put("celeborn.ssl." + module + ".keyStore", DEFAULT_KEY_STORE_PATH);
+    } else {
+      confMap.put("celeborn.ssl." + module + ".keyStore", SECOND_KEY_STORE_PATH);
+    }
+    confMap.put("celeborn.ssl." + module + ".keyStorePassword", "password");
+    confMap.put("celeborn.ssl." + module + ".keyPassword", "password");
+    confMap.put("celeborn.ssl." + module + ".privateKeyPassword", "password");
+    confMap.put("celeborn.ssl." + module + ".protocol", "TLSv1.2");
+    confMap.put("celeborn.ssl." + module + ".trustStore", TRUST_STORE_PATH);
+    confMap.put("celeborn.ssl." + module + ".trustStorePassword", "password");
+    return confMap;
+  }
+
+  public static void createTrustStore(
+      File trustStore, String password, String alias, Certificate cert)
+      throws GeneralSecurityException, IOException {
+    KeyStore ks = createEmptyKeyStore();
+    ks.setCertificateEntry(alias, cert);
+    saveKeyStore(ks, trustStore, password);
+  }
+
+  /** Creates a trust store holding multiple certificates (keyed by alias) and saves it. */
+  public static <T extends Certificate> void createTrustStore(
+      File trustStore, String password, Map<String, T> certs)
+      throws GeneralSecurityException, IOException {
+    KeyStore ks = createEmptyKeyStore();
+    for (Map.Entry<String, T> cert : certs.entrySet()) {
+      ks.setCertificateEntry(cert.getKey(), cert.getValue());
+    }
+    saveKeyStore(ks, trustStore, password);
+  }
+
+  /**
+   * Create a self-signed X.509 Certificate.
+   *
+   * @param dn the X.509 Distinguished Name, eg "CN=Test, L=London, C=GB"
+   * @param pair the KeyPair
+   * @param days how many days from now the Certificate is valid for
+   * @param algorithm the signing algorithm, eg "SHA1withRSA"
+   * @return the self-signed certificate
+   */
+  @SuppressWarnings("deprecation")
+  public static X509Certificate generateCertificate(
+      String dn, KeyPair pair, int days, String algorithm)
+      throws CertificateEncodingException, InvalidKeyException, IllegalStateException,
+          NoSuchAlgorithmException, SignatureException {
+
+    Date from = new Date();
+    Date to = new Date(from.getTime() + days * 86400000L);
+    BigInteger sn = new BigInteger(64, new SecureRandom());
+    KeyPair keyPair = pair;
+    X509V1CertificateGenerator certGen = new X509V1CertificateGenerator();
+    X500Principal dnName = new X500Principal(dn);
+
+    certGen.setSerialNumber(sn);
+    certGen.setIssuerDN(dnName);
+    certGen.setNotBefore(from);
+    certGen.setNotAfter(to);
+    certGen.setSubjectDN(dnName);
+    certGen.setPublicKey(keyPair.getPublic());
+    certGen.setSignatureAlgorithm(algorithm);
+
+    X509Certificate cert = certGen.generate(pair.getPrivate());
+    return cert;
+  }
+
+  public static KeyPair generateKeyPair(String algorithm) throws NoSuchAlgorithmException {
+    KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm);
+    keyGen.initialize(1024);
+    return keyGen.genKeyPair();
+  }
+
+  /**
+   * Creates a keystore with a single key and saves it to a file.
+   *
+   * @param keyStore File keystore to save
+   * @param password String store password to set on keystore
+   * @param keyPassword String key password to set on key
+   * @param alias String alias to use for the key
+   * @param privateKey Key to save in keystore
+   * @param cert Certificate to use as certificate chain associated to key
+   * @throws GeneralSecurityException for any error with the security APIs
+   * @throws IOException if there is an I/O error saving the file
+   */
+  public static void createKeyStore(
+      File keyStore,
+      String password,
+      String keyPassword,
+      String alias,
+      Key privateKey,
+      Certificate cert)
+      throws GeneralSecurityException, IOException {
+    KeyStore ks = createEmptyKeyStore();
+    ks.setKeyEntry(alias, privateKey, keyPassword.toCharArray(), new Certificate[] {cert});
+    saveKeyStore(ks, keyStore, password);
+  }
+
+  public static void createKeyStore(
+      File keyStore, String password, String alias, Key privateKey, Certificate cert)
+      throws GeneralSecurityException, IOException {
+    KeyStore ks = createEmptyKeyStore();
+    ks.setKeyEntry(alias, privateKey, password.toCharArray(), new Certificate[] {cert});
+    saveKeyStore(ks, keyStore, password);
+  }
+
+  private static KeyStore createEmptyKeyStore() throws GeneralSecurityException, IOException {
+    KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+    ks.load(null, null); // initialize
+    return ks;
+  }
+
+  private static void saveKeyStore(KeyStore ks, File keyStore, String password)
+      throws GeneralSecurityException, IOException {
+    // Write atomically; the temp file must share the target's file system for ATOMIC_MOVE.
+    File tempFile =
+        File.createTempFile("temp-key-store", ".jks", keyStore.getAbsoluteFile().getParentFile());
+    try (FileOutputStream out = new FileOutputStream(tempFile)) {
+      ks.store(out, password.toCharArray());
+      out.close(); // release the file before moving it; the implicit close is then a no-op
+      Files.move(
+          tempFile.toPath(),
+          keyStore.toPath(),
+          StandardCopyOption.REPLACE_EXISTING,
+          StandardCopyOption.ATOMIC_MOVE);
+    } finally {
+      Files.deleteIfExists(tempFile.toPath()); // leftover only if store or move failed
+    }
+  }
+
+  public static String getResourceAsAbsolutePath(String path) {
+    URL url = SslSampleConfigs.class.getResource(path);
+    if (null == url) {
+      throw new IllegalArgumentException("Unable to find " + path);
+    }
+    try {
+      File tempFile = File.createTempFile(new File(path).getName(), null);
+      tempFile.deleteOnExit();
+      FileUtils.copyInputStreamToFile(url.openStream(), tempFile);
+      return tempFile.getCanonicalPath();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to resolve path " + path, e);
+    }
+  }
+}
diff --git a/common/src/test/java/org/apache/celeborn/common/network/util/TransportConfSuiteJ.java b/common/src/test/java/org/apache/celeborn/common/network/util/TransportConfSuiteJ.java
new file mode 100644
index 000000000..92288b21a
--- /dev/null
+++ b/common/src/test/java/org/apache/celeborn/common/network/util/TransportConfSuiteJ.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.network.TestHelper;
+import org.apache.celeborn.common.network.ssl.SslSampleConfigs;
+
+public class TransportConfSuiteJ {
+
+  private final TransportConf transportConf =
+      new TransportConf(
+          "rpc",
+          TestHelper.updateCelebornConfWithMap(
+              new CelebornConf(), SslSampleConfigs.createDefaultConfigMapForModule("rpc")));
+
+  @Test
+  public void testKeyStorePath() {
+    assertEquals(new File(SslSampleConfigs.DEFAULT_KEY_STORE_PATH), transportConf.sslKeyStore());
+  }
+
+  @Test
+  public void testTrustStorePath() {
+    assertEquals(new File(SslSampleConfigs.TRUST_STORE_PATH), transportConf.sslTrustStore());
+  }
+
+  @Test
+  public void testTrustStoreReloadingEnabled() {
+    assertFalse(transportConf.sslTrustStoreReloadingEnabled());
+  }
+
+  @Test
+  public void testSslEnabled() {
+    assertTrue(transportConf.sslEnabled());
+  }
+
+  @Test
+  public void testSslKeyStorePassword() {
+    assertEquals("password", transportConf.sslKeyStorePassword());
+  }
+
+  @Test
+  public void testSslTrustStorePassword() {
+    assertEquals("password", transportConf.sslTrustStorePassword());
+  }
+
+  @Test
+  public void testSslTrustStoreReloadIntervalMs() {
+    assertEquals(10000, transportConf.sslTrustStoreReloadIntervalMs());
+  }
+
+  // If a specific key is not set, it should be inherited from celeborn.ssl namespace
+  @Test
+  public void testInheritance() {
+
+    final String module1 = "rpc";
+    final String module2 = "fetch";
+
+    final String module1Protocol = "456";
+    final String module2Protocol = "789";
+
+    final long module1ReloadIntervalMs = 123456;
+    final long defaultReloadIntervalMs = 83723;
+
+    CelebornConf conf = new CelebornConf();
+
+    // Module-specific values must stay independent of each other
+    conf.set("celeborn.ssl." + module1 + ".protocol", module1Protocol);
+    conf.set("celeborn.ssl." + module2 + ".protocol", module2Protocol);
+
+    // module2 does not override the interval, so it inherits the celeborn.ssl-level setting
+    conf.set(
+        "celeborn.ssl." + module1 + ".trustStoreReloadIntervalMs",
+        Long.toString(module1ReloadIntervalMs));
+    conf.set("celeborn.ssl.trustStoreReloadIntervalMs", Long.toString(defaultReloadIntervalMs));
+
+    TransportConf module1TestConf = new TransportConf(module1, conf);
+    TransportConf module2TestConf = new TransportConf(module2, conf);
+
+    assertEquals(module1Protocol, module1TestConf.sslProtocol());
+    assertEquals(module2Protocol, module2TestConf.sslProtocol());
+
+    assertEquals(module1ReloadIntervalMs, module1TestConf.sslTrustStoreReloadIntervalMs());
+    assertEquals(defaultReloadIntervalMs, module2TestConf.sslTrustStoreReloadIntervalMs());
+  }
+}
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 35e438b6b..a295d5353 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -53,4 +53,13 @@ license: |
 | celeborn.rpc.io.threads | <undefined> | false | Netty IO thread number of NettyRpcEnv to handle RPC request. The default threads number is the number of runtime available processors. | 0.2.0 | | 
 | celeborn.rpc.lookupTimeout | 30s | false | Timeout for RPC lookup operations. | 0.2.0 | | 
 | celeborn.shuffle.io.maxChunksBeingTransferred | <undefined> | false | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see `celeborn.<module>.io.maxRetries` and `celeborn.<module>.io.retryWait`), if those limits are reached the task will fail with fetch failure. | 0.2.0 | | 
+| celeborn.ssl.<module>.enabled | false | false | Enables SSL for securing wire traffic.
| 0.5.0 | | +| celeborn.ssl.<module>.enabledAlgorithms | <undefined> | false | A comma-separated list of ciphers. The specified ciphers must be supported by JVM. The reference list of protocols can be found in the "JSSE Cipher Suite Names" section of the Java security guide. The list for Java 17 can be found at https://docs.oracle.com/en/java/javase/17/docs/specs/security/standard-names.html#jsse-cipher-suite-names . Note: If not set, the default cipher suite for the JRE will be used. | 0 [...] +| celeborn.ssl.<module>.keyStore | <undefined> | false | Path to the key store file. The path can be absolute or relative to the directory in which the process is started. | 0.5.0 | | +| celeborn.ssl.<module>.keyStorePassword | <undefined> | false | Password to the key store. | 0.5.0 | | +| celeborn.ssl.<module>.protocol | TLSv1.2 | false | SSL protocol to use | 0.5.0 | | +| celeborn.ssl.<module>.trustStore | <undefined> | false | Path to the trust store file. The path can be absolute or relative to the directory in which the process is started. | 0.5.0 | | +| celeborn.ssl.<module>.trustStorePassword | <undefined> | false | Password for the trust store. | 0.5.0 | | +| celeborn.ssl.<module>.trustStoreReloadIntervalMs | 10s | false | The interval at which the trust store should be reloaded (in milliseconds). This setting is mostly only useful for server components, not applications. | 0.5.0 | | +| celeborn.ssl.<module>.trustStoreReloadingEnabled | false | false | Whether the trust store should be reloaded periodically. This setting is mostly only useful for server components, not applications. | 0.5.0 | | <!--end-include-->
