This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f1e9dc2a4b3 [SPARK-45408][CORE] Add RPC SSL settings to TransportConf
f1e9dc2a4b3 is described below

commit f1e9dc2a4b31f597f7b72e6eda137e990c7b3980
Author: Hasnain Lakhani <[email protected]>
AuthorDate: Thu Oct 5 11:40:38 2023 -0500

    [SPARK-45408][CORE] Add RPC SSL settings to TransportConf
    
    ### What changes were proposed in this pull request?
    
    This change adds new settings to `TransportConf` which are needed for the
    RPC SSL functionality to work. Additionally, it adds some sample
    configurations which are used by tests in follow-up PRs (see
    https://github.com/apache/spark/pull/42685 for the full context).
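
    As a rough, hypothetical sketch (not part of this patch), a caller could
    exercise the new getters through a `MapConfigProvider`; the class name,
    module label, and paths below are illustrative only:

    ```
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.network.util.MapConfigProvider;
    import org.apache.spark.network.util.TransportConf;

    public class SslRpcConfExample {
      public static void main(String[] args) {
        // Illustrative values only; a real deployment supplies its own paths.
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.ssl.rpc.enabled", "true");
        conf.put("spark.ssl.rpc.keyStore", "/path/to/keystore");
        conf.put("spark.ssl.rpc.keyStorePassword", "password");

        TransportConf transportConf = new TransportConf("rpc", new MapConfigProvider(conf));
        System.out.println(transportConf.sslRpcEnabled());   // true
        System.out.println(transportConf.sslRpcKeyStore());  // the keystore as a java.io.File
        // false unless /path/to/keystore actually exists on disk
        System.out.println(transportConf.sslRpcEnabledAndKeysAreValid());
      }
    }
    ```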
    
    ### Why are the changes needed?
    
    These changes are needed so that other modules can easily access these
    configurations, and so that the sample configurations are easily accessible
    for tests.
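
    For context, a minimal, hypothetical sketch of how the new `SslSampleConfigs`
    helpers could be combined to produce throwaway stores for a test (the paths,
    passwords, and alias are placeholders, and it assumes the BouncyCastle
    dependency used by `SslSampleConfigs` is on the test classpath):

    ```
    import java.io.File;
    import java.security.KeyPair;
    import java.security.cert.X509Certificate;

    import org.apache.spark.network.ssl.SslSampleConfigs;

    public class SampleStoreSetup {
      public static void main(String[] args) throws Exception {
        KeyPair pair = SslSampleConfigs.generateKeyPair("RSA");
        X509Certificate cert = SslSampleConfigs.generateCertificate(
            "CN=Test, L=London, C=GB", pair, 30, "SHA1withRSA");

        // Write a keystore holding the private key and its certificate...
        SslSampleConfigs.createKeyStore(
            new File("target/test-keystore"), "password", "password",
            "server", pair.getPrivate(), cert);
        // ...and a truststore that trusts that certificate.
        SslSampleConfigs.createTrustStore(
            new File("target/test-truststore"), "password", "server", cert);
      }
    }
    ```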
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a test, then ran:
    
    ```
    ./build/sbt
    > project network-common
    > testOnly org.apache.spark.network.TransportConfSuite
    ```
    
    There are more follow-up tests coming (see
    https://github.com/apache/spark/pull/42685).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43220 from hasnain-db/spark-tls-configs-low.
    
    Authored-by: Hasnain Lakhani <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../apache/spark/network/util/TransportConf.java   | 152 +++++++++++++
 .../src/test/java/TransportConfSuite.java          |  88 ++++++++
 .../apache/spark/network/ssl/SslSampleConfigs.java | 235 +++++++++++++++++++++
 3 files changed, 475 insertions(+)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index b8d8f6b85a4..3ebb38e310f 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.network.util;
 
+import java.io.File;
 import java.util.Locale;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
@@ -257,6 +258,157 @@ public class TransportConf {
       conf.get("spark.network.ssl.maxEncryptedBlockSize", "64k")));
   }
 
+  /**
+   * Whether Secure (SSL/TLS) RPC (including Block Transfer Service) is enabled
+   */
+  public boolean sslRpcEnabled() {
+    return conf.getBoolean("spark.ssl.rpc.enabled", false);
+  }
+
+  /**
+   * SSL protocol (remember that SSLv3 was compromised) supported by Java
+   */
+  public String sslRpcProtocol() {
+    return conf.get("spark.ssl.rpc.protocol", null);
+  }
+
+  /**
+   * A comma separated list of ciphers
+   */
+  public String[] sslRpcRequestedCiphers() {
+    String ciphers = conf.get("spark.ssl.rpc.enabledAlgorithms", null);
+    return (ciphers != null ? ciphers.split(",") : null);
+  }
+
+  /**
+   * The key-store file; can be relative to the current directory
+   */
+  public File sslRpcKeyStore() {
+    String keyStore = conf.get("spark.ssl.rpc.keyStore", null);
+    if (keyStore != null) {
+      return new File(keyStore);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * The password to the key-store file
+   */
+  public String sslRpcKeyStorePassword() {
+    return conf.get("spark.ssl.rpc.keyStorePassword", null);
+  }
+
+  /**
+   * A PKCS#8 private key file in PEM format; can be relative to the current directory
+   */
+  public File sslRpcPrivateKey() {
+    String privateKey = conf.get("spark.ssl.rpc.privateKey", null);
+    if (privateKey != null) {
+      return new File(privateKey);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * The password to the private key
+   */
+  public String sslRpcKeyPassword() {
+    return conf.get("spark.ssl.rpc.keyPassword", null);
+  }
+
+  /**
+   * An X.509 certificate chain file in PEM format; can be relative to the current directory
+   */
+  public File sslRpcCertChain() {
+    String certChain = conf.get("spark.ssl.rpc.certChain", null);
+    if (certChain != null) {
+      return new File(certChain);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * The trust-store file; can be relative to the current directory
+   */
+  public File sslRpcTrustStore() {
+    String trustStore = conf.get("spark.ssl.rpc.trustStore", null);
+    if (trustStore != null) {
+      return new File(trustStore);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * The password to the trust-store file
+   */
+  public String sslRpcTrustStorePassword() {
+    return conf.get("spark.ssl.rpc.trustStorePassword", null);
+  }
+
+  /**
+   * Whether to use a trust-store that reloads its configuration.
+   * If true, when the trust-store file on disk changes, it will be reloaded.
+   */
+  public boolean sslRpcTrustStoreReloadingEnabled() {
+    return conf.getBoolean("spark.ssl.rpc.trustStoreReloadingEnabled", false);
+  }
+
+  /**
+   * The interval, in milliseconds, at which the trust-store will reload its configuration
+   */
+  public int sslRpctrustStoreReloadIntervalMs() {
+    return conf.getInt("spark.ssl.rpc.trustStoreReloadIntervalMs", 10000);
+  }
+
+  /**
+   * Whether the OpenSSL implementation is enabled (if available on the host system).
+   * Requires the certChain and privateKey settings.
+   */
+  public boolean sslRpcOpenSslEnabled() {
+    return conf.getBoolean("spark.ssl.rpc.openSslEnabled", false);
+  }
+
+  /**
+   *
+   * @return true if and only if RPC encryption is enabled and the relevant keys exist
+   */
+  public boolean sslRpcEnabledAndKeysAreValid() {
+    if (!sslRpcEnabled()) {
+      return false;
+    }
+    if (sslRpcOpenSslEnabled()) {
+      // OpenSSL requires both the privateKey and certChain
+      File privateKey = sslRpcPrivateKey();
+      if (privateKey == null || !privateKey.exists()) {
+        return false;
+      }
+      File certChain = sslRpcCertChain();
+      if (certChain == null || !certChain.exists()) {
+        return false;
+      }
+      return true;
+    } else {
+      File keyStore = sslRpcKeyStore();
+      if (keyStore == null || !keyStore.exists()) {
+        return false;
+      }
+      // It's fine for the trust store to be missing; we would default to trusting all.
+      return true;
+    }
+  }
+
+  /**
+   * Whether we can dangerously fall back to unencrypted connections if RPC over SSL is enabled
+   * but the key files are not present
+   */
+  public boolean sslRpcDangerouslyFallbackIfKeysNotPresent() {
+    return conf.getBoolean("spark.ssl.rpc.dangerouslyFallbackIfKeysNotPresent", false);
+  }
+
   /**
    * Flag indicating whether to share the pooled ByteBuf allocators between the different Netty
    * channels. If enabled then only two pooled ByteBuf allocators are created: one where caching
diff --git a/common/network-common/src/test/java/TransportConfSuite.java b/common/network-common/src/test/java/TransportConfSuite.java
new file mode 100644
index 00000000000..1537f67e98d
--- /dev/null
+++ b/common/network-common/src/test/java/TransportConfSuite.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network;
+
+import java.io.File;
+
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.network.ssl.SslSampleConfigs;
+
+public class TransportConfSuite {
+
+  private TransportConf transportConf =
+    new TransportConf(
+     "shuffle", SslSampleConfigs.createDefaultConfigProviderForRpcNamespace());
+
+  @Test
+  public void testKeyStorePath() {
+    assertEquals(new File(SslSampleConfigs.keyStorePath), transportConf.sslRpcKeyStore());
+  }
+
+  @Test
+  public void testPrivateKeyPath() {
+    assertEquals(new File(SslSampleConfigs.privateKeyPath), transportConf.sslRpcPrivateKey());
+  }
+
+  @Test
+  public void testCertChainPath() {
+    assertEquals(new File(SslSampleConfigs.certChainPath), transportConf.sslRpcCertChain());
+  }
+
+  @Test
+  public void testTrustStorePath() {
+    assertEquals(new File(SslSampleConfigs.trustStorePath), transportConf.sslRpcTrustStore());
+  }
+
+  @Test
+  public void testTrustStoreReloadingEnabled() {
+    assertFalse(transportConf.sslRpcTrustStoreReloadingEnabled());
+  }
+
+  @Test
+  public void testOpenSslEnabled() {
+    assertFalse(transportConf.sslRpcOpenSslEnabled());
+  }
+
+  @Test
+  public void testSslRpcEnabled() {
+    assertTrue(transportConf.sslRpcEnabled());
+  }
+
+
+  @Test
+  public void testSslKeyStorePassword() {
+    assertEquals("password", transportConf.sslRpcKeyStorePassword());
+  }
+
+  @Test
+  public void testSslKeyPassword() {
+    assertEquals("password", transportConf.sslRpcKeyPassword());
+  }
+
+  @Test
+  public void testSslTrustStorePassword() {
+    assertEquals("password", transportConf.sslRpcTrustStorePassword());
+  }
+
+  @Test
+  public void testSsltrustStoreReloadIntervalMs() {
+    assertEquals(10000, transportConf.sslRpctrustStoreReloadIntervalMs());
+  }
+}
diff --git a/common/network-common/src/test/java/org/apache/spark/network/ssl/SslSampleConfigs.java b/common/network-common/src/test/java/org/apache/spark/network/ssl/SslSampleConfigs.java
new file mode 100644
index 00000000000..3c81b0af318
--- /dev/null
+++ b/common/network-common/src/test/java/org/apache/spark/network/ssl/SslSampleConfigs.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.ssl;
+
+import javax.security.auth.x500.X500Principal;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.GeneralSecurityException;
+import java.security.InvalidKeyException;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.KeyStore;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.SignatureException;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateEncodingException;
+import java.security.cert.X509Certificate;
+import java.util.*;
+
+import org.bouncycastle.x509.X509V1CertificateGenerator;
+
+import org.apache.spark.network.util.ConfigProvider;
+import org.apache.spark.network.util.MapConfigProvider;
+
+
+/**
+ * Sample SSL configurations and helpers used by tests.
+ */
+public class SslSampleConfigs {
+
+  public static final String keyStorePath = getAbsolutePath("/keystore");
+  public static final String privateKeyPath = getAbsolutePath("/key.pem");
+  public static final String certChainPath = getAbsolutePath("/certchain.pem");
+  public static final String untrustedKeyStorePath = getAbsolutePath("/untrusted-keystore");
+  public static final String trustStorePath = getAbsolutePath("/truststore");
+
+
+  /**
+   * Creates a config map containing the settings needed to enable the RPC SSL feature.
+   * All the settings (except the enabled one) are intentionally set on the parent namespace
+   * so that we can verify settings inheritance works.
+   */
+  public static Map<String, String> createDefaultConfigMap() {
+    Map<String, String> confMap = new HashMap<String, String>();
+    confMap.put("spark.ssl.rpc.enabled", "true");
+    // Need this so the other settings get parsed
+    confMap.put("spark.ssl.enabled", "true");
+    confMap.put("spark.ssl.trustStoreReloadingEnabled", "false");
+    confMap.put("spark.ssl.openSslEnabled", "false");
+    confMap.put("spark.ssl.trustStoreReloadIntervalMs", "10000");
+    confMap.put("spark.ssl.keyStore", SslSampleConfigs.keyStorePath);
+    confMap.put("spark.ssl.keyStorePassword", "password");
+    confMap.put("spark.ssl.privateKey", SslSampleConfigs.privateKeyPath);
+    confMap.put("spark.ssl.keyPassword", "password");
+    confMap.put("spark.ssl.certChain", SslSampleConfigs.certChainPath);
+    confMap.put("spark.ssl.trustStore", SslSampleConfigs.trustStorePath);
+    confMap.put("spark.ssl.trustStorePassword", "password");
+    return confMap;
+  }
+
+  /**
+   * Similar to the above, but sets the settings directly in the spark.ssl.rpc namespace.
+   * This is needed for testing in the lower level modules (like network-common) where inheritance
+   * does not work as there is no access to SSLOptions.
+   */
+  public static Map<String, String> createDefaultConfigMapForRpcNamespace() {
+    Map<String, String> confMap = new HashMap<String, String>();
+    confMap.put("spark.ssl.rpc.enabled", "true");
+    confMap.put("spark.ssl.rpc.trustStoreReloadingEnabled", "false");
+    confMap.put("spark.ssl.rpc.openSslEnabled", "false");
+    confMap.put("spark.ssl.rpc.trustStoreReloadIntervalMs", "10000");
+    confMap.put("spark.ssl.rpc.keyStore", SslSampleConfigs.keyStorePath);
+    confMap.put("spark.ssl.rpc.keyStorePassword", "password");
+    confMap.put("spark.ssl.rpc.privateKey", SslSampleConfigs.privateKeyPath);
+    confMap.put("spark.ssl.rpc.keyPassword", "password");
+    confMap.put("spark.ssl.rpc.certChain", SslSampleConfigs.certChainPath);
+    confMap.put("spark.ssl.rpc.trustStore", SslSampleConfigs.trustStorePath);
+    confMap.put("spark.ssl.rpc.trustStorePassword", "password");
+    return confMap;
+  }
+
+  /**
+   * Create ConfigProvider based on the method above
+   */
+  public static ConfigProvider createDefaultConfigProviderForRpcNamespace() {
+    return new MapConfigProvider(createDefaultConfigMapForRpcNamespace());
+  }
+
+  /**
+   * Create ConfigProvider based on the method above
+   */
+  public static ConfigProvider createDefaultConfigProviderForRpcNamespaceWithAdditionalEntries(
+      Map<String, String> entries) {
+    Map<String, String> confMap = createDefaultConfigMapForRpcNamespace();
+    confMap.putAll(entries);
+    return new MapConfigProvider(confMap);
+  }
+
+  public static void createTrustStore(
+    File trustStore, String password, String alias, Certificate cert)
+    throws GeneralSecurityException, IOException {
+    KeyStore ks = createEmptyKeyStore();
+    ks.setCertificateEntry(alias, cert);
+    saveKeyStore(ks, trustStore, password);
+  }
+
+  /**
+   * Creates a truststore with multiple certificates and saves it to a file.
+   */
+  public static <T extends Certificate> void createTrustStore(
+    File trustStore, String password, Map<String, T> certs)
+    throws GeneralSecurityException, IOException {
+    KeyStore ks = createEmptyKeyStore();
+    for (Map.Entry<String, T> cert : certs.entrySet()) {
+      ks.setCertificateEntry(cert.getKey(), cert.getValue());
+    }
+    saveKeyStore(ks, trustStore, password);
+  }
+
+  /**
+   * Create a self-signed X.509 Certificate.
+   *
+   * @param dn        the X.509 Distinguished Name, eg "CN=Test, L=London, C=GB"
+   * @param pair      the KeyPair
+   * @param days      how many days from now the Certificate is valid for
+   * @param algorithm the signing algorithm, eg "SHA1withRSA"
+   * @return the self-signed certificate
+   */
+  @SuppressWarnings("deprecation")
+  public static X509Certificate generateCertificate(
+      String dn, KeyPair pair, int days, String algorithm)
+      throws CertificateEncodingException, InvalidKeyException, IllegalStateException,
+      NoSuchAlgorithmException, SignatureException {
+
+    Date from = new Date();
+    Date to = new Date(from.getTime() + days * 86400000L);
+    BigInteger sn = new BigInteger(64, new SecureRandom());
+    KeyPair keyPair = pair;
+    X509V1CertificateGenerator certGen = new X509V1CertificateGenerator();
+    X500Principal dnName = new X500Principal(dn);
+
+    certGen.setSerialNumber(sn);
+    certGen.setIssuerDN(dnName);
+    certGen.setNotBefore(from);
+    certGen.setNotAfter(to);
+    certGen.setSubjectDN(dnName);
+    certGen.setPublicKey(keyPair.getPublic());
+    certGen.setSignatureAlgorithm(algorithm);
+
+    X509Certificate cert = certGen.generate(pair.getPrivate());
+    return cert;
+  }
+
+  public static KeyPair generateKeyPair(String algorithm)
+    throws NoSuchAlgorithmException {
+    KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm);
+    keyGen.initialize(1024);
+    return keyGen.genKeyPair();
+  }
+
+  /**
+   * Creates a keystore with a single key and saves it to a file.
+   *
+   * @param keyStore    File keystore to save
+   * @param password    String store password to set on keystore
+   * @param keyPassword String key password to set on key
+   * @param alias       String alias to use for the key
+   * @param privateKey  Key to save in keystore
+   * @param cert        Certificate to use as certificate chain associated with the key
+   * @throws GeneralSecurityException for any error with the security APIs
+   * @throws IOException              if there is an I/O error saving the file
+   */
+  public static void createKeyStore(
+    File keyStore, String password, String keyPassword,
+    String alias, Key privateKey, Certificate cert)
+    throws GeneralSecurityException, IOException {
+    KeyStore ks = createEmptyKeyStore();
+    ks.setKeyEntry(alias, privateKey, keyPassword.toCharArray(),
+      new Certificate[]{cert});
+    saveKeyStore(ks, keyStore, password);
+  }
+
+  public static void createKeyStore(
+    File keyStore, String password,
+    String alias, Key privateKey, Certificate cert)
+    throws GeneralSecurityException, IOException {
+    KeyStore ks = createEmptyKeyStore();
+    ks.setKeyEntry(alias, privateKey, password.toCharArray(), new Certificate[]{cert});
+    saveKeyStore(ks, keyStore, password);
+  }
+
+  private static KeyStore createEmptyKeyStore()
+    throws GeneralSecurityException, IOException {
+    KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+    ks.load(null, null); // initialize
+    return ks;
+  }
+
+  private static void saveKeyStore(
+    KeyStore ks, File keyStore, String password)
+    throws GeneralSecurityException, IOException {
+    FileOutputStream out = new FileOutputStream(keyStore);
+    try {
+      ks.store(out, password.toCharArray());
+    } finally {
+      out.close();
+    }
+  }
+
+  public static String getAbsolutePath(String path) {
+    try {
+      return new File(SslSampleConfigs.class.getResource(path).getFile()).getCanonicalPath();
+    } catch (IOException e) {
+       throw new RuntimeException("Failed to resolve path " + path, e);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
