This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 343df6aeba HDDS-8591. Create scheduler to check for new root ca 
certificates (#4961)
343df6aeba is described below

commit 343df6aebaf52086ea2c82cfdb513e2422fc8f36
Author: Galsza <[email protected]>
AuthorDate: Thu Jul 6 11:57:54 2023 +0200

    HDDS-8591. Create scheduler to check for new root ca certificates (#4961)
---
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |   4 +
 .../hadoop/hdds/security/SecurityConfig.java       |  14 ++
 .../common/src/main/resources/ozone-default.xml    |  11 ++
 .../certificate/client/RootCaRotationPoller.java   | 143 +++++++++++++++++++++
 .../utils/TestRootCaRotationPoller.java            | 130 +++++++++++++++++++
 5 files changed, 302 insertions(+)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index ac6c08867b..e1edd94553 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -222,6 +222,10 @@ public final class HddsConfigKeys {
       "hdds.x509.ca.rotation.ack.timeout";
   public static final String HDDS_X509_CA_ROTATION_ACK_TIMEOUT_DEFAULT =
       "PT15M";
+  // Interval at which certificate clients poll for a new root CA
+  // certificate; the value is parsed with java.time.Duration#parse.
+  public static final String HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL =
+      "hdds.x509.rootca.certificate.polling.interval";
+  public static final String
+      HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL_DEFAULT = "PT2h";
 
   public static final String HDDS_CONTAINER_REPLICATION_COMPRESSION =
       "hdds.container.replication.compression";
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/SecurityConfig.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/SecurityConfig.java
index f3e747de63..d2bd588d09 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/SecurityConfig.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/SecurityConfig.java
@@ -52,6 +52,8 @@ import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_CA_ROTATION_TIME_O
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_CA_ROTATION_TIME_OF_DAY_DEFAULT;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_CERTIFICATE_FILE;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_CERTIFICATE_FILE_DEFAULT;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL;
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL_DEFAULT;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_PRIVATE_KEY_FILE;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_PRIVATE_KEY_FILE_DEFAULT;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_PUBLIC_KEY_FILE;
@@ -131,6 +133,7 @@ public class SecurityConfig {
       Pattern.compile("\\d{2}:\\d{2}:\\d{2}");
   private final Duration caAckTimeout;
   private final SslProvider grpcSSLProvider;
+  private final Duration rootCaCertificatePollingInterval;
 
   /**
    * Constructs a SecurityConfig.
@@ -228,6 +231,13 @@ public class SecurityConfig {
 
     validateCertificateValidityConfig();
 
+    String rootCaCertificatePollingIntervalString = configuration.get(
+        HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL,
+        HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL_DEFAULT);
+
+    this.rootCaCertificatePollingInterval =
+        Duration.parse(rootCaCertificatePollingIntervalString);
+
     this.externalRootCaCert = configuration.get(
         HDDS_X509_ROOTCA_CERTIFICATE_FILE,
         HDDS_X509_ROOTCA_CERTIFICATE_FILE_DEFAULT);
@@ -552,6 +562,10 @@ public class SecurityConfig {
     return caAckTimeout;
   }
 
+  /**
+   * Returns the interval at which certificate clients poll for new root CA
+   * certificates, as parsed from the
+   * {@code hdds.x509.rootca.certificate.polling.interval} configuration.
+   *
+   * @return the configured polling interval
+   */
+  public Duration getRootCaCertificatePollingInterval() {
+    return rootCaCertificatePollingInterval;
+  }
+
   /**
    * Return true if using test certificates with authority as localhost. This
    * should be used only for unit test where certificates are generated by
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 484e5bfd3c..ce65be861d 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2268,6 +2268,17 @@
       is failed. Default is 15 minutes.
     </description>
   </property>
+  <property>
+    <name>hdds.x509.rootca.certificate.polling.interval</name>
+    <value>PT2h</value>
+    <description>Interval at which certificate clients poll for a new
+      root ca certificate, given as an ISO-8601 duration. Every time the
+      specified time duration elapses, the clients send a request to the
+      SCMs to see if a new root ca certificate was generated. Once there is
+      a change, the system automatically adds the new root ca to the
+      clients' trust stores and requests a new certificate to be signed.
+      Default is 2 hours.
+    </description>
+  </property>
   <property>
     <name>ozone.scm.security.handler.count.key</name>
     <value>2</value>
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/RootCaRotationPoller.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/RootCaRotationPoller.java
new file mode 100644
index 0000000000..47cc368bbe
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/RootCaRotationPoller.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.security.x509.certificate.client;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import 
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.security.SecurityConfig;
+import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.cert.X509Certificate;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Poller mechanism for Root Ca Rotation for clients.
+ */
+/**
+ * Poller mechanism for Root Ca Rotation for clients.
+ * <p>
+ * Once started via {@link #run()}, a single daemon thread periodically asks
+ * the SCM for all root CA certificates. If the SCM reports at least one
+ * certificate that is not yet known to this poller, every registered
+ * processor is invoked with the full list received from the SCM. The set of
+ * known certificates is replaced with that list only after all processors
+ * completed successfully, so a failed round is retried on the next poll.
+ */
+public class RootCaRotationPoller implements Runnable, Closeable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RootCaRotationPoller.class);
+  // Processors can be registered from any thread while the poller thread
+  // iterates over them, hence a copy-on-write list is used.
+  private final List<Function<List<X509Certificate>, CompletableFuture<Void>>>
+      rootCARotationProcessors;
+  private final ScheduledExecutorService poller;
+  private final Duration pollingInterval;
+  // Read by the poller thread, replaced by whichever thread completes the
+  // processor futures; volatile makes the replacement visible to later polls.
+  private volatile Set<X509Certificate> knownRootCerts;
+  private final SCMSecurityProtocolClientSideTranslatorPB scmSecureClient;
+
+  /**
+   * Creates a poller that is not yet scheduled; call {@link #run()} to start.
+   *
+   * @param securityConfig source of the polling interval
+   * @param initiallyKnownRootCaCerts root CA certificates already known
+   * @param scmSecureClient client used to fetch the root CA list from SCM
+   */
+  public RootCaRotationPoller(SecurityConfig securityConfig,
+      Set<X509Certificate> initiallyKnownRootCaCerts,
+      SCMSecurityProtocolClientSideTranslatorPB scmSecureClient) {
+    this.scmSecureClient = scmSecureClient;
+    this.knownRootCerts = initiallyKnownRootCaCerts;
+    poller = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setNameFormat(
+                this.getClass().getSimpleName())
+            .setDaemon(true).build());
+    pollingInterval = securityConfig.getRootCaCertificatePollingInterval();
+    rootCARotationProcessors =
+        new java.util.concurrent.CopyOnWriteArrayList<>();
+  }
+
+  /**
+   * Fetches the root CA list from SCM and, if it contains certificates that
+   * are not known yet, hands the full list to all registered processors.
+   */
+  private void pollRootCas() {
+    try {
+      List<String> pemEncodedRootCaList =
+          scmSecureClient.getAllRootCaCertificates();
+      List<X509Certificate> rootCAsFromSCM =
+          OzoneSecurityUtil.convertToX509(pemEncodedRootCaList);
+      // Read the field once so that the comparison and the log message
+      // are based on the same set.
+      Set<X509Certificate> currentlyKnownCerts = knownRootCerts;
+      List<X509Certificate> scmCertsWithoutKnownCerts
+          = new ArrayList<>(rootCAsFromSCM);
+      scmCertsWithoutKnownCerts.removeAll(currentlyKnownCerts);
+      if (scmCertsWithoutKnownCerts.isEmpty()) {
+        return;
+      }
+      LOG.info("Some root CAs are not known to the client out of the root " +
+          "CAs known to the SCMs. Root CA Cert ids known to the client: " +
+          getPrintableCertIds(currentlyKnownCerts) + ". Root CA Cert ids " +
+          "from SCM not known by the client: " +
+          getPrintableCertIds(scmCertsWithoutKnownCerts));
+
+      CompletableFuture<Void> allRootCAProcessorFutures =
+          CompletableFuture.allOf(rootCARotationProcessors.stream()
+              .map(c -> c.apply(rootCAsFromSCM))
+              .toArray(CompletableFuture[]::new));
+
+      allRootCAProcessorFutures.whenComplete((unused, throwable) -> {
+        if (throwable == null) {
+          knownRootCerts = new HashSet<>(rootCAsFromSCM);
+        } else {
+          // Keep the old set so that the next poll retries the processing.
+          LOG.error("Processing the root ca certificates received from SCM " +
+              "failed, the known root ca certificates are left unchanged",
+              throwable);
+        }
+      });
+    } catch (IOException | RuntimeException e) {
+      // An exception escaping a task scheduled with scheduleAtFixedRate
+      // suppresses all subsequent executions, so unchecked exceptions are
+      // logged here as well to keep the poller alive.
+      LOG.error("Error while trying to poll root ca certificate", e);
+    }
+  }
+
+  /**
+   * Registers a processor that is invoked with the full root CA list from
+   * SCM whenever a poll finds certificates that are not known yet.
+   *
+   * @param processor function returning a future that completes when the
+   *                  processing of the certificates is done
+   */
+  public void addRootCARotationProcessor(
+      Function<List<X509Certificate>, CompletableFuture<Void>> processor) {
+    rootCARotationProcessors.add(processor);
+  }
+
+  /**
+   * Starts the periodic polling; the first poll is scheduled without delay.
+   */
+  @Override
+  public void run() {
+    // Schedule in milliseconds: Duration#getSeconds() truncates sub-second
+    // intervals to 0, which scheduleAtFixedRate rejects as a period.
+    poller.scheduleAtFixedRate(this::pollRootCas, 0,
+        pollingInterval.toMillis(), TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Stops the polling thread.
+   */
+  @Override
+  public void close() {
+    executorServiceShutdownGraceful(poller);
+  }
+
+  /**
+   * Shuts the executor down, waiting up to 5 seconds for termination before
+   * forcing the shutdown and waiting up to another 5 seconds.
+   */
+  private void executorServiceShutdownGraceful(ExecutorService executor) {
+    executor.shutdown();
+    try {
+      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+        executor.shutdownNow();
+      }
+      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn("{} couldn't be shut down gracefully",
+            getClass().getSimpleName());
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("{} couldn't be stopped gracefully",
+          getClass().getSimpleName());
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Joins the serial numbers of the given certificates with ", ".
+   */
+  private String getPrintableCertIds(Collection<X509Certificate> certs) {
+    return certs.stream()
+        .map(X509Certificate::getSerialNumber)
+        .map(BigInteger::toString)
+        .collect(Collectors.joining(", "));
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/utils/TestRootCaRotationPoller.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/utils/TestRootCaRotationPoller.java
new file mode 100644
index 0000000000..7f2ed0a813
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/utils/TestRootCaRotationPoller.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.security.x509.certificate.utils;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import 
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.security.SecurityConfig;
+import 
org.apache.hadoop.hdds.security.x509.certificate.client.RootCaRotationPoller;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.ozone.test.GenericTestUtils;
+import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.security.KeyPair;
+import java.security.cert.X509Certificate;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL;
+
+/**
+ * Test for Root Ca Rotation polling mechanism on client side.
+ */
+/**
+ * Test for Root Ca Rotation polling mechanism on client side.
+ */
+public class TestRootCaRotationPoller {
+
+  private SecurityConfig secConf;
+
+  @Mock
+  private SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient;
+
+  @BeforeEach
+  public void setup() {
+    MockitoAnnotations.openMocks(this);
+    OzoneConfiguration conf = new OzoneConfiguration();
+    // Poll every second so that the tests observe a poll quickly.
+    conf.set(HDDS_X509_ROOTCA_CERTIFICATE_POLLING_INTERVAL, "PT1s");
+    secConf = new SecurityConfig(conf);
+  }
+
+  /**
+   * When SCM returns only certificates that are already known, the
+   * registered processor must never be invoked.
+   */
+  @Test
+  public void testPollerDoesNotInvokeRootCaProcessor() throws Exception {
+    X509Certificate knownCert = generateX509Cert(
+        LocalDateTime.now(), Duration.ofSeconds(50));
+    HashSet<X509Certificate> knownCerts = new HashSet<>();
+    knownCerts.add(knownCert);
+    List<String> certsFromScm = new ArrayList<>();
+    certsFromScm.add(CertificateCodec.getPEMEncodedString(knownCert));
+    Mockito.when(scmSecurityClient.getAllRootCaCertificates())
+        .thenReturn(certsFromScm);
+    AtomicBoolean processorInvoked = new AtomicBoolean(false);
+    // try-with-resources stops the poller thread even if the test fails.
+    try (RootCaRotationPoller poller = new RootCaRotationPoller(secConf,
+        knownCerts, scmSecurityClient)) {
+      poller.addRootCARotationProcessor(
+          certificates -> CompletableFuture.supplyAsync(() -> {
+            processorInvoked.set(true);
+            return null;
+          }));
+      poller.run();
+      Assertions.assertThrows(TimeoutException.class, () ->
+          GenericTestUtils.waitFor(processorInvoked::get, 50, 5000));
+    }
+  }
+
+  /**
+   * When SCM returns a certificate that is not known yet, the registered
+   * processor must be invoked with the full list received from SCM.
+   */
+  @Test
+  public void testPollerInvokesRootCaProcessors() throws Exception {
+    X509Certificate knownCert = generateX509Cert(
+        LocalDateTime.now(), Duration.ofSeconds(50));
+    X509Certificate newRootCa = generateX509Cert(
+        LocalDateTime.now(), Duration.ofSeconds(50));
+    HashSet<X509Certificate> knownCerts = new HashSet<>();
+    knownCerts.add(knownCert);
+    List<String> certsFromScm = new ArrayList<>();
+    certsFromScm.add(CertificateCodec.getPEMEncodedString(knownCert));
+    certsFromScm.add(CertificateCodec.getPEMEncodedString(newRootCa));
+    // Stub the mock and register the processor before starting the poller,
+    // so the first poll already runs against the final setup.
+    Mockito.when(scmSecurityClient.getAllRootCaCertificates())
+        .thenReturn(certsFromScm);
+    AtomicBoolean processorSawBothCerts = new AtomicBoolean(false);
+    try (RootCaRotationPoller poller = new RootCaRotationPoller(secConf,
+        knownCerts, scmSecurityClient)) {
+      poller.addRootCARotationProcessor(
+          certificates -> CompletableFuture.supplyAsync(() -> {
+            // The flag is set only after the assertion: a failure thrown
+            // inside the future is not propagated to the test thread, so
+            // it has to surface as a timeout of the wait below.
+            Assertions.assertEquals(2, certificates.size());
+            processorSawBothCerts.set(true);
+            return null;
+          }));
+      poller.run();
+      GenericTestUtils.waitFor(processorSawBothCerts::get, 50, 5000);
+    }
+  }
+
+  /**
+   * Generates a self signed certificate with a fresh RSA key pair, valid
+   * from the given start date (or from now if it is null) for the given
+   * lifetime.
+   */
+  private X509Certificate generateX509Cert(
+      LocalDateTime startDate, Duration certLifetime) throws Exception {
+    KeyPair keyPair = KeyStoreTestUtil.generateKeyPair("RSA");
+    LocalDateTime start = startDate == null ? LocalDateTime.now() : startDate;
+    LocalDateTime end = start.plus(certLifetime);
+    return new JcaX509CertificateConverter().getCertificate(
+        SelfSignedCertificate.newBuilder().setBeginDate(start)
+            .setEndDate(end).setClusterID("cluster").setKey(keyPair)
+            .setSubject("localhost").setConfiguration(secConf).setScmID("test")
+            .build());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to