This is an automated email from the ASF dual-hosted git repository.
vivekratnavel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 25d66d7 HDDS-5154. Add SCM GRPC server to publish CRL update. (#2216)
25d66d7 is described below
commit 25d66d72761820fc63698493326edc993213853c
Author: Xiaoyu Yao <[email protected]>
AuthorDate: Wed Jun 2 16:44:55 2021 -0700
HDDS-5154. Add SCM GRPC server to publish CRL update. (#2216)
---
.github/workflows/post-commit.yml | 2 +-
.../hadoop/hdds/scm/exceptions/SCMException.java | 4 +-
.../hadoop/hdds/security/x509/crl/CRLInfo.java | 39 +++
.../scm/update/client/CRLClientUpdateHandler.java | 201 ++++++++++++++
.../hadoop/hdds/scm/update/client/CRLStore.java | 34 +++
.../hdds/scm/update/client/ClientCRLStore.java | 98 +++++++
.../scm/update/client/ClientUpdateHandler.java | 29 ++
.../client/SCMUpdateClientConfiguration.java | 53 ++++
.../update/client/SCMUpdateServiceGrpcClient.java | 218 +++++++++++++++
.../scm/update/client/UpdateServiceConfig.java | 34 +--
.../hdds/scm/update/client/package-info.java | 22 ++
.../hdds/scm/update/server/CRLClientInfo.java | 56 ++++
.../scm/update/server/SCMUpdateClientInfo.java | 67 +++++
.../hdds/scm/update/server/package-info.java | 22 ++
.../x509/certificate/client/CertificateClient.java | 26 +-
.../client/DefaultCertificateClient.java | 65 +++++
.../certificate/client/OMCertificateClient.java | 11 +-
hadoop-hdds/interface-client/pom.xml | 2 +
.../src/main/proto/SCMUpdateProtocol.proto | 98 +++++++
.../src/main/proto/ScmServerProtocol.proto | 2 +
.../hdds/scm/server/SCMSecurityProtocolServer.java | 25 +-
.../hadoop/hdds/scm/update/server/SCMCRLStore.java | 51 ++++
.../scm/update/server/SCMCRLUpdateHandler.java | 151 +++++++++++
.../scm/update/server/SCMUpdateClientManager.java | 149 ++++++++++
.../hdds/scm/update/server/SCMUpdateHandler.java | 53 ++++
.../update/server/SCMUpdateServiceGrpcServer.java | 99 +++++++
.../scm/update/server/SCMUpdateServiceImpl.java | 119 ++++++++
.../hdds/scm/update/server/package-info.java | 22 ++
.../scm/server/TestSCMSecurityProtocolServer.java | 4 +-
.../hdds/scm/update/server/MockCRLStore.java | 138 ++++++++++
.../server/TestSCMUpdateServiceGrpcServer.java | 301 +++++++++++++++++++++
.../ozone/client/CertificateClientTestImpl.java | 14 +
.../java/org/apache/hadoop/ozone/om/ScmClient.java | 11 +
.../ozone/recon/scm/TestReconNodeManager.java | 2 +-
34 files changed, 2194 insertions(+), 28 deletions(-)
diff --git a/.github/workflows/post-commit.yml
b/.github/workflows/post-commit.yml
index 65d4a76..6dae941 100644
--- a/.github/workflows/post-commit.yml
+++ b/.github/workflows/post-commit.yml
@@ -227,7 +227,7 @@ jobs:
continue-on-error: true
integration:
runs-on: ubuntu-18.04
- timeout-minutes: 120
+ timeout-minutes: 150
if: github.event_name != 'pull_request' || github.event.pull_request.draft
== false
strategy:
matrix:
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
index 20185cd..cd1ea08 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -130,6 +130,8 @@ public class SCMException extends IOException {
PIPELINE_NOT_FOUND,
UNKNOWN_PIPELINE_STATE,
CONTAINER_NOT_FOUND,
- CONTAINER_REPLICA_NOT_FOUND
+ CONTAINER_REPLICA_NOT_FOUND,
+ FAILED_TO_CONNECT_TO_CRL_SERVICE,
+ FAILED_TO_ADD_CRL_CLIENT,
}
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/crl/CRLInfo.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/crl/CRLInfo.java
index 633cbcb..5a9fba6 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/crl/CRLInfo.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/crl/CRLInfo.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.security.x509.crl;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.jetbrains.annotations.NotNull;
@@ -26,6 +27,8 @@ import java.io.IOException;
import java.security.cert.CRLException;
import java.security.cert.CertificateException;
import java.security.cert.X509CRL;
+import java.security.cert.X509CRLEntry;
+import java.time.Instant;
import java.util.Comparator;
import java.util.Objects;
@@ -38,11 +41,17 @@ public class CRLInfo implements Comparator<CRLInfo>,
private X509CRL x509CRL;
private long creationTimestamp;
private long crlSequenceID;
+ private Instant revocationTime;
private CRLInfo(X509CRL x509CRL, long creationTimestamp, long crlSequenceID)
{
+ assert((x509CRL != null) &&
+ !x509CRL.getRevokedCertificates().isEmpty());
this.x509CRL = x509CRL;
this.creationTimestamp = creationTimestamp;
this.crlSequenceID = crlSequenceID;
+ X509CRLEntry entry = x509CRL.getRevokedCertificates().iterator().next();
+ this.revocationTime = Instant.ofEpochMilli(
+ entry.getRevocationDate().getTime());
}
/**
@@ -71,6 +80,28 @@ public class CRLInfo implements Comparator<CRLInfo>,
.build();
}
+ public static CRLInfo fromCRLProto3(
+ SCMUpdateServiceProtos.CRLInfoProto info)
+ throws IOException, CRLException, CertificateException {
+ CRLInfo.Builder builder = new CRLInfo.Builder();
+ return builder
+ .setX509CRL(CRLCodec.getX509CRL(info.getX509CRL()))
+ .setCreationTimestamp(info.getCreationTimestamp())
+ .setCrlSequenceID(info.getCrlSequenceID())
+ .build();
+ }
+
+ public SCMUpdateServiceProtos.CRLInfoProto getCRLProto3()
+ throws SCMSecurityException {
+ SCMUpdateServiceProtos.CRLInfoProto.Builder builder =
+ SCMUpdateServiceProtos.CRLInfoProto.newBuilder();
+
+ return builder.setX509CRL(CRLCodec.getPEMEncodedString(getX509CRL()))
+ .setCreationTimestamp(getCreationTimestamp())
+ .setCrlSequenceID(getCrlSequenceID())
+ .build();
+ }
+
public X509CRL getX509CRL() {
return x509CRL;
}
@@ -83,6 +114,14 @@ public class CRLInfo implements Comparator<CRLInfo>,
return crlSequenceID;
}
+ public boolean shouldRevokeNow() {
+ return revocationTime.isBefore(Instant.now());
+ }
+
+ public Instant getRevocationTime() {
+ return revocationTime;
+ }
+
/**
* Compares this object with the specified object for order. Returns a
* negative integer, zero, or a positive integer as this object is less
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLClientUpdateHandler.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLClientUpdateHandler.java
new file mode 100644
index 0000000..df219e0
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLClientUpdateHandler.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.client;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceGrpc.SCMUpdateServiceStub;
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.CRLUpdateRequest;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse;
+import org.apache.hadoop.hdds.scm.update.server.SCMUpdateClientInfo;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * CRL client update handler that handles local CRL update and pending CRLs.
+ */
+public class CRLClientUpdateHandler implements ClientUpdateHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ CRLClientUpdateHandler.class);
+ private static final String NAME = "CRLClientUpdateHandler";
+
+ private final SCMUpdateServiceStub updateStub;
+ private final ClientCRLStore clientStore;
+
+ // Used to update server about local pending crl id list
+ private StreamObserver<UpdateRequest> requestObserver;
+ private UUID clientUuid;
+ private SCMUpdateServiceProtos.ClientId clientIdProto;
+
+ // periodically process pending crls
+ private ScheduledExecutorService executorService;
+ private final SCMUpdateServiceGrpcClient serviceGrpcClient;
+ private long crlCheckInterval;
+
+ CRLClientUpdateHandler(UUID clientId,
+ SCMUpdateServiceStub updateStub,
+ SCMUpdateServiceGrpcClient serviceGrpcClient,
+ long crlCheckInterval) {
+ this.clientUuid = clientId;
+ this.updateStub = updateStub;
+ this.serviceGrpcClient = serviceGrpcClient;
+
+ this.clientStore = serviceGrpcClient.getClientCRLStore();
+ this.crlCheckInterval = crlCheckInterval;
+ LOG.info("Pending CRL check interval : {}s", crlCheckInterval/1000);
+ this.executorService = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("CRLUpdateHandler Thread - %d").build());
+ }
+
+ public static Logger getLog() {
+ return LOG;
+ }
+
+ @Override
+ public void handleServerUpdate(UpdateResponse updateResponse) {
+ SCMUpdateServiceProtos.CRLInfoProto crlInfo =
+ updateResponse.getCrlUpdateResponse().getCrlInfo();
+
+ long receivedCrlId = crlInfo.getCrlSequenceID();
+ long localCrlId = clientStore.getLatestCrlId();
+
+ LOG.debug("## Client: clientId {} clientCrlId {} receivedCrlId {}",
+ clientUuid, localCrlId, receivedCrlId);
+ if (localCrlId == receivedCrlId) {
+ return;
+ }
+ // send a client update to refresh stale server
+ if (localCrlId > receivedCrlId) {
+ LOG.warn("Received stale crlId {} lower than client crlId {}",
+ receivedCrlId, localCrlId);
+ sendClientUpdate();
+ return;
+ }
+
+ CRLInfo crl;
+ try {
+ crl = CRLInfo.fromCRLProto3(crlInfo);
+ } catch (Exception e) {
+ LOG.error("Can't parse server CRL update, skip...", e);
+ return;
+ }
+ clientStore.onRevokeCerts(crl);
+ // send client update.
+ sendClientUpdate();
+ }
+
+ public void start() {
+ // send initial update request to get a request observer handle
+ UpdateRequest updateReq = getUpdateRequest();
+ requestObserver = updateStub.withWaitForReady()
+ .updateStatus(new StreamObserver<UpdateResponse>() {
+ @Override
+ public void onNext(UpdateResponse updateResponse) {
+ LOG.debug("Receive server response: {}",
updateResponse.toString());
+ serviceGrpcClient.incrUpdateCount();
+ handleServerUpdate(updateResponse);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOG.debug("Receive server error ", throwable);
+ serviceGrpcClient.incrErrorCount();
+ if (serviceGrpcClient.getIsRunning().get()) {
+ // TODO: not all server error needs client restart.
+ LOG.warn("Restart client on server error: ", throwable);
+ serviceGrpcClient.restart();
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ LOG.debug("Receive server completed");
+ }
+ });
+ requestObserver.onNext(updateReq);
+ startPendingCrlChecker();
+ }
+
+ public void stop() {
+ stopPendingCrlCheck();
+ }
+
+ private void stopPendingCrlCheck() {
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception while waiting for executor service" +
+ " to shutdown", e);
+ }
+ }
+
+ private void startPendingCrlChecker() {
+ executorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ // background thread handle pending crl and update server
+
+ CRLInfo crl = null;
+ while ((crl = clientStore.getNextPendingCrl()) != null) {
+ if (crl.shouldRevokeNow()) {
+ serviceGrpcClient.incrPendingCrlRemoveCount();
+ LOG.info("Time to process crlId {}", crl.getCrlSequenceID());
+ clientStore.removePendingCrl(crl);
+ sendClientUpdate();
+ } else {
+ // we are done with this pending Crl, wait for next round
+ break;
+ }
+ }
+ }
+ }, 0, crlCheckInterval, TimeUnit.MILLISECONDS);
+ }
+
+ private void sendClientUpdate() {
+ requestObserver.onNext(getUpdateRequest());
+ }
+
+ private UpdateRequest getUpdateRequest() {
+ return UpdateRequest.newBuilder()
+ .setUpdateType(SCMUpdateServiceProtos.Type.CRLUpdate)
+ .setClientId(SCMUpdateClientInfo.toClientIdProto(clientUuid))
+ .setCrlUpdateRequest(getCrlUpdateRequest())
+ .build();
+ }
+
+ private CRLUpdateRequest getCrlUpdateRequest() {
+ List<Long> pendingCrlIds = clientStore.getPendingCrlIds();
+ return CRLUpdateRequest.newBuilder()
+ .setReceivedCrlId(clientStore.getLatestCrlId())
+ .addAllPendingCrlIds(pendingCrlIds)
+ .build();
+ }
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLStore.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLStore.java
new file mode 100644
index 0000000..0b98117
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLStore.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.client;
+
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+
+import java.io.IOException;
+
+/**
+ * CRL Store interface.
+ */
+public interface CRLStore {
+
+ long getLatestCrlId();
+
+ CRLInfo getCRL(long crlId) throws IOException;
+
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/ClientCRLStore.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/ClientCRLStore.java
new file mode 100644
index 0000000..721988e
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/ClientCRLStore.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.client;
+
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+
+import java.io.IOException;
+import java.security.cert.X509CRL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+
+/**
+ * In memory Client CRL store, need to integrate with Client Table.
+ */
+public class ClientCRLStore implements CRLStore {
+ private PriorityQueue<CRLInfo> pendingCrls;
+ private List<Long> revokedCerts;
+ private long localCrlId;
+
+ public ClientCRLStore() {
+ localCrlId = 0;
+ revokedCerts = new ArrayList<>();
+ pendingCrls = new PriorityQueue<>(
+ new Comparator<CRLInfo>() {
+ @Override
+ public int compare(CRLInfo o1, CRLInfo o2) {
+ return o1.getRevocationTime()
+ .compareTo(o2.getRevocationTime());
+ }
+ });
+ }
+
+ @Override
+ public long getLatestCrlId() {
+ return localCrlId;
+ }
+
+ public void setLocalCrlId(long crlId) {
+ localCrlId = crlId;
+ }
+
+
+ @Override
+ public CRLInfo getCRL(long crlId) throws IOException {
+ return null;
+ }
+
+ public void onRevokeCerts(CRLInfo crl) {
+ if (crl.shouldRevokeNow()) {
+ revokedCerts.addAll(getRevokedCertIds(crl.getX509CRL()));
+ } else {
+ pendingCrls.add(crl);
+ }
+ localCrlId = crl.getCrlSequenceID();
+ }
+
+ public List<Long> getRevokedCertIds(X509CRL crl) {
+ return Collections.unmodifiableList(crl.getRevokedCertificates().stream()
+ .map(cert->cert.getSerialNumber().longValue())
+ .collect(Collectors.toList()));
+ }
+
+ public CRLInfo getNextPendingCrl() {
+ return pendingCrls.peek();
+ }
+
+ public void removePendingCrl(CRLInfo crl) {
+ pendingCrls.remove(crl);
+ revokedCerts.addAll(getRevokedCertIds(crl.getX509CRL()));
+ }
+
+ public List<Long> getPendingCrlIds() {
+ return new ArrayList<>(pendingCrls)
+ .stream().map(crl->crl.getCrlSequenceID())
+ .collect(Collectors.toList());
+ }
+
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/ClientUpdateHandler.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/ClientUpdateHandler.java
new file mode 100644
index 0000000..c16e438
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/ClientUpdateHandler.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.client;
+
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse;
+
+/**
+ * Interface used by client side to handle server updates.
+ */
+public interface ClientUpdateHandler {
+
+ void handleServerUpdate(UpdateResponse updateResponse);
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/SCMUpdateClientConfiguration.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/SCMUpdateClientConfiguration.java
new file mode 100644
index 0000000..84fdc1f
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/SCMUpdateClientConfiguration.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.client;
+
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigType;
+
+import java.time.Duration;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SECURITY;
+
+/**
+ * Configuration used by SCM CRL update client.
+ */
+@ConfigGroup(prefix = "ozone.scm.update")
+public class SCMUpdateClientConfiguration {
+ @Config(key = "client.crl.check.interval",
+ type = ConfigType.TIME,
+ defaultValue = "600s",
+ tags = {SCM, OZONE, SECURITY},
+ description = "The interval that the scm update service client use to" +
+ "check its pending CRLs."
+ )
+ private long clientCrlCheckIntervalInMs =
+ Duration.ofMinutes(10).toMillis();
+
+ public long getClientCrlCheckInterval() {
+ return clientCrlCheckIntervalInMs;
+ }
+
+ public void setClientCrlCheckInterval(Duration interval) {
+ this.clientCrlCheckIntervalInMs = interval.toMillis();
+ }
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/SCMUpdateServiceGrpcClient.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/SCMUpdateServiceGrpcClient.java
new file mode 100644
index 0000000..c69c229
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/SCMUpdateServiceGrpcClient.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceGrpc;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.SubscribeRequest;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.SubscribeResponse;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UnsubscribeRequest;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest;
+import org.apache.hadoop.hdds.scm.update.server.SCMUpdateClientInfo;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.ratis.thirdparty.io.grpc.Deadline;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Class for SCM Update Service Grpc Client.
+ */
+public class SCMUpdateServiceGrpcClient {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMUpdateServiceGrpcClient.class);
+ private static final String CLIENT_NAME = "SCMUpdateServiceGrpcClient";
+
+ private ManagedChannel channel;
+ private SCMUpdateServiceGrpc.SCMUpdateServiceStub updateClient;
+ private SCMUpdateServiceGrpc.SCMUpdateServiceBlockingStub subscribeClient;
+ private final AtomicBoolean isRunning = new AtomicBoolean(false);
+
+ private UUID clientId = null;
+ private StreamObserver<UpdateRequest> requestObserver;
+ private CRLClientUpdateHandler handler;
+ private long crlCheckInterval;
+ private final String host;
+ private final int port;
+ private final ClientCRLStore clientCRLStore;
+ private AtomicLong updateCount;
+ private AtomicLong errorCount;
+ private AtomicLong pendingCrlRemoveCount;
+
+
+ public SCMUpdateServiceGrpcClient(final String host,
+ final ConfigurationSource conf,
+ ClientCRLStore clientCRLStore) {
+ Preconditions.checkNotNull(conf);
+ this.host = host;
+ this.port = conf.getObject(UpdateServiceConfig.class).getPort();
+ this.crlCheckInterval = conf.getObject(SCMUpdateClientConfiguration.class)
+ .getClientCrlCheckInterval();
+
+ this.clientCRLStore = clientCRLStore;
+ createChannel();
+ updateCount = new AtomicLong();
+ errorCount = new AtomicLong();
+ pendingCrlRemoveCount = new AtomicLong();
+ }
+
+ public void start() {
+ if (!isRunning.compareAndSet(false, true)) {
+ LOG.info("Ignore. already started.");
+ return;
+ }
+
+ LOG.info("{}: starting...", CLIENT_NAME);
+ if (channel == null) {
+ createChannel();
+ }
+ clientId = subScribeClient();
+ assert(clientId != null);
+
+ // start background thread processing pending crl ids.
+ handler = new CRLClientUpdateHandler(clientId, updateClient,
+ this, crlCheckInterval);
+ handler.start();
+
+ LOG.info("{}: started.", CLIENT_NAME);
+ }
+
+ public void incrUpdateCount() {
+ updateCount.incrementAndGet();
+ }
+
+ public void incrErrorCount() {
+ errorCount.incrementAndGet();
+ }
+
+ public void incrPendingCrlRemoveCount() {
+ pendingCrlRemoveCount.incrementAndGet();
+ }
+
+ @VisibleForTesting
+ public long getUpdateCount() {
+ return updateCount.get();
+ }
+
+ @VisibleForTesting
+ public long getErrorCount() {
+ return errorCount.get();
+ }
+
+ @VisibleForTesting
+ public long getPendingCrlRemoveCount() {
+ return pendingCrlRemoveCount.get();
+ }
+
+ public ClientCRLStore getClientCRLStore() {
+ return clientCRLStore;
+ }
+ public AtomicBoolean getIsRunning() {
+ return isRunning;
+ }
+
+ public void stop(boolean shutdown) {
+ LOG.info("{}: stopping...", CLIENT_NAME);
+ if (isRunning.get()) {
+ // complete update request, no more client streaming
+ if (requestObserver != null) {
+ requestObserver.onCompleted();
+ requestObserver = null;
+ }
+
+ // stop update handler
+ if (handler != null) {
+ handler.stop();
+ handler = null;
+ }
+
+ if (shutdown) {
+ shutdownChannel();
+ }
+ isRunning.set(false);
+ }
+ LOG.info("{}: stopped.", CLIENT_NAME);
+ }
+
+ public void restart() {
+ resetClient();
+ stop(false);
+ start();
+ }
+
+ public void createChannel() {
+ NettyChannelBuilder channelBuilder =
+ NettyChannelBuilder.forAddress(host, port).usePlaintext()
+ .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
+
+ channel = channelBuilder.build();
+ updateClient = SCMUpdateServiceGrpc.newStub(channel);
+ subscribeClient = SCMUpdateServiceGrpc.newBlockingStub(channel);
+ }
+
+ public void shutdownChannel() {
+ if (channel == null) {
+ return;
+ }
+
+ channel.shutdown();
+ try {
+ channel.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Failed to shutdown {} channel", CLIENT_NAME, e);
+ } finally {
+ channel.shutdownNow();
+ channel = null;
+ }
+ }
+
+ private UUID subScribeClient() {
+ SubscribeRequest subReq = SubscribeRequest.newBuilder().build();
+ SubscribeResponse subResp = subscribeClient.withWaitForReady()
+ .subscribe(subReq);
+ return SCMUpdateClientInfo.fromClientIdProto(subResp.getClientId());
+ }
+
+ private void unSubscribeClient() {
+ if (clientId != null) {
+ UnsubscribeRequest unsubReq = UnsubscribeRequest.newBuilder()
+
.setClientId(SCMUpdateClientInfo.toClientIdProto(clientId)).build();
+ subscribeClient.withWaitForReady().
+ withDeadline(Deadline.after(5, TimeUnit.MILLISECONDS))
+ .unsubscribe(unsubReq);
+ }
+ }
+
+ // short-circuit the backoff timer and make them reconnect immediately.
+ private void resetClient() {
+ if (channel == null) {
+ return;
+ }
+ channel.resetConnectBackoff();
+ }
+}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/UpdateServiceConfig.java
similarity index 52%
copy from
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
copy to
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/UpdateServiceConfig.java
index cce9d04..9f55c4d 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/UpdateServiceConfig.java
@@ -15,30 +15,30 @@
* the License.
*/
-package org.apache.hadoop.ozone.om;
+package org.apache.hadoop.hdds.scm.update.client;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
/**
- * Wrapper class for Scm protocol clients.
+ * Update service configuration.
*/
-public class ScmClient {
+@ConfigGroup(prefix = "ozone.scm.update.service")
+public final class UpdateServiceConfig {
- private final ScmBlockLocationProtocol blockClient;
- private final StorageContainerLocationProtocol containerClient;
+ @Config(key = "port", defaultValue = "9893", description = "Port used for"
+ + " the SCM grpc update service for CRL.", tags = {
+ ConfigTag.SECURITY})
+ private int port;
- ScmClient(ScmBlockLocationProtocol blockClient,
- StorageContainerLocationProtocol containerClient) {
- this.containerClient = containerClient;
- this.blockClient = blockClient;
+ public int getPort() {
+ return port;
}
- public ScmBlockLocationProtocol getBlockClient() {
- return this.blockClient;
- }
-
- public StorageContainerLocationProtocol getContainerClient() {
- return this.containerClient;
+ public UpdateServiceConfig setPort(
+ int portParam) {
+ this.port = portParam;
+ return this;
}
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/package-info.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/package-info.java
new file mode 100644
index 0000000..6fae144
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+/**
+ * CRL client package.
+ */
+package org.apache.hadoop.hdds.scm.update.client;
\ No newline at end of file
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/CRLClientInfo.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/CRLClientInfo.java
new file mode 100644
index 0000000..33386db
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/CRLClientInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Class wrap CRL client info on server side.
+ */
+public class CRLClientInfo {
+ private final SCMUpdateClientInfo updateClientInfo;
+ private long receivedCrlId;
+ private List<Long> pendingCrlIds;
+
+ public CRLClientInfo(SCMUpdateClientInfo clientInfo) {
+ this.updateClientInfo = clientInfo;
+ }
+
+ public long getReceivedCrlId() {
+ return receivedCrlId;
+ }
+
+ public void setReceivedCrlId(long receivedCrlId) {
+ this.receivedCrlId = receivedCrlId;
+ }
+
+ public List<Long> getPendingCrlIds() {
+ return Collections.unmodifiableList(pendingCrlIds);
+ }
+
+ public void setPendingCrlIds(List<Long> pendingCrlIds) {
+ this.pendingCrlIds = new ArrayList<>(pendingCrlIds);
+ }
+
+ public SCMUpdateClientInfo getUpdateClientInfo() {
+ return updateClientInfo;
+ }
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientInfo.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientInfo.java
new file mode 100644
index 0000000..f03ae84
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientInfo.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+
+import java.util.UUID;
+
+/**
+ * Wrapper class for scm update client on server side.
+ */
+public class SCMUpdateClientInfo {
+ private StreamObserver<UpdateResponse> responseObserver;
+ private UUID clientId;
+
+ public SCMUpdateClientInfo(UUID clientId) {
+ this(clientId, null);
+ }
+
+ public SCMUpdateClientInfo(UUID clientId,
+ StreamObserver<UpdateResponse> responseObserver) {
+ this.clientId = clientId;
+ this.responseObserver = responseObserver;
+ }
+
+ public UUID getClientId() {
+ return clientId;
+ }
+
+ public static SCMUpdateServiceProtos.ClientId toClientIdProto(UUID uuid) {
+ return SCMUpdateServiceProtos.ClientId.newBuilder()
+ .setLsb(uuid.getLeastSignificantBits())
+ .setMsb(uuid.getMostSignificantBits()).build();
+ }
+
+ public static UUID fromClientIdProto(
+ SCMUpdateServiceProtos.ClientId clientId) {
+ return new UUID(clientId.getMsb(), clientId.getLsb());
+ }
+
+ public StreamObserver<UpdateResponse> getResponseObserver() {
+ return responseObserver;
+ }
+
+ public void setResponseObserver(
+ StreamObserver<UpdateResponse> responseObserver) {
+ this.responseObserver = responseObserver;
+ }
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/package-info.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/package-info.java
new file mode 100644
index 0000000..c3b2fb8
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+/**
+ * CRL server package.
+ */
+package org.apache.hadoop.hdds.scm.update.server;
\ No newline at end of file
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
index 12ace82..396452f 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
@@ -267,7 +267,7 @@ public interface CertificateClient {
List<String> updateCAList() throws IOException;
/**
- * Get the CRLInfo based on the CRL Ids.
+ * Get the CRLInfo based on the CRL Ids from SCM.
* @param crlIds - list of crl ids
* @return list of CRLInfo
* @throws IOException
@@ -275,7 +275,7 @@ public interface CertificateClient {
List<CRLInfo> getCrls(List<Long> crlIds) throws IOException;
/**
- * Get the latest CRL id.
+ * Get the latest CRL id from SCM.
* @return latest CRL id.
* @throws IOException
*/
@@ -291,4 +291,26 @@ public interface CertificateClient {
OM_PUBLIC_PRIVATE_KEY_FILE_NOT_EXIST);
}
}
+
+ /**
+ * Get Local CRL id received.
+ * @return
+ */
+ long getLocalCrlId();
+
+ /**
+ * Set Local CRL id.
+ * @param crlId
+ */
+ void setLocalCrlId(long crlId);
+
+ /**
+ * Process crl and remove the certificates in the revoked cert list from
+ * client.
+ * @param crl
+ * @return true if the client's own cert needs to be reinit
+ * false otherwise;
+ */
+ boolean processCrl(CRLInfo crl);
+
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
index 5aa6338..55b984a 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
@@ -37,6 +37,7 @@ import java.security.SignatureException;
import java.security.cert.CertStore;
import java.security.cert.X509Certificate;
import java.security.spec.InvalidKeySpecException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -71,6 +72,7 @@ import static
org.apache.hadoop.hdds.security.x509.exceptions.CertificateExcepti
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClient;
import static
org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClientWithMaxRetry;
+import org.apache.ratis.util.FileUtils;
import org.bouncycastle.cert.X509CertificateHolder;
import org.slf4j.Logger;
@@ -96,6 +98,7 @@ public abstract class DefaultCertificateClient implements
CertificateClient {
private String certSerialId;
private String caCertId;
private String rootCaCertId;
+ private long localCrlId;
private String component;
private List<String> pemEncodedCACerts = null;
private final Lock lock;
@@ -942,4 +945,66 @@ public abstract class DefaultCertificateClient implements
CertificateClient {
lock.unlock();
}
}
+
+ @Override
+ public boolean processCrl(CRLInfo crl){
+ List<String> certIds2Remove = new ArrayList();
+ crl.getX509CRL().getRevokedCertificates().forEach(
+ cert -> certIds2Remove.add(cert.getSerialNumber().toString()));
+ boolean reinitCert = removeCertificates(certIds2Remove);
+ setLocalCrlId(crl.getCrlSequenceID());
+ return reinitCert;
+ }
+
+
+ private boolean removeCertificates(List<String> certIds){
+ lock.lock();
+ boolean reInitCert = false;
+ try {
+ // For now, remove self cert and ca cert is not implemented
+ // both requires a restart of the service.
+ if ((certSerialId!=null && certIds.contains(certSerialId)) ||
+ (caCertId!=null && certIds.contains(caCertId)) ||
+ (rootCaCertId!=null && certIds.contains(rootCaCertId))) {
+ reInitCert = true;
+ }
+
+ Path basePath = securityConfig.getCertificateLocation(component);
+ for (String certId : certIds) {
+ if (certificateMap.containsKey(certId)) {
+ // remove on disk
+ String certName = String.format(CERT_FILE_NAME_FORMAT, certId);
+
+ if (certId.equals(caCertId)) {
+ certName = CA_CERT_PREFIX + certName;
+ }
+
+ if (certId.equals(rootCaCertId)) {
+ certName = ROOT_CA_CERT_PREFIX + certName;
+ }
+
+ FileUtils.deleteFileQuietly(basePath.resolve(certName).toFile());
+ // remove in memory
+ certificateMap.remove(certId);
+
+ // TODO: reset certSerialId, caCertId or rootCaCertId
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ return reInitCert;
+ }
+
+ public long getLocalCrlId() {
+ return this.localCrlId;
+ }
+
+ /**
+ * Set Local CRL id.
+ * @param crlId
+ */
+ public void setLocalCrlId(long crlId){
+ this.localCrlId = crlId;
+ }
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
index 0c7054a..d26eced 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
@@ -42,12 +42,19 @@ public class OMCertificateClient extends
DefaultCertificateClient {
public static final String COMPONENT_NAME = "om";
public OMCertificateClient(SecurityConfig securityConfig,
- String certSerialId) {
+ String certSerialId, String localCrlId) {
super(securityConfig, LOG, certSerialId, COMPONENT_NAME);
+ this.setLocalCrlId(localCrlId!=null ?
+ Long.parseLong(localCrlId): 0);
+ }
+
+ public OMCertificateClient(SecurityConfig securityConfig,
+ String certSerialId) {
+ this(securityConfig, certSerialId, null);
}
public OMCertificateClient(SecurityConfig securityConfig) {
- super(securityConfig, LOG, null, COMPONENT_NAME);
+ this(securityConfig, null, null);
}
@Override
diff --git a/hadoop-hdds/interface-client/pom.xml
b/hadoop-hdds/interface-client/pom.xml
index b3083ba..4794eef 100644
--- a/hadoop-hdds/interface-client/pom.xml
+++ b/hadoop-hdds/interface-client/pom.xml
@@ -72,6 +72,8 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<includes>
<include>DatanodeClientProtocol.proto</include>
<include>InterSCMProtocol.proto</include>
+ <include>SCMClientProtocol.proto</include>
+ <include>SCMUpdateProtocol.proto</include>
</includes>
<outputDirectory>target/generated-sources/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
diff --git
a/hadoop-hdds/interface-client/src/main/proto/SCMUpdateProtocol.proto
b/hadoop-hdds/interface-client/src/main/proto/SCMUpdateProtocol.proto
new file mode 100644
index 0000000..c3e7cc0
--- /dev/null
+++ b/hadoop-hdds/interface-client/src/main/proto/SCMUpdateProtocol.proto
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *unstable* .proto interface.
+ */
+syntax = "proto2";
+option java_package = "org.apache.hadoop.hdds.protocol.scm.proto";
+option java_outer_classname = "SCMUpdateServiceProtos";
+option java_generate_equals_and_hash = true;
+option java_generic_services = true;
+package hadoop.hdds.scm;
+
+/**
+ * Information for Certificate Revocation List.
+ */
+message CRLInfoProto {
+ required string x509CRL = 1;
+ required uint64 creationTimestamp = 2;
+ required int64 crlSequenceID = 3;
+}
+message ClientId {
+ required int64 msb = 1;
+ required int64 lsb = 2;
+}
+
+message SubscribeRequest {
+ optional string hostname = 1;
+}
+
+message SubscribeResponse {
+ required ClientId clientId = 1;
+}
+
+enum Type {
+ CRLUpdate = 1;
+ PipelineUpdate = 2;
+ }
+
+message UpdateRequest {
+ required Type updateType = 1; // Type of the update
+ optional string traceID = 2;
+ required ClientId clientId = 3;
+ optional CRLUpdateRequest crlUpdateRequest= 4;
+}
+
+message UpdateResponse {
+ required Type updateType = 1; // Type of the update
+ optional string traceID = 2;
+ optional CRLUpdateResponse crlUpdateResponse = 3;
+}
+
+message CRLUpdateRequest {
+ required int64 receivedCrlId = 1;
+ repeated int64 pendingCrlIds = 2;
+}
+
+message CRLUpdateResponse {
+ optional CRLInfoProto crlInfo = 1;
+}
+
+message UnsubscribeRequest {
+ required ClientId clientId = 1;
+}
+
+message UnsubscribeResponse {
+}
+
+// SCM Update service streams updates to all subscribers
+service SCMUpdateService {
+
+ // Client subscribe to future SCM CRL update
+ rpc subscribe (SubscribeRequest) returns (SubscribeResponse) {};
+
+ // Client update SCM about its CRL processing status
+ rpc updateStatus (stream UpdateRequest) returns (stream UpdateResponse) {};
+
+ // Client unsubscribe to future SCM CRL update
+ rpc unsubscribe (UnsubscribeRequest) returns (UnsubscribeResponse) {};
+
+}
\ No newline at end of file
diff --git
a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index 7f47b5f..6d4759c 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -128,6 +128,8 @@ enum Status {
UNKNOWN_PIPELINE_STATE = 35;
CONTAINER_NOT_FOUND = 36;
CONTAINER_REPLICA_NOT_FOUND = 37;
+ FAILED_TO_CONNECT_TO_CRL_SERVICE = 38;
+ FAILED_TO_ADD_CRL_CLIENT = 39;
}
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
index 77bf5a7..3392454 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
@@ -42,6 +42,9 @@ import
org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
+import org.apache.hadoop.hdds.scm.update.server.SCMUpdateServiceGrpcServer;
+import org.apache.hadoop.hdds.scm.update.client.UpdateServiceConfig;
+import org.apache.hadoop.hdds.scm.update.server.SCMCRLStore;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import
org.apache.hadoop.hdds.scm.protocol.SCMSecurityProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
@@ -83,7 +86,8 @@ public class SCMSecurityProtocolServer implements
SCMSecurityProtocol {
private final CertificateServer rootCertificateServer;
private final CertificateServer scmCertificateServer;
private final X509Certificate rootCACertificate;
- private final RPC.Server rpcServer;
+ private final RPC.Server rpcServer; // HADOOP RPC SERVER
+ private final SCMUpdateServiceGrpcServer grpcUpdateServer; // gRPC SERVER
private final InetSocketAddress rpcAddress;
private final ProtocolMessageMetrics metrics;
private final StorageContainerManager storageContainerManager;
@@ -124,6 +128,10 @@ public class SCMSecurityProtocolServer implements
SCMSecurityProtocol {
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
rpcServer.refreshServiceAcl(conf, SCMPolicyProvider.getInstance());
}
+
+ this.grpcUpdateServer = new SCMUpdateServiceGrpcServer(
+ conf.getObject(UpdateServiceConfig.class),
+ new SCMCRLStore(scmCertificateServer));
}
/**
@@ -352,13 +360,19 @@ public class SCMSecurityProtocolServer implements
SCMSecurityProtocol {
.collect(Collectors.toList()), CRLReason.lookup(reason),
new Date(revocationTime));
try {
- return revoked.get().get().longValue();
+ Long crlId = revoked.get().get();
+ getGrpcUpdateServer().notifyCrlUpdate();
+ return crlId;
} catch (InterruptedException | ExecutionException e) {
throw new SCMException("Fail to revoke certs",
SCMException.ResultCodes.FAILED_TO_REVOKE_CERTIFICATES);
}
}
+ public SCMUpdateServiceGrpcServer getGrpcUpdateServer() {
+ return grpcUpdateServer;
+ }
+
@VisibleForTesting
public UserGroupInformation getRpcRemoteUser() {
return Server.getRemoteUser();
@@ -372,12 +386,13 @@ public class SCMSecurityProtocolServer implements
SCMSecurityProtocol {
return rpcAddress;
}
- public void start() {
+ public void start() throws IOException {
String startupMsg = StorageContainerManager.buildRpcServerStartMessage(
"Starting RPC server for SCMSecurityProtocolServer.", getRpcAddress());
LOGGER.info(startupMsg);
metrics.register();
getRpcServer().start();
+ getGrpcUpdateServer().start();
}
public void stop() {
@@ -385,6 +400,7 @@ public class SCMSecurityProtocolServer implements
SCMSecurityProtocol {
LOGGER.info("Stopping the SCMSecurityProtocolServer.");
metrics.unregister();
getRpcServer().stop();
+ getGrpcUpdateServer().stop();
} catch (Exception ex) {
LOGGER.error("SCMSecurityProtocolServer stop failed.", ex);
}
@@ -393,6 +409,9 @@ public class SCMSecurityProtocolServer implements
SCMSecurityProtocol {
public void join() throws InterruptedException {
LOGGER.trace("Join RPC server for SCMSecurityProtocolServer.");
getRpcServer().join();
+ LOGGER.trace("Join gRPC server for SCMSecurityProtocolServer.");
+ getGrpcUpdateServer().join();
+
}
public CertificateServer getRootCertificateServer() {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLStore.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLStore.java
new file mode 100644
index 0000000..89775c4
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLStore.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.scm.update.client.CRLStore;
+import
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class for SCM CRL store.
+ */
+public class SCMCRLStore implements CRLStore {
+
+ private final CertificateServer certServer;
+
+ public SCMCRLStore(CertificateServer certServer) {
+ this.certServer = certServer;
+ }
+
+ @Override
+ public long getLatestCrlId() {
+ return certServer.getLatestCrlId();
+ }
+
+ @Override
+ public CRLInfo getCRL(long crlId) throws IOException {
+ List<Long> crlIdList = new ArrayList<>();
+ crlIdList.add(crlId);
+ return certServer.getCrls(crlIdList).get(0);
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLUpdateHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLUpdateHandler.java
new file mode 100644
index 0000000..bc92f82
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLUpdateHandler.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.CRLUpdateResponse;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse;
+import org.apache.hadoop.hdds.scm.update.client.CRLStore;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.x509.crl.CRLCodec;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Class handle the CRL client update and response.
+ */
+public class SCMCRLUpdateHandler implements SCMUpdateHandler {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMCRLUpdateHandler.class);
+ private final CRLStore crlStore;
+
+ private static final SCMUpdateServiceProtos.Type TYPE =
+ SCMUpdateServiceProtos.Type.CRLUpdate;
+
+ private final Map<UUID, CRLClientInfo> clients;
+
+ SCMCRLUpdateHandler(CRLStore crlStore) {
+ this.crlStore = crlStore;
+ clients = new ConcurrentHashMap<>();
+ }
+
+ public SCMUpdateServiceProtos.Type getType() {
+ return TYPE;
+ }
+
+ @Override
+ public void handleClientRequest(SCMUpdateServiceProtos.UpdateRequest request,
+ SCMUpdateClientInfo clientInfo) {
+ SCMUpdateServiceProtos.CRLUpdateRequest updateStatusRequest =
+ request.getCrlUpdateRequest();
+ long clientCrlId = updateStatusRequest.getReceivedCrlId();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Client {} updateStatus \nclientCrlId {} \npendingCrls {}",
+ clientInfo.getClientId(), clientCrlId,
+ updateStatusRequest.getPendingCrlIdsList().toString());
+ }
+
+ CRLClientInfo crlClientInfo;
+ if (!clients.containsKey(clientInfo.getClientId())) {
+ crlClientInfo = new CRLClientInfo(clientInfo);
+ clients.put(clientInfo.getClientId(), crlClientInfo);
+ } else {
+ crlClientInfo = clients.get(clientInfo.getClientId());
+ }
+
+ crlClientInfo.setPendingCrlIds(
+ request.getCrlUpdateRequest().getPendingCrlIdsList());
+ crlClientInfo.setReceivedCrlId(
+ request.getCrlUpdateRequest().getReceivedCrlId());
+
+ sendCrlUpdateToClient(crlClientInfo);
+ }
+
+ @Override
+ public void onUpdate() {
+ LOG.debug("Update due to certificate revocation");
+ // server crl id is usually > client crl id when this is invoked.
+ clients.values().forEach(client -> {
+ sendCrlUpdateToClient(client);
+ });
+ }
+
+ @Override
+ public void onRemoveClient(SCMUpdateClientInfo clientInfo) {
+ clients.remove(clientInfo.getClientId());
+ }
+
+ private void sendCrlUpdateToClient(CRLClientInfo client) {
+ long clientCrlId = client.getReceivedCrlId();
+ long serverCrlId = crlStore.getLatestCrlId();
+
+ if (clientCrlId >= serverCrlId) {
+ return;
+ }
+
+ LOG.debug("## Server: clientCrlId {} serverCrlId {}",
+ clientCrlId, serverCrlId);
+
+ long nextCrlId = clientCrlId + 1;
+ try {
+ CRLInfo crlInfo = null;
+ while (crlInfo == null && nextCrlId <= serverCrlId) {
+ crlInfo = crlStore.getCRL(nextCrlId);
+ nextCrlId++;
+ }
+ if (crlInfo == null) {
+ LOG.debug("Nothing to send to client");
+ return;
+ }
+ sendCrlToClient(crlInfo, client.getUpdateClientInfo());
+ } catch (Exception e) {
+ LOG.error("Failed to handle client update.", e);
+
client.getUpdateClientInfo().getResponseObserver().onError(Status.INTERNAL
+ .withDescription("Failed to send crl" + nextCrlId +
+ " to client " + client.getUpdateClientInfo().getClientId())
+ .asException());
+ }
+ }
+
+ private void sendCrlToClient(CRLInfo crl, SCMUpdateClientInfo clientInfo)
+ throws SCMSecurityException {
+ LOG.debug("Sending client# {} with crl: {} ",
+ clientInfo.getClientId(), crl.getCrlSequenceID());
+ StreamObserver<UpdateResponse> responseObserver =
+ clientInfo.getResponseObserver();
+ SCMUpdateServiceProtos.CRLInfoProto crlInfoProto =
+ SCMUpdateServiceProtos.CRLInfoProto.newBuilder()
+ .setCrlSequenceID(crl.getCrlSequenceID())
+ .setX509CRL(CRLCodec.getPEMEncodedString(crl.getX509CRL()))
+ .setCreationTimestamp(crl.getCreationTimestamp()).build();
+ responseObserver.onNext(
+ UpdateResponse.newBuilder()
+ .setUpdateType(SCMUpdateServiceProtos.Type.CRLUpdate)
+ .setCrlUpdateResponse(CRLUpdateResponse.newBuilder()
+ .setCrlInfo(crlInfoProto).build()).build());
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientManager.java
new file mode 100644
index 0000000..710cd20
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientManager.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.Type;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Class that manages SCM update clients.
+ */
+public class SCMUpdateClientManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMUpdateClientManager.class);
+ private Map<UUID, SCMUpdateClientInfo> clients;
+ private Map<Type, SCMUpdateHandler> handlers;
+
+ public SCMUpdateClientManager() {
+ clients = new ConcurrentHashMap<>();
+ handlers = new ConcurrentHashMap<>();
+ }
+
+ public void registerHandler(SCMUpdateHandler handler) {
+ handlers.put(handler.getType(), handler);
+ }
+
+ public void unRegisterHandler(Type type) {
+ handlers.remove(type);
+ }
+
+ public UUID addClient() throws SCMException {
+ UUID clientId = UUID.randomUUID();
+ int retryCount = 5;
+ while (clients.containsKey(clientId)) {
+ if (retryCount > 0) {
+ clientId = UUID.randomUUID();
+ retryCount--;
+ } else {
+ throw new SCMException("Failed to add CRL client with random clientId"
+
+ " collision", SCMException.ResultCodes.FAILED_TO_ADD_CRL_CLIENT);
+ }
+ }
+
+ SCMUpdateClientInfo clientInfo = new SCMUpdateClientInfo(clientId);
+ clients.put(clientId, clientInfo);
+ return clientId;
+ }
+
+ // this does not necessarily produce a server response via responseObserver.
+ public void handleClientUpdate(UpdateRequest request,
+ StreamObserver<UpdateResponse> responseObserver) {
+ UUID clientId = SCMUpdateClientInfo.fromClientIdProto(
+ request.getClientId());
+
+ // Unknown client update
+ if (!clients.containsKey(clientId)) {
+ responseObserver.onError(Status.INVALID_ARGUMENT
+ .withDescription("Client must subscribe before it can " +
+ "send/receive updates")
+ .asException());
+ }
+
+ // record the server to client channel
+ SCMUpdateClientInfo clientInfo = clients.get(clientId);
+ if (clientInfo.getResponseObserver() == null) {
+ clientInfo.setResponseObserver(responseObserver);
+ }
+
+ if (handlers.containsKey(request.getUpdateType())) {
+ handlers.get(request.getUpdateType())
+ .handleClientRequest(request, clientInfo);
+ } else {
+ responseObserver.onError(Status.INVALID_ARGUMENT
+ .withDescription("Unknown client update type.")
+ .asException());
+ }
+ }
+
+ /**
+ * Remove client by client Id.
+ * @param clientId - client Id
+ * @return true if client is removed, false otherwise.
+ */
+ public boolean removeClient(UUID clientId) {
+ if (clients.containsKey(clientId)) {
+ SCMUpdateClientInfo clientInfo = clients.remove(clientId);
+ handlers.values().forEach(handler -> handler.onRemoveClient(clientInfo));
+ LOG.info("Client {} removed.", clientId);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Remove client by its responseObserver obj.
+ * @param responseObserver - response observer of the client
+ * @return true if client is removed, false otherwise.
+ */
+ public boolean removeClient(StreamObserver<UpdateResponse> responseObserver)
{
+ UUID clientId = null;
+ for (SCMUpdateClientInfo client : clients.values()) {
+ if (client.getResponseObserver() == responseObserver) {
+ clientId = client.getClientId();
+ break;
+ }
+ }
+ if (clientId != null) {
+ LOG.debug("Remove client {} by responseObserver", clientId);
+ removeClient(clientId);
+ return true;
+ }
+ LOG.debug("Remove client {} by responseObserver not found!");
+ return false;
+ }
+
+ public void onUpdate(Type type) {
+ if (handlers.containsKey(type)) {
+ handlers.get(type).onUpdate();
+ } else {
+ LOG.warn("Unknown update type to broadcast!");
+ }
+ }
+}
+
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateHandler.java
new file mode 100644
index 0000000..026bd4a
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateHandler.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.Type;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest;
+
+/**
+ * Interface used by server side to handle client update and server publish.
+ */
+public interface SCMUpdateHandler {
+
+ /**
+ * handle client update request.
+ * @param request
+ * @param clientInfo
+ */
+ void handleClientRequest(UpdateRequest request,
+ SCMUpdateClientInfo clientInfo);
+
+ /**
+ * Handle server broadcast to all clients as needed.
+ */
+ void onUpdate();
+
+ /**
+ * Handle server remove client due to error streaming to the client.
+ * @param clientInfo
+ */
+ void onRemoveClient(SCMUpdateClientInfo clientInfo);
+
+ /**
+ * Return the type of Update the handler can handle.
+ * @return the type of Update the handler can handle.
+ */
+ Type getType();
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateServiceGrpcServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateServiceGrpcServer.java
new file mode 100644
index 0000000..fa1b256
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateServiceGrpcServer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos;
+import org.apache.hadoop.hdds.scm.update.client.CRLStore;
+import org.apache.hadoop.hdds.scm.update.client.UpdateServiceConfig;
+import org.apache.ratis.thirdparty.io.grpc.Server;
+import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * gRPC server for SCM update services.
+ */
+public class SCMUpdateServiceGrpcServer {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMUpdateServiceGrpcServer.class);
+
+ private static final String SERVICE_NAME = "SCMUpdateService";
+ private CRLStore crlStore;
+ private int port;
+ private Server server;
+ private SCMUpdateServiceImpl scmUpdateService;
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ public SCMUpdateServiceGrpcServer(final UpdateServiceConfig updateConf,
+ final CRLStore crlStore) {
+ this.crlStore = crlStore;
+ this.port = updateConf.getPort();
+ }
+
+ public int getPort() {
+ return this.port;
+ }
+
+ public void start() throws IOException {
+ LOG.info("{} starting", SERVICE_NAME);
+ scmUpdateService = new SCMUpdateServiceImpl(crlStore);
+ server = ServerBuilder.forPort(port).
+ addService(scmUpdateService)
+ .build();
+
+ if (!isStarted.compareAndSet(false, true)) {
+ LOG.info("Ignoring start() since {} has already started.", SERVICE_NAME);
+ return;
+ } else {
+ server.start();
+ }
+ }
+
+ public void stop() {
+ LOG.info("{} stopping", SERVICE_NAME);
+ if (isStarted.get()) {
+ scmUpdateService = null;
+ server.shutdown();
+ try {
+ server.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("failed to shutdown SCMClientGrpcServer", e);
+ } finally {
+ server.shutdownNow();
+ }
+ LOG.info("{} stopped!", SERVICE_NAME);
+ isStarted.set(false);
+ }
+ }
+
+ public void join() throws InterruptedException {
+ while (isStarted.get()) {
+ wait();
+ }
+ }
+
+ public void notifyCrlUpdate() {
+ scmUpdateService.notifyUpdate(SCMUpdateServiceProtos.Type.CRLUpdate);
+ }
+
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateServiceImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateServiceImpl.java
new file mode 100644
index 0000000..f339c9c
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateServiceImpl.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceGrpc;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.SubscribeRequest;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.SubscribeResponse;
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.Type;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UnsubscribeRequest;
+import
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UnsubscribeResponse;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.update.client.CRLStore;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+/**
+ * Impl class for SCM update service.
+ * Allows client to subscribe and bi-directional streaming update with server.
+ * Currently used for CRL udpate between SCM and OM.
+ * Can be extended to update SCM pipeline/container change in future.
+ */
+public class SCMUpdateServiceImpl extends
+ SCMUpdateServiceGrpc.SCMUpdateServiceImplBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMUpdateServiceImpl.class);
+
+ private SCMUpdateClientManager clientManager;
+
+ public SCMUpdateServiceImpl(CRLStore crlStore) {
+ clientManager = new SCMUpdateClientManager();
+ clientManager.registerHandler(new SCMCRLUpdateHandler(crlStore));
+ }
+
+ @Override
+ public void subscribe(SubscribeRequest request,
+ StreamObserver<SubscribeResponse> responseObserver) {
+ UUID clientId;
+ try {
+ clientId = clientManager.addClient();
+ } catch (SCMException ex) {
+ LOG.error("Fail to subscribe for Client.", ex);
+ responseObserver.onError(ex);
+ return;
+ }
+ responseObserver.onNext(SubscribeResponse.newBuilder()
+ .setClientId(SCMUpdateClientInfo.toClientIdProto(clientId))
+ .build());
+ responseObserver.onCompleted();
+ LOG.info("Client {} subscribed.", clientId);
+ }
+
+ @Override
+ public void unsubscribe(UnsubscribeRequest request,
+ StreamObserver<UnsubscribeResponse> responseObserver) {
+ UUID clientId = SCMUpdateClientInfo.fromClientIdProto(
+ request.getClientId());
+ boolean removed = clientManager.removeClient(clientId);
+ if (removed) {
+ LOG.info("Client {} unsubscribed.", clientId);
+ } else {
+ LOG.info("Client {} does not exist, no-op for unsubscribe", clientId);
+ }
+ responseObserver.onNext(UnsubscribeResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public StreamObserver<UpdateRequest> updateStatus(
+ StreamObserver<UpdateResponse> responseObserver) {
+ return new StreamObserver<UpdateRequest>() {
+ @Override
+ public void onNext(UpdateRequest updateRequest) {
+ LOG.debug("UpdateStatus onNext");
+ clientManager.handleClientUpdate(updateRequest, responseObserver);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOG.debug("UpdateStatus onError", throwable);
+ clientManager.removeClient(responseObserver);
+ }
+
+ @Override
+ public void onCompleted() {
+ LOG.debug("UpdateStatus(Client) onComplete");
+ responseObserver.onCompleted();
+ clientManager.removeClient(responseObserver);
+ }
+ };
+ }
+
+ // service prepare a update response and broadcast to all clients subscribed.
+ public void notifyUpdate(Type type) {
+ clientManager.onUpdate(type);
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/package-info.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/package-info.java
new file mode 100644
index 0000000..c3b2fb8
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+/**
+ * CRL server package.
+ */
+package org.apache.hadoop.hdds.scm.update.server;
\ No newline at end of file
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
index 86a8c3e..e1a4e62 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
@@ -26,6 +26,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
+import java.io.IOException;
+
/**
* Test class for {@link SCMSecurityProtocolServer}.
* */
@@ -55,7 +57,7 @@ public class TestSCMSecurityProtocolServer {
}
@Test
- public void testStart() {
+ public void testStart() throws IOException {
securityProtocolServer.start();
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/update/server/MockCRLStore.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/update/server/MockCRLStore.java
new file mode 100644
index 0000000..6152b39
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/update/server/MockCRLStore.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
+import org.apache.hadoop.hdds.scm.server.SCMCertStore;
+import org.apache.hadoop.hdds.scm.update.client.CRLStore;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.CRLApprover;
+import
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
+import
org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCRLApprover;
+import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.bouncycastle.asn1.x509.CRLReason;
+import org.bouncycastle.cert.X509CertificateHolder;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.file.Files;
+import java.security.KeyPair;
+import java.security.cert.X509Certificate;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Mock CRL Store impl for test.
+ */
+public class MockCRLStore implements CRLStore {
+
+ private static final String COMPONENT_NAME = "scm";
+ private static final Long INITIAL_SEQUENCE_ID = 0L;
+
+ private OzoneConfiguration config;
+ private SCMMetadataStore scmMetadataStore;
+ private CertificateStore scmCertStore;
+ private SecurityConfig securityConfig;
+ private KeyPair keyPair;
+ private CRLApprover crlApprover;
+ private final X509CertificateHolder caCertificateHolder;
+ private final Logger log;
+
+ public MockCRLStore(TemporaryFolder tempDir, Logger log) throws Exception {
+
+ this.log = log;
+ config = new OzoneConfiguration();
+ config.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+ tempDir.newFolder().getAbsolutePath());
+
+ securityConfig = new SecurityConfig(config);
+ keyPair = KeyStoreTestUtil.generateKeyPair("RSA");
+
+ scmMetadataStore = new SCMMetadataStoreImpl(config);
+ scmCertStore = new SCMCertStore.Builder().setRatisServer(null)
+ .setCRLSequenceId(INITIAL_SEQUENCE_ID)
+ .setMetadaStore(scmMetadataStore)
+ .build();
+ crlApprover = new DefaultCRLApprover(securityConfig,
+ keyPair.getPrivate());
+
+ Files.createDirectories(securityConfig.getKeyLocation(COMPONENT_NAME));
+ caCertificateHolder = new X509CertificateHolder(generateX509Cert()
+ .getEncoded());
+ }
+
+ public BigInteger issueCert() throws Exception {
+ X509Certificate cert = generateX509Cert();
+ scmCertStore.storeValidCertificate(cert.getSerialNumber(), cert,
+ HddsProtos.NodeType.SCM);
+ return cert.getSerialNumber();
+ }
+
+ public Optional<Long> revokeCert(List<BigInteger> certs,
+ Instant revokeTime) throws IOException {
+ log.debug("Revoke certs: ", certs.toString());
+ Optional<Long> crlId = scmCertStore.revokeCertificates(certs,
+ caCertificateHolder,
+ CRLReason.lookup(CRLReason.keyCompromise),
+ Date.from(revokeTime), crlApprover);
+ List<CRLInfo> crlInfos =
+ scmCertStore.getCrls(ImmutableList.of(crlId.get()));
+
+ if (crlInfos.isEmpty()) {
+ log.debug("CRL[0]: {}", crlInfos.get(0).toString());
+ }
+ return crlId;
+ }
+
+
+ private X509Certificate generateX509Cert() throws Exception {
+ return CertificateCodec.getX509Certificate(
+ CertificateCodec.getPEMEncodedString(
+ KeyStoreTestUtil.generateCertificate("CN=Test", keyPair, 30,
+ "SHA256withRSA")));
+ }
+
+ @Override
+ public long getLatestCrlId() {
+ return scmCertStore.getLatestCrlId();
+ }
+
+ @Override
+ public CRLInfo getCRL(long crlId) throws IOException {
+ return scmCertStore.getCrls(Arrays.asList(crlId)).get(0);
+ }
+
+ public void close() throws Exception {
+ if (scmMetadataStore.getStore() != null) {
+ scmMetadataStore.getStore().close();
+ }
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/update/server/TestSCMUpdateServiceGrpcServer.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/update/server/TestSCMUpdateServiceGrpcServer.java
new file mode 100644
index 0000000..941f629
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/update/server/TestSCMUpdateServiceGrpcServer.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.update.client.CRLClientUpdateHandler;
+import org.apache.hadoop.hdds.scm.update.client.ClientCRLStore;
+import org.apache.hadoop.hdds.scm.update.client.SCMUpdateClientConfiguration;
+import org.apache.hadoop.hdds.scm.update.client.SCMUpdateServiceGrpcClient;
+import org.apache.hadoop.hdds.scm.update.client.UpdateServiceConfig;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Tests for SCM update Service.
+ */
+public class TestSCMUpdateServiceGrpcServer {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestSCMUpdateServiceGrpcServer.class);
+ @Rule
+ public Timeout timeout = Timeout.seconds(300);
+
+ @Rule
+ public ExpectedException thrown= ExpectedException.none();
+
+ @Rule
+ public final TemporaryFolder tempDir = new TemporaryFolder();
+
+ private MockCRLStore mockCRLStore;
+
+ @Before
+ public void setUp() throws Exception {
+ mockCRLStore = new MockCRLStore(tempDir, LOG);
+ GenericTestUtils.setLogLevel(CRLClientUpdateHandler.getLog(), Level.DEBUG);
+ }
+
+ @After
+ public void destroyDbStore() throws Exception {
+ if (mockCRLStore != null) {
+ mockCRLStore.close();
+ mockCRLStore = null;
+ }
+ }
+
+ private UpdateServiceConfig getUpdateServiceConfig(OzoneConfiguration conf) {
+ return conf.getObject(UpdateServiceConfig.class);
+ }
+
+ @Test
+ public void testStartStop() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ SCMUpdateServiceGrpcServer server = new SCMUpdateServiceGrpcServer(
+ getUpdateServiceConfig(conf), mockCRLStore);
+ ClientCRLStore clientCRLStore = new ClientCRLStore();
+ SCMUpdateServiceGrpcClient client =
+ new SCMUpdateServiceGrpcClient("localhost", conf, clientCRLStore);
+
+ try {
+ server.start();
+ client.start();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ // client need to handle the case when the server is stopped first.
+ client.stop(true);
+ server.stop();
+ }
+ }
+
+
+ @Test
+ public void testClientUpdateWithRevoke() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ SCMUpdateServiceGrpcServer server = new SCMUpdateServiceGrpcServer(
+ getUpdateServiceConfig(conf), mockCRLStore);
+ ClientCRLStore clientCRLStore = new ClientCRLStore();
+ SCMUpdateServiceGrpcClient client =
+ new SCMUpdateServiceGrpcClient("localhost", conf, clientCRLStore);
+ server.start();
+ client.start();
+
+ try {
+ // issue 10 certs
+ List<BigInteger> certIds = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ BigInteger certId = mockCRLStore.issueCert();
+ certIds.add(certId);
+ }
+
+ // revoke 4 certs and broadcast
+ for (int i = 0; i < 4; i++) {
+ revokeCertNow((certIds.get(i)));
+ }
+ server.notifyCrlUpdate();
+
+ GenericTestUtils.waitFor(() -> client.getUpdateCount()==4, 100, 2000);
+ Assert.assertEquals(4, client.getUpdateCount());
+ Assert.assertEquals(0, client.getErrorCount());
+
+ revokeCertNow(certIds.get(5));
+ server.notifyCrlUpdate();
+ GenericTestUtils.waitFor(() -> client.getUpdateCount()>4, 100, 2000);
+ Assert.assertEquals(5, client.getUpdateCount());
+ Assert.assertEquals(0, client.getErrorCount());
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ client.stop(true);
+ server.stop();
+ }
+ }
+
+ @Test
+ public void testClientUpdateWithDelayedRevoke() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ SCMUpdateServiceGrpcServer server = new SCMUpdateServiceGrpcServer(
+ getUpdateServiceConfig(conf), mockCRLStore);
+
+ ClientCRLStore clientCRLStore = new ClientCRLStore();
+
+ // check pending crl every 5 seconds
+ SCMUpdateClientConfiguration updateClientConfiguration =
+ conf.getObject(SCMUpdateClientConfiguration.class);
+ updateClientConfiguration.setClientCrlCheckInterval(Duration.ofSeconds(2));
+ conf.setFromObject(updateClientConfiguration);
+
+ SCMUpdateServiceGrpcClient client =
+ new SCMUpdateServiceGrpcClient("localhost", conf, clientCRLStore);
+ server.start();
+ client.start();
+
+ try {
+ // issue 10 certs
+ List<BigInteger> certIds = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ BigInteger certId = mockCRLStore.issueCert();
+ certIds.add(certId);
+ }
+
+ // revoke cert 0
+ revokeCertNow((certIds.get(0)));
+ server.notifyCrlUpdate();
+
+ GenericTestUtils.waitFor(() -> client.getUpdateCount()==1,
+ 100, 2000);
+ Assert.assertEquals(1, client.getUpdateCount());
+ Assert.assertEquals(0, client.getErrorCount());
+
+ // revoke cert 5 with 10 seconds delay
+ revokeCert(certIds.get(5), Instant.now().plus(Duration.ofSeconds(5)));
+ server.notifyCrlUpdate();
+ GenericTestUtils.waitFor(() -> client.getUpdateCount()>1,
+ 100, 2000);
+ Assert.assertEquals(2, client.getUpdateCount());
+ Assert.assertEquals(0, client.getErrorCount());
+ Assert.assertEquals(1, client.getClientCRLStore()
+ .getPendingCrlIds().size());
+
+ GenericTestUtils.waitFor(() -> client.getPendingCrlRemoveCount()==1,
+ 100, 20_000);
+ Assert.assertTrue(client.getClientCRLStore()
+ .getPendingCrlIds().isEmpty());
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ client.stop(true);
+ server.stop();
+ }
+ }
+
+ private Long revokeCert(BigInteger certId, Instant revokeTime)
+ throws IOException {
+ Optional<Long> crlId =
+ mockCRLStore.revokeCert(Arrays.asList(certId), revokeTime);
+ return crlId.get();
+ }
+
+ private Long revokeCertNow(BigInteger certId) throws IOException {
+ Optional<Long> crlId =
+ mockCRLStore.revokeCert(Arrays.asList(certId), Instant.now());
+ return crlId.get();
+ }
+
+ @Test
+ public void testClientUpdateWithRestart() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ SCMUpdateServiceGrpcServer server = new SCMUpdateServiceGrpcServer(
+ getUpdateServiceConfig(conf), mockCRLStore);
+ ClientCRLStore clientCRLStore = new ClientCRLStore();
+ SCMUpdateServiceGrpcClient client =
+ new SCMUpdateServiceGrpcClient("localhost", conf, clientCRLStore);
+ server.start();
+ client.start();
+
+ try {
+ // issue 10 certs
+ List<BigInteger> certIds = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ BigInteger certId = mockCRLStore.issueCert();
+ certIds.add(certId);
+ }
+
+ // revoke 4 certs and broadcast
+ for (int i = 0; i < 4; i++) {
+ revokeCertNow((certIds.get(i)));
+ }
+ server.notifyCrlUpdate();
+ GenericTestUtils.waitFor(() -> client.getUpdateCount()==4,
+ 100, 2000);
+ Assert.assertEquals(4, client.getUpdateCount());
+
+
+ // server restart
+ // client onError->
+ // 1. reconnect
+ // 2. new subscribe resumes from previous state
+ LOG.info("Test server restart begin.");
+ // server shutdown can lead to duplicate message received on client when
+ // client retry connect to the server. The client will handle that.
+ server.stop();
+ server.start();
+ GenericTestUtils.waitFor(() -> client.getErrorCount()==1,
+ 100, 2000);
+ Assert.assertEquals(4, client.getUpdateCount());
+ Assert.assertEquals(1, client.getErrorCount());
+ Assert.assertEquals(4, clientCRLStore.getLatestCrlId());
+ LOG.info("Test server restart end.");
+
+ revokeCertNow(certIds.get(5));
+ server.notifyCrlUpdate();
+ GenericTestUtils.waitFor(() -> client.getUpdateCount()>4,
+ 100, 5000);
+ Assert.assertEquals(5, client.getUpdateCount());
+ Assert.assertEquals(1, client.getErrorCount());
+ Assert.assertEquals(5, clientCRLStore.getLatestCrlId());
+
+ // client restart
+ // server onError->
+ // 1. remove stale client
+ // 2. new subscribe resumes from previous state.
+ LOG.info("Test client restart begin.");
+ // a full client channel shutdown and create
+ client.stop(true);
+ client.createChannel();
+ client.start();
+ Assert.assertEquals(5, clientCRLStore.getLatestCrlId());
+ GenericTestUtils.waitFor(() -> client.getUpdateCount()>4,
+ 100, 2000);
+ revokeCertNow(certIds.get(6));
+ // mostly noop
+ server.notifyCrlUpdate();
+ LOG.info("Test client restart end.");
+
+ GenericTestUtils.waitFor(() -> client.getUpdateCount()>5,
+ 100, 2000);
+ Assert.assertTrue(client.getUpdateCount()>=6);
+ Assert.assertEquals(2, client.getErrorCount());
+ Assert.assertEquals(6, clientCRLStore.getLatestCrlId());
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ client.stop(true);
+ server.stop();
+ }
+ }
+}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
index 6bac924..b37f785 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
@@ -225,4 +225,18 @@ public class CertificateClientTestImpl implements
CertificateClient {
return 0;
}
+ @Override
+ public long getLocalCrlId() {
+ return 0;
+ }
+
+ @Override
+ public void setLocalCrlId(long crlId) {
+ }
+
+ @Override
+ public boolean processCrl(CRLInfo crl) {
+ return false;
+ }
+
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
index cce9d04..a3c2fd7 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.om;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.update.client.SCMUpdateServiceGrpcClient;
/**
* Wrapper class for Scm protocol clients.
@@ -27,6 +28,7 @@ public class ScmClient {
private final ScmBlockLocationProtocol blockClient;
private final StorageContainerLocationProtocol containerClient;
+ private SCMUpdateServiceGrpcClient updateServiceGrpcClient;
ScmClient(ScmBlockLocationProtocol blockClient,
StorageContainerLocationProtocol containerClient) {
@@ -41,4 +43,13 @@ public class ScmClient {
public StorageContainerLocationProtocol getContainerClient() {
return this.containerClient;
}
+
+ public void setUpdateServiceGrpcClient(
+ SCMUpdateServiceGrpcClient updateClient) {
+ this.updateServiceGrpcClient = updateClient;
+ }
+
+ public SCMUpdateServiceGrpcClient getUpdateServiceGrpcClient() {
+ return updateServiceGrpcClient;
+ }
}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
index 4c68ed6..086d715 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import
org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
-import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.ozone.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]