This is an automated email from the ASF dual-hosted git repository.

vivekratnavel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 25d66d7  HDDS-5154. Add SCM GRPC server to publish CRL update. (#2216)
25d66d7 is described below

commit 25d66d72761820fc63698493326edc993213853c
Author: Xiaoyu Yao <[email protected]>
AuthorDate: Wed Jun 2 16:44:55 2021 -0700

    HDDS-5154. Add SCM GRPC server to publish CRL update. (#2216)
---
 .github/workflows/post-commit.yml                  |   2 +-
 .../hadoop/hdds/scm/exceptions/SCMException.java   |   4 +-
 .../hadoop/hdds/security/x509/crl/CRLInfo.java     |  39 +++
 .../scm/update/client/CRLClientUpdateHandler.java  | 201 ++++++++++++++
 .../hadoop/hdds/scm/update/client/CRLStore.java    |  34 +++
 .../hdds/scm/update/client/ClientCRLStore.java     |  98 +++++++
 .../scm/update/client/ClientUpdateHandler.java     |  29 ++
 .../client/SCMUpdateClientConfiguration.java       |  53 ++++
 .../update/client/SCMUpdateServiceGrpcClient.java  | 218 +++++++++++++++
 .../scm/update/client/UpdateServiceConfig.java     |  34 +--
 .../hdds/scm/update/client/package-info.java       |  22 ++
 .../hdds/scm/update/server/CRLClientInfo.java      |  56 ++++
 .../scm/update/server/SCMUpdateClientInfo.java     |  67 +++++
 .../hdds/scm/update/server/package-info.java       |  22 ++
 .../x509/certificate/client/CertificateClient.java |  26 +-
 .../client/DefaultCertificateClient.java           |  65 +++++
 .../certificate/client/OMCertificateClient.java    |  11 +-
 hadoop-hdds/interface-client/pom.xml               |   2 +
 .../src/main/proto/SCMUpdateProtocol.proto         |  98 +++++++
 .../src/main/proto/ScmServerProtocol.proto         |   2 +
 .../hdds/scm/server/SCMSecurityProtocolServer.java |  25 +-
 .../hadoop/hdds/scm/update/server/SCMCRLStore.java |  51 ++++
 .../scm/update/server/SCMCRLUpdateHandler.java     | 151 +++++++++++
 .../scm/update/server/SCMUpdateClientManager.java  | 149 ++++++++++
 .../hdds/scm/update/server/SCMUpdateHandler.java   |  53 ++++
 .../update/server/SCMUpdateServiceGrpcServer.java  |  99 +++++++
 .../scm/update/server/SCMUpdateServiceImpl.java    | 119 ++++++++
 .../hdds/scm/update/server/package-info.java       |  22 ++
 .../scm/server/TestSCMSecurityProtocolServer.java  |   4 +-
 .../hdds/scm/update/server/MockCRLStore.java       | 138 ++++++++++
 .../server/TestSCMUpdateServiceGrpcServer.java     | 301 +++++++++++++++++++++
 .../ozone/client/CertificateClientTestImpl.java    |  14 +
 .../java/org/apache/hadoop/ozone/om/ScmClient.java |  11 +
 .../ozone/recon/scm/TestReconNodeManager.java      |   2 +-
 34 files changed, 2194 insertions(+), 28 deletions(-)

diff --git a/.github/workflows/post-commit.yml 
b/.github/workflows/post-commit.yml
index 65d4a76..6dae941 100644
--- a/.github/workflows/post-commit.yml
+++ b/.github/workflows/post-commit.yml
@@ -227,7 +227,7 @@ jobs:
         continue-on-error: true
   integration:
     runs-on: ubuntu-18.04
-    timeout-minutes: 120
+    timeout-minutes: 150
     if: github.event_name != 'pull_request' || github.event.pull_request.draft 
== false
     strategy:
       matrix:
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
index 20185cd..cd1ea08 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -130,6 +130,8 @@ public class SCMException extends IOException {
     PIPELINE_NOT_FOUND,
     UNKNOWN_PIPELINE_STATE,
     CONTAINER_NOT_FOUND,
-    CONTAINER_REPLICA_NOT_FOUND
+    CONTAINER_REPLICA_NOT_FOUND,
+    FAILED_TO_CONNECT_TO_CRL_SERVICE,
+    FAILED_TO_ADD_CRL_CLIENT,
   }
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/crl/CRLInfo.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/crl/CRLInfo.java
index 633cbcb..5a9fba6 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/crl/CRLInfo.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/crl/CRLInfo.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.security.x509.crl;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.jetbrains.annotations.NotNull;
 
@@ -26,6 +27,8 @@ import java.io.IOException;
 import java.security.cert.CRLException;
 import java.security.cert.CertificateException;
 import java.security.cert.X509CRL;
+import java.security.cert.X509CRLEntry;
+import java.time.Instant;
 import java.util.Comparator;
 import java.util.Objects;
 
@@ -38,11 +41,17 @@ public class CRLInfo implements Comparator<CRLInfo>,
   private X509CRL x509CRL;
   private long creationTimestamp;
   private long crlSequenceID;
+  private Instant revocationTime;
 
   private CRLInfo(X509CRL x509CRL, long creationTimestamp, long crlSequenceID) 
{
+    assert((x509CRL != null) &&
+        !x509CRL.getRevokedCertificates().isEmpty());
     this.x509CRL = x509CRL;
     this.creationTimestamp = creationTimestamp;
     this.crlSequenceID = crlSequenceID;
+    X509CRLEntry entry = x509CRL.getRevokedCertificates().iterator().next();
+    this.revocationTime = Instant.ofEpochMilli(
+        entry.getRevocationDate().getTime());
   }
 
   /**
@@ -71,6 +80,28 @@ public class CRLInfo implements Comparator<CRLInfo>,
         .build();
   }
 
+  public static CRLInfo fromCRLProto3(
+      SCMUpdateServiceProtos.CRLInfoProto info)
+      throws IOException, CRLException, CertificateException {
+    CRLInfo.Builder builder = new CRLInfo.Builder();
+    return builder
+        .setX509CRL(CRLCodec.getX509CRL(info.getX509CRL()))
+        .setCreationTimestamp(info.getCreationTimestamp())
+        .setCrlSequenceID(info.getCrlSequenceID())
+        .build();
+  }
+
+  public SCMUpdateServiceProtos.CRLInfoProto getCRLProto3()
+      throws SCMSecurityException {
+    SCMUpdateServiceProtos.CRLInfoProto.Builder builder =
+        SCMUpdateServiceProtos.CRLInfoProto.newBuilder();
+
+    return builder.setX509CRL(CRLCodec.getPEMEncodedString(getX509CRL()))
+        .setCreationTimestamp(getCreationTimestamp())
+        .setCrlSequenceID(getCrlSequenceID())
+        .build();
+  }
+
   public X509CRL getX509CRL() {
     return x509CRL;
   }
@@ -83,6 +114,14 @@ public class CRLInfo implements Comparator<CRLInfo>,
     return crlSequenceID;
   }
 
+  public boolean shouldRevokeNow() {
+    return revocationTime.isBefore(Instant.now());
+  }
+
+  public Instant getRevocationTime() {
+    return revocationTime;
+  }
+
   /**
    * Compares this object with the specified object for order.  Returns a
    * negative integer, zero, or a positive integer as this object is less
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLClientUpdateHandler.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLClientUpdateHandler.java
new file mode 100644
index 0000000..df219e0
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLClientUpdateHandler.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.client;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceGrpc.SCMUpdateServiceStub;
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.CRLUpdateRequest;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse;
+import org.apache.hadoop.hdds.scm.update.server.SCMUpdateClientInfo;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * CRL client update handler that handles local CRL update and pending CRLs.
+ */
+public class CRLClientUpdateHandler implements ClientUpdateHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CRLClientUpdateHandler.class);
+  private static final String NAME = "CRLClientUpdateHandler";
+
+  private final SCMUpdateServiceStub updateStub;
+  private final ClientCRLStore clientStore;
+
+  // Used to update server about local pending crl id list
+  private StreamObserver<UpdateRequest> requestObserver;
+  private UUID clientUuid;
+  private SCMUpdateServiceProtos.ClientId clientIdProto;
+
+  // periodically process pending crls
+  private ScheduledExecutorService executorService;
+  private final SCMUpdateServiceGrpcClient serviceGrpcClient;
+  private long crlCheckInterval;
+
+  CRLClientUpdateHandler(UUID clientId,
+      SCMUpdateServiceStub updateStub,
+      SCMUpdateServiceGrpcClient serviceGrpcClient,
+      long crlCheckInterval) {
+    this.clientUuid = clientId;
+    this.updateStub = updateStub;
+    this.serviceGrpcClient = serviceGrpcClient;
+
+    this.clientStore = serviceGrpcClient.getClientCRLStore();
+    this.crlCheckInterval = crlCheckInterval;
+    LOG.info("Pending CRL check interval : {}s", crlCheckInterval/1000);
+    this.executorService = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("CRLUpdateHandler Thread - %d").build());
+  }
+
+  public static Logger getLog() {
+    return LOG;
+  }
+
+  @Override
+  public void handleServerUpdate(UpdateResponse updateResponse) {
+    SCMUpdateServiceProtos.CRLInfoProto crlInfo =
+        updateResponse.getCrlUpdateResponse().getCrlInfo();
+
+    long receivedCrlId = crlInfo.getCrlSequenceID();
+    long localCrlId = clientStore.getLatestCrlId();
+
+    LOG.debug("## Client: clientId {} clientCrlId {} receivedCrlId {}",
+        clientUuid, localCrlId, receivedCrlId);
+    if (localCrlId == receivedCrlId) {
+      return;
+    }
+    // send a client update to refresh stale server
+    if (localCrlId > receivedCrlId) {
+      LOG.warn("Received stale crlId {} lower than client crlId {}",
+          receivedCrlId, localCrlId);
+      sendClientUpdate();
+      return;
+    }
+
+    CRLInfo crl;
+    try {
+      crl = CRLInfo.fromCRLProto3(crlInfo);
+    } catch (Exception e) {
+      LOG.error("Can't parse server CRL update, skip...", e);
+      return;
+    }
+    clientStore.onRevokeCerts(crl);
+    // send client update.
+    sendClientUpdate();
+  }
+
+  public void start() {
+    // send initial update request to get a request observer handle
+    UpdateRequest updateReq = getUpdateRequest();
+    requestObserver = updateStub.withWaitForReady()
+        .updateStatus(new StreamObserver<UpdateResponse>() {
+          @Override
+          public void onNext(UpdateResponse updateResponse) {
+            LOG.debug("Receive server response: {}", 
updateResponse.toString());
+            serviceGrpcClient.incrUpdateCount();
+            handleServerUpdate(updateResponse);
+          }
+
+          @Override
+          public void onError(Throwable throwable) {
+            LOG.debug("Receive server error ", throwable);
+            serviceGrpcClient.incrErrorCount();
+            if (serviceGrpcClient.getIsRunning().get()) {
+              // TODO: not all server error needs client restart.
+              LOG.warn("Restart client on server error: ", throwable);
+              serviceGrpcClient.restart();
+            }
+          }
+
+          @Override
+          public void onCompleted() {
+            LOG.debug("Receive server completed");
+          }
+        });
+    requestObserver.onNext(updateReq);
+    startPendingCrlChecker();
+  }
+
+  public void stop() {
+    stopPendingCrlCheck();
+  }
+
+  private void stopPendingCrlCheck() {
+    executorService.shutdown();
+    try {
+      executorService.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.error("Unexpected exception while waiting for executor service" +
+          " to shutdown", e);
+    }
+  }
+
+  private void startPendingCrlChecker() {
+    executorService.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        // background thread handle pending crl and update server
+
+        CRLInfo crl = null;
+        while ((crl = clientStore.getNextPendingCrl()) != null) {
+          if (crl.shouldRevokeNow()) {
+            serviceGrpcClient.incrPendingCrlRemoveCount();
+            LOG.info("Time to process crlId {}", crl.getCrlSequenceID());
+            clientStore.removePendingCrl(crl);
+            sendClientUpdate();
+          } else {
+            // we are done with this pending Crl, wait for next round
+            break;
+          }
+        }
+      }
+    }, 0, crlCheckInterval, TimeUnit.MILLISECONDS);
+  }
+
+  private void sendClientUpdate() {
+    requestObserver.onNext(getUpdateRequest());
+  }
+
+  private UpdateRequest getUpdateRequest() {
+    return UpdateRequest.newBuilder()
+        .setUpdateType(SCMUpdateServiceProtos.Type.CRLUpdate)
+        .setClientId(SCMUpdateClientInfo.toClientIdProto(clientUuid))
+        .setCrlUpdateRequest(getCrlUpdateRequest())
+        .build();
+  }
+
+  private CRLUpdateRequest getCrlUpdateRequest() {
+    List<Long> pendingCrlIds = clientStore.getPendingCrlIds();
+    return CRLUpdateRequest.newBuilder()
+        .setReceivedCrlId(clientStore.getLatestCrlId())
+        .addAllPendingCrlIds(pendingCrlIds)
+        .build();
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLStore.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLStore.java
new file mode 100644
index 0000000..0b98117
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLStore.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.client;
+
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+
+import java.io.IOException;
+
+/**
+ * CRL Store interface.
+ */
+public interface CRLStore {
+
+  long getLatestCrlId();
+
+  CRLInfo getCRL(long crlId) throws IOException;
+
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/ClientCRLStore.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/ClientCRLStore.java
new file mode 100644
index 0000000..721988e
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/ClientCRLStore.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.client;
+
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+
+import java.io.IOException;
+import java.security.cert.X509CRL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+
+/**
+ * In memory Client CRL store, need to integrate with Client Table.
+ */
+public class ClientCRLStore implements CRLStore {
+  private PriorityQueue<CRLInfo> pendingCrls;
+  private List<Long> revokedCerts;
+  private long localCrlId;
+
+  public ClientCRLStore() {
+    localCrlId = 0;
+    revokedCerts = new ArrayList<>();
+    pendingCrls = new PriorityQueue<>(
+        new Comparator<CRLInfo>() {
+          @Override
+          public int compare(CRLInfo o1, CRLInfo o2) {
+            return o1.getRevocationTime()
+                .compareTo(o2.getRevocationTime());
+          }
+        });
+  }
+
+  @Override
+  public long getLatestCrlId() {
+    return localCrlId;
+  }
+
+  public void setLocalCrlId(long crlId) {
+    localCrlId = crlId;
+  }
+
+
+  @Override
+  public CRLInfo getCRL(long crlId) throws IOException {
+    return null;
+  }
+
+  public void onRevokeCerts(CRLInfo crl) {
+    if (crl.shouldRevokeNow()) {
+      revokedCerts.addAll(getRevokedCertIds(crl.getX509CRL()));
+    } else {
+      pendingCrls.add(crl);
+    }
+    localCrlId = crl.getCrlSequenceID();
+  }
+
+  public List<Long> getRevokedCertIds(X509CRL crl) {
+    return Collections.unmodifiableList(crl.getRevokedCertificates().stream()
+        .map(cert->cert.getSerialNumber().longValue())
+        .collect(Collectors.toList()));
+  }
+
+  public CRLInfo getNextPendingCrl() {
+    return pendingCrls.peek();
+  }
+
+  public void removePendingCrl(CRLInfo crl) {
+    pendingCrls.remove(crl);
+    revokedCerts.addAll(getRevokedCertIds(crl.getX509CRL()));
+  }
+
+  public List<Long> getPendingCrlIds() {
+    return new ArrayList<>(pendingCrls)
+        .stream().map(crl->crl.getCrlSequenceID())
+        .collect(Collectors.toList());
+  }
+
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/ClientUpdateHandler.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/ClientUpdateHandler.java
new file mode 100644
index 0000000..c16e438
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/ClientUpdateHandler.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.client;
+
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse;
+
+/**
+ * Interface used by client side to handle server updates.
+ */
+public interface ClientUpdateHandler {
+
+  void handleServerUpdate(UpdateResponse updateResponse);
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/SCMUpdateClientConfiguration.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/SCMUpdateClientConfiguration.java
new file mode 100644
index 0000000..84fdc1f
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/SCMUpdateClientConfiguration.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.client;
+
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigType;
+
+import java.time.Duration;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SECURITY;
+
+/**
+ * Configuration used by SCM CRL update client.
+ */
+@ConfigGroup(prefix = "ozone.scm.update")
+public class SCMUpdateClientConfiguration {
+  @Config(key = "client.crl.check.interval",
+      type = ConfigType.TIME,
+      defaultValue = "600s",
+      tags = {SCM, OZONE, SECURITY},
+      description = "The interval that the scm update service client use to" +
+          "check its pending CRLs."
+  )
+  private long clientCrlCheckIntervalInMs =
+      Duration.ofMinutes(10).toMillis();
+
+  public long getClientCrlCheckInterval() {
+    return clientCrlCheckIntervalInMs;
+  }
+
+  public void setClientCrlCheckInterval(Duration interval) {
+    this.clientCrlCheckIntervalInMs = interval.toMillis();
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/SCMUpdateServiceGrpcClient.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/SCMUpdateServiceGrpcClient.java
new file mode 100644
index 0000000..c69c229
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/SCMUpdateServiceGrpcClient.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceGrpc;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.SubscribeRequest;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.SubscribeResponse;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UnsubscribeRequest;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest;
+import org.apache.hadoop.hdds.scm.update.server.SCMUpdateClientInfo;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.ratis.thirdparty.io.grpc.Deadline;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Class for SCM Update Service Grpc Client.
+ */
+public class SCMUpdateServiceGrpcClient {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMUpdateServiceGrpcClient.class);
+  private static final String CLIENT_NAME = "SCMUpdateServiceGrpcClient";
+
+  private ManagedChannel channel;
+  private SCMUpdateServiceGrpc.SCMUpdateServiceStub updateClient;
+  private SCMUpdateServiceGrpc.SCMUpdateServiceBlockingStub subscribeClient;
+  private final AtomicBoolean isRunning = new AtomicBoolean(false);
+
+  private UUID clientId = null;
+  private StreamObserver<UpdateRequest> requestObserver;
+  private CRLClientUpdateHandler handler;
+  private long crlCheckInterval;
+  private final String host;
+  private final int port;
+  private final ClientCRLStore clientCRLStore;
+  private AtomicLong updateCount;
+  private AtomicLong errorCount;
+  private AtomicLong pendingCrlRemoveCount;
+
+
+  public SCMUpdateServiceGrpcClient(final String host,
+      final ConfigurationSource conf,
+      ClientCRLStore clientCRLStore) {
+    Preconditions.checkNotNull(conf);
+    this.host = host;
+    this.port = conf.getObject(UpdateServiceConfig.class).getPort();
+    this.crlCheckInterval = conf.getObject(SCMUpdateClientConfiguration.class)
+        .getClientCrlCheckInterval();
+
+    this.clientCRLStore = clientCRLStore;
+    createChannel();
+    updateCount = new AtomicLong();
+    errorCount = new AtomicLong();
+    pendingCrlRemoveCount = new AtomicLong();
+  }
+
+  public void start() {
+    if (!isRunning.compareAndSet(false, true)) {
+      LOG.info("Ignore. already started.");
+      return;
+    }
+
+    LOG.info("{}: starting...", CLIENT_NAME);
+    if (channel == null) {
+      createChannel();
+    }
+    clientId = subScribeClient();
+    assert(clientId != null);
+
+    // start background thread processing pending crl ids.
+    handler = new CRLClientUpdateHandler(clientId, updateClient,
+        this, crlCheckInterval);
+    handler.start();
+
+    LOG.info("{}: started.", CLIENT_NAME);
+  }
+
+  public void incrUpdateCount() {
+    updateCount.incrementAndGet();
+  }
+
+  public void incrErrorCount() {
+    errorCount.incrementAndGet();
+  }
+
+  public void incrPendingCrlRemoveCount() {
+    pendingCrlRemoveCount.incrementAndGet();
+  }
+
+  @VisibleForTesting
+  public long getUpdateCount() {
+    return updateCount.get();
+  }
+
+  @VisibleForTesting
+  public long getErrorCount() {
+    return errorCount.get();
+  }
+
+  @VisibleForTesting
+  public long getPendingCrlRemoveCount() {
+    return pendingCrlRemoveCount.get();
+  }
+
+  public ClientCRLStore getClientCRLStore() {
+    return clientCRLStore;
+  }
+  public AtomicBoolean getIsRunning() {
+    return isRunning;
+  }
+
+  public void stop(boolean shutdown) {
+    LOG.info("{}: stopping...", CLIENT_NAME);
+    if (isRunning.get()) {
+      // complete update request, no more client streaming
+      if (requestObserver != null) {
+        requestObserver.onCompleted();
+        requestObserver = null;
+      }
+
+      // stop update handler
+      if (handler != null) {
+        handler.stop();
+        handler = null;
+      }
+
+      if (shutdown) {
+        shutdownChannel();
+      }
+      isRunning.set(false);
+    }
+    LOG.info("{}: stopped.", CLIENT_NAME);
+  }
+
+  public void restart() {
+    resetClient();
+    stop(false);
+    start();
+  }
+
+  public void createChannel() {
+    NettyChannelBuilder channelBuilder =
+        NettyChannelBuilder.forAddress(host, port).usePlaintext()
+            .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
+
+    channel = channelBuilder.build();
+    updateClient = SCMUpdateServiceGrpc.newStub(channel);
+    subscribeClient = SCMUpdateServiceGrpc.newBlockingStub(channel);
+  }
+
+  public void shutdownChannel() {
+    if (channel == null) {
+      return;
+    }
+
+    channel.shutdown();
+    try {
+      channel.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.error("Failed to shutdown {} channel", CLIENT_NAME, e);
+    } finally {
+      channel.shutdownNow();
+      channel = null;
+    }
+  }
+
+  private UUID subScribeClient() {
+    SubscribeRequest subReq = SubscribeRequest.newBuilder().build();
+    SubscribeResponse subResp = subscribeClient.withWaitForReady()
+        .subscribe(subReq);
+    return SCMUpdateClientInfo.fromClientIdProto(subResp.getClientId());
+  }
+
+  private void unSubscribeClient() {
+    if (clientId != null) {
+      UnsubscribeRequest unsubReq = UnsubscribeRequest.newBuilder()
+            
.setClientId(SCMUpdateClientInfo.toClientIdProto(clientId)).build();
+      subscribeClient.withWaitForReady().
+          withDeadline(Deadline.after(5, TimeUnit.MILLISECONDS))
+          .unsubscribe(unsubReq);
+    }
+  }
+
+  // short-circuit the backoff timer and make them reconnect immediately.
+  private void resetClient() {
+    if (channel == null) {
+      return;
+    }
+    channel.resetConnectBackoff();
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/UpdateServiceConfig.java
similarity index 52%
copy from 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
copy to 
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/UpdateServiceConfig.java
index cce9d04..9f55c4d 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/UpdateServiceConfig.java
@@ -15,30 +15,30 @@
  * the License.
  */
 
-package org.apache.hadoop.ozone.om;
+package org.apache.hadoop.hdds.scm.update.client;
 
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
 
 /**
- * Wrapper class for Scm protocol clients.
+ * Update service configuration.
  */
-public class ScmClient {
+@ConfigGroup(prefix = "ozone.scm.update.service")
+public final class UpdateServiceConfig {
 
-  private final ScmBlockLocationProtocol blockClient;
-  private final StorageContainerLocationProtocol containerClient;
+  @Config(key = "port", defaultValue = "9893", description = "Port used for"
+      + " the SCM grpc update service for CRL.", tags = {
+      ConfigTag.SECURITY})
+  private int port;
 
-  ScmClient(ScmBlockLocationProtocol blockClient,
-            StorageContainerLocationProtocol containerClient) {
-    this.containerClient = containerClient;
-    this.blockClient = blockClient;
+  public int getPort() {
+    return port;
   }
 
-  public ScmBlockLocationProtocol getBlockClient() {
-    return this.blockClient;
-  }
-
-  public StorageContainerLocationProtocol getContainerClient() {
-    return this.containerClient;
+  public UpdateServiceConfig setPort(
+      int portParam) {
+    this.port = portParam;
+    return this;
   }
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/package-info.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/package-info.java
new file mode 100644
index 0000000..6fae144
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+/**
+ * CRL client package.
+ */
+package org.apache.hadoop.hdds.scm.update.client;
\ No newline at end of file
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/CRLClientInfo.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/CRLClientInfo.java
new file mode 100644
index 0000000..33386db
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/CRLClientInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Class wrap CRL client info on server side.
+ */
+public class CRLClientInfo  {
+  private final SCMUpdateClientInfo updateClientInfo;
+  private long receivedCrlId;
+  private List<Long> pendingCrlIds;
+
+  public CRLClientInfo(SCMUpdateClientInfo clientInfo) {
+    this.updateClientInfo = clientInfo;
+  }
+
+  public long getReceivedCrlId() {
+    return receivedCrlId;
+  }
+
+  public void setReceivedCrlId(long receivedCrlId) {
+    this.receivedCrlId = receivedCrlId;
+  }
+
+  public List<Long> getPendingCrlIds() {
+    return Collections.unmodifiableList(pendingCrlIds);
+  }
+
+  public void setPendingCrlIds(List<Long> pendingCrlIds) {
+    this.pendingCrlIds = new ArrayList<>(pendingCrlIds);
+  }
+
+  public SCMUpdateClientInfo getUpdateClientInfo() {
+    return updateClientInfo;
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientInfo.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientInfo.java
new file mode 100644
index 0000000..f03ae84
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientInfo.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+
+import java.util.UUID;
+
+/**
+ * Wrapper class for scm update client on server side.
+ */
+public class SCMUpdateClientInfo {
+  private StreamObserver<UpdateResponse> responseObserver;
+  private UUID clientId;
+
+  public SCMUpdateClientInfo(UUID clientId) {
+    this(clientId, null);
+  }
+
+  public SCMUpdateClientInfo(UUID clientId,
+      StreamObserver<UpdateResponse> responseObserver) {
+    this.clientId = clientId;
+    this.responseObserver = responseObserver;
+  }
+
+  public UUID getClientId() {
+    return clientId;
+  }
+
+  public static SCMUpdateServiceProtos.ClientId toClientIdProto(UUID uuid) {
+    return SCMUpdateServiceProtos.ClientId.newBuilder()
+        .setLsb(uuid.getLeastSignificantBits())
+        .setMsb(uuid.getMostSignificantBits()).build();
+  }
+
+  public static UUID fromClientIdProto(
+      SCMUpdateServiceProtos.ClientId clientId) {
+    return new UUID(clientId.getMsb(), clientId.getLsb());
+  }
+
+  public StreamObserver<UpdateResponse> getResponseObserver() {
+    return responseObserver;
+  }
+
+  public void setResponseObserver(
+      StreamObserver<UpdateResponse> responseObserver) {
+    this.responseObserver = responseObserver;
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/package-info.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/package-info.java
new file mode 100644
index 0000000..c3b2fb8
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/server/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+/**
+ * CRL server package.
+ */
+package org.apache.hadoop.hdds.scm.update.server;
\ No newline at end of file
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
index 12ace82..396452f 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
@@ -267,7 +267,7 @@ public interface CertificateClient {
   List<String> updateCAList() throws IOException;
 
   /**
-   * Get the CRLInfo based on the CRL Ids.
+   * Get the CRLInfo based on the CRL Ids from SCM.
    * @param crlIds - list of crl ids
    * @return list of CRLInfo
    * @throws IOException
@@ -275,7 +275,7 @@ public interface CertificateClient {
   List<CRLInfo> getCrls(List<Long> crlIds) throws IOException;
 
   /**
-   * Get the latest CRL id.
+   * Get the latest CRL id from SCM.
    * @return latest CRL id.
    * @throws IOException
    */
@@ -291,4 +291,26 @@ public interface CertificateClient {
           OM_PUBLIC_PRIVATE_KEY_FILE_NOT_EXIST);
     }
   }
+
+  /**
+   * Get Local CRL id received.
+   * @return
+   */
+  long getLocalCrlId();
+
+  /**
+   * Set Local CRL id.
+   * @param crlId
+   */
+  void setLocalCrlId(long crlId);
+
+  /**
+   * Process crl and remove the certificates in the revoked cert list from
+   * client.
+   * @param crl
+   * @return true if the client's own cert needs to be reinit
+   * false otherwise;
+   */
+  boolean processCrl(CRLInfo crl);
+
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
index 5aa6338..55b984a 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
@@ -37,6 +37,7 @@ import java.security.SignatureException;
 import java.security.cert.CertStore;
 import java.security.cert.X509Certificate;
 import java.security.spec.InvalidKeySpecException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -71,6 +72,7 @@ import static 
org.apache.hadoop.hdds.security.x509.exceptions.CertificateExcepti
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClient;
 import static 
org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClientWithMaxRetry;
 
+import org.apache.ratis.util.FileUtils;
 import org.bouncycastle.cert.X509CertificateHolder;
 import org.slf4j.Logger;
 
@@ -96,6 +98,7 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
   private String certSerialId;
   private String caCertId;
   private String rootCaCertId;
+  private long localCrlId;
   private String component;
   private List<String> pemEncodedCACerts = null;
   private final Lock lock;
@@ -942,4 +945,66 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
       lock.unlock();
     }
   }
+
+  @Override
+  public boolean processCrl(CRLInfo crl){
+    List<String> certIds2Remove = new ArrayList();
+    crl.getX509CRL().getRevokedCertificates().forEach(
+        cert -> certIds2Remove.add(cert.getSerialNumber().toString()));
+    boolean reinitCert = removeCertificates(certIds2Remove);
+    setLocalCrlId(crl.getCrlSequenceID());
+    return reinitCert;
+  }
+
+
+  private boolean removeCertificates(List<String> certIds){
+    lock.lock();
+    boolean reInitCert = false;
+    try {
+      // For now, remove self cert and ca cert is not implemented
+      // both requires a restart of the service.
+      if ((certSerialId!=null && certIds.contains(certSerialId)) ||
+          (caCertId!=null && certIds.contains(caCertId)) ||
+          (rootCaCertId!=null && certIds.contains(rootCaCertId))) {
+        reInitCert = true;
+      }
+
+      Path basePath = securityConfig.getCertificateLocation(component);
+      for (String certId : certIds) {
+        if (certificateMap.containsKey(certId)) {
+          // remove on disk
+          String certName = String.format(CERT_FILE_NAME_FORMAT, certId);
+
+          if (certId.equals(caCertId)) {
+            certName = CA_CERT_PREFIX + certName;
+          }
+
+          if (certId.equals(rootCaCertId)) {
+            certName = ROOT_CA_CERT_PREFIX + certName;
+          }
+
+          FileUtils.deleteFileQuietly(basePath.resolve(certName).toFile());
+          // remove in memory
+          certificateMap.remove(certId);
+
+          // TODO: reset certSerialId, caCertId or rootCaCertId
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+    return reInitCert;
+  }
+
+  public long getLocalCrlId() {
+    return this.localCrlId;
+  }
+
+  /**
+   * Set Local CRL id.
+   * @param crlId
+   */
+  public void setLocalCrlId(long crlId){
+    this.localCrlId = crlId;
+  }
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
index 0c7054a..d26eced 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
@@ -42,12 +42,19 @@ public class OMCertificateClient extends 
DefaultCertificateClient {
   public static final String COMPONENT_NAME = "om";
 
   public OMCertificateClient(SecurityConfig securityConfig,
-      String certSerialId) {
+      String certSerialId, String localCrlId) {
     super(securityConfig, LOG, certSerialId, COMPONENT_NAME);
+    this.setLocalCrlId(localCrlId!=null ?
+        Long.parseLong(localCrlId): 0);
+  }
+
+  public OMCertificateClient(SecurityConfig securityConfig,
+      String certSerialId) {
+    this(securityConfig, certSerialId, null);
   }
 
   public OMCertificateClient(SecurityConfig securityConfig) {
-    super(securityConfig, LOG, null, COMPONENT_NAME);
+    this(securityConfig, null, null);
   }
 
   @Override
diff --git a/hadoop-hdds/interface-client/pom.xml 
b/hadoop-hdds/interface-client/pom.xml
index b3083ba..4794eef 100644
--- a/hadoop-hdds/interface-client/pom.xml
+++ b/hadoop-hdds/interface-client/pom.xml
@@ -72,6 +72,8 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd";>
               <includes>
                 <include>DatanodeClientProtocol.proto</include>
                 <include>InterSCMProtocol.proto</include>
+                <include>SCMClientProtocol.proto</include>
+                <include>SCMUpdateProtocol.proto</include>
               </includes>
               <outputDirectory>target/generated-sources/java</outputDirectory>
               <clearOutputDirectory>false</clearOutputDirectory>
diff --git 
a/hadoop-hdds/interface-client/src/main/proto/SCMUpdateProtocol.proto 
b/hadoop-hdds/interface-client/src/main/proto/SCMUpdateProtocol.proto
new file mode 100644
index 0000000..c3e7cc0
--- /dev/null
+++ b/hadoop-hdds/interface-client/src/main/proto/SCMUpdateProtocol.proto
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *unstable* .proto interface.
+ */
+syntax = "proto2";
+option java_package = "org.apache.hadoop.hdds.protocol.scm.proto";
+option java_outer_classname = "SCMUpdateServiceProtos";
+option java_generate_equals_and_hash = true;
+option java_generic_services = true;
+package hadoop.hdds.scm;
+
+/**
+ * Information for Certificate Revocation List.
+ */
+message CRLInfoProto {
+  required string x509CRL = 1;
+  required uint64 creationTimestamp = 2;
+  required int64 crlSequenceID = 3;
+}
+message ClientId {
+  required int64 msb = 1;
+  required int64 lsb = 2;
+}
+
+message SubscribeRequest {
+  optional string hostname = 1;
+}
+
+message SubscribeResponse {
+  required ClientId clientId = 1;
+}
+
+enum Type {
+  CRLUpdate = 1;
+  PipelineUpdate = 2;
+ }
+
+message UpdateRequest {
+  required Type updateType = 1; // Type of the update
+  optional string traceID = 2;
+  required ClientId clientId = 3;
+  optional CRLUpdateRequest crlUpdateRequest= 4;
+}
+
+message UpdateResponse {
+  required Type updateType = 1; // Type of the update
+  optional string traceID = 2;
+  optional CRLUpdateResponse crlUpdateResponse = 3;
+}
+
+message CRLUpdateRequest {
+  required int64 receivedCrlId = 1;
+  repeated int64 pendingCrlIds = 2;
+}
+
+message CRLUpdateResponse {
+  optional CRLInfoProto crlInfo = 1;
+}
+
+message UnsubscribeRequest {
+  required ClientId clientId = 1;
+}
+
+message UnsubscribeResponse {
+}
+
+// SCM Update service streams updates to all subscribers
+service SCMUpdateService {
+
+  // Client subscribe to future SCM CRL update
+  rpc subscribe (SubscribeRequest) returns (SubscribeResponse) {};
+
+  // Client update SCM about its CRL processing status
+  rpc updateStatus (stream UpdateRequest) returns (stream UpdateResponse) {};
+
+  // Client unsubscribe to future SCM CRL update
+  rpc unsubscribe (UnsubscribeRequest) returns (UnsubscribeResponse) {};
+
+}
\ No newline at end of file
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index 7f47b5f..6d4759c 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -128,6 +128,8 @@ enum Status {
   UNKNOWN_PIPELINE_STATE = 35;
   CONTAINER_NOT_FOUND = 36;
   CONTAINER_REPLICA_NOT_FOUND = 37;
+  FAILED_TO_CONNECT_TO_CRL_SERVICE = 38;
+  FAILED_TO_ADD_CRL_CLIENT = 39;
 }
 
 /**
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
index 77bf5a7..3392454 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
@@ -42,6 +42,9 @@ import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
+import org.apache.hadoop.hdds.scm.update.server.SCMUpdateServiceGrpcServer;
+import org.apache.hadoop.hdds.scm.update.client.UpdateServiceConfig;
+import org.apache.hadoop.hdds.scm.update.server.SCMCRLStore;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import 
org.apache.hadoop.hdds.scm.protocol.SCMSecurityProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
@@ -83,7 +86,8 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
   private final CertificateServer rootCertificateServer;
   private final CertificateServer scmCertificateServer;
   private final X509Certificate rootCACertificate;
-  private final RPC.Server rpcServer;
+  private final RPC.Server rpcServer; // HADOOP RPC SERVER
+  private final SCMUpdateServiceGrpcServer grpcUpdateServer; // gRPC SERVER
   private final InetSocketAddress rpcAddress;
   private final ProtocolMessageMetrics metrics;
   private final StorageContainerManager storageContainerManager;
@@ -124,6 +128,10 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
         CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
       rpcServer.refreshServiceAcl(conf, SCMPolicyProvider.getInstance());
     }
+
+    this.grpcUpdateServer = new SCMUpdateServiceGrpcServer(
+        conf.getObject(UpdateServiceConfig.class),
+        new SCMCRLStore(scmCertificateServer));
   }
 
   /**
@@ -352,13 +360,19 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
             .collect(Collectors.toList()), CRLReason.lookup(reason),
         new Date(revocationTime));
     try {
-      return revoked.get().get().longValue();
+      Long crlId = revoked.get().get();
+      getGrpcUpdateServer().notifyCrlUpdate();
+      return crlId;
     } catch (InterruptedException | ExecutionException e) {
       throw new SCMException("Fail to revoke certs",
           SCMException.ResultCodes.FAILED_TO_REVOKE_CERTIFICATES);
     }
   }
 
+  public SCMUpdateServiceGrpcServer getGrpcUpdateServer() {
+    return grpcUpdateServer;
+  }
+
   @VisibleForTesting
   public UserGroupInformation getRpcRemoteUser() {
     return Server.getRemoteUser();
@@ -372,12 +386,13 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
     return rpcAddress;
   }
 
-  public void start() {
+  public void start() throws IOException {
     String startupMsg = StorageContainerManager.buildRpcServerStartMessage(
         "Starting RPC server for SCMSecurityProtocolServer.", getRpcAddress());
     LOGGER.info(startupMsg);
     metrics.register();
     getRpcServer().start();
+    getGrpcUpdateServer().start();
   }
 
   public void stop() {
@@ -385,6 +400,7 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
       LOGGER.info("Stopping the SCMSecurityProtocolServer.");
       metrics.unregister();
       getRpcServer().stop();
+      getGrpcUpdateServer().stop();
     } catch (Exception ex) {
       LOGGER.error("SCMSecurityProtocolServer stop failed.", ex);
     }
@@ -393,6 +409,9 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
   public void join() throws InterruptedException {
     LOGGER.trace("Join RPC server for SCMSecurityProtocolServer.");
     getRpcServer().join();
+    LOGGER.trace("Join gRPC server for SCMSecurityProtocolServer.");
+    getGrpcUpdateServer().join();
+
   }
 
   public CertificateServer getRootCertificateServer() {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLStore.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLStore.java
new file mode 100644
index 0000000..89775c4
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLStore.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.scm.update.client.CRLStore;
+import 
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class for SCM CRL store.
+ */
+public class SCMCRLStore implements CRLStore {
+
+  private final CertificateServer certServer;
+
+  public SCMCRLStore(CertificateServer certServer) {
+    this.certServer = certServer;
+  }
+
+  @Override
+  public long getLatestCrlId() {
+    return certServer.getLatestCrlId();
+  }
+
+  @Override
+  public CRLInfo getCRL(long crlId) throws IOException {
+    List<Long> crlIdList = new ArrayList<>();
+    crlIdList.add(crlId);
+    return certServer.getCrls(crlIdList).get(0);
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLUpdateHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLUpdateHandler.java
new file mode 100644
index 0000000..bc92f82
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLUpdateHandler.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.CRLUpdateResponse;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse;
+import org.apache.hadoop.hdds.scm.update.client.CRLStore;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.x509.crl.CRLCodec;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Class handle the CRL client update and response.
+ */
+public class SCMCRLUpdateHandler implements SCMUpdateHandler {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMCRLUpdateHandler.class);
+  private final CRLStore crlStore;
+
+  private static final SCMUpdateServiceProtos.Type TYPE =
+      SCMUpdateServiceProtos.Type.CRLUpdate;
+
+  private final Map<UUID, CRLClientInfo> clients;
+
+  SCMCRLUpdateHandler(CRLStore crlStore) {
+    this.crlStore = crlStore;
+    clients = new ConcurrentHashMap<>();
+  }
+
+  public SCMUpdateServiceProtos.Type getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void handleClientRequest(SCMUpdateServiceProtos.UpdateRequest request,
+      SCMUpdateClientInfo clientInfo) {
+    SCMUpdateServiceProtos.CRLUpdateRequest updateStatusRequest =
+        request.getCrlUpdateRequest();
+    long clientCrlId = updateStatusRequest.getReceivedCrlId();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Client {} updateStatus \nclientCrlId {}  \npendingCrls {}",
+          clientInfo.getClientId(), clientCrlId,
+          updateStatusRequest.getPendingCrlIdsList().toString());
+    }
+
+    CRLClientInfo crlClientInfo;
+    if (!clients.containsKey(clientInfo.getClientId())) {
+      crlClientInfo = new CRLClientInfo(clientInfo);
+      clients.put(clientInfo.getClientId(), crlClientInfo);
+    } else {
+      crlClientInfo = clients.get(clientInfo.getClientId());
+    }
+
+    crlClientInfo.setPendingCrlIds(
+        request.getCrlUpdateRequest().getPendingCrlIdsList());
+    crlClientInfo.setReceivedCrlId(
+        request.getCrlUpdateRequest().getReceivedCrlId());
+
+    sendCrlUpdateToClient(crlClientInfo);
+  }
+
+  @Override
+  public void onUpdate() {
+    LOG.debug("Update due to certificate revocation");
+    // server crl id is usually > client crl id when this is invoked.
+    clients.values().forEach(client -> {
+      sendCrlUpdateToClient(client);
+    });
+  }
+
+  @Override
+  public void onRemoveClient(SCMUpdateClientInfo clientInfo) {
+    clients.remove(clientInfo.getClientId());
+  }
+
+  private void sendCrlUpdateToClient(CRLClientInfo client) {
+    long clientCrlId = client.getReceivedCrlId();
+    long serverCrlId = crlStore.getLatestCrlId();
+
+    if (clientCrlId >= serverCrlId) {
+      return;
+    }
+
+    LOG.debug("## Server: clientCrlId {} serverCrlId {}",
+        clientCrlId, serverCrlId);
+
+    long nextCrlId = clientCrlId + 1;
+    try {
+      CRLInfo crlInfo = null;
+      while (crlInfo == null && nextCrlId <= serverCrlId) {
+        crlInfo = crlStore.getCRL(nextCrlId);
+        nextCrlId++;
+      }
+      if (crlInfo == null) {
+        LOG.debug("Nothing to send to client");
+        return;
+      }
+      sendCrlToClient(crlInfo, client.getUpdateClientInfo());
+    } catch (Exception e) {
+      LOG.error("Failed to handle client update.", e);
+      
client.getUpdateClientInfo().getResponseObserver().onError(Status.INTERNAL
+          .withDescription("Failed to send crl" + nextCrlId +
+              " to client " + client.getUpdateClientInfo().getClientId())
+          .asException());
+    }
+  }
+
+  private void sendCrlToClient(CRLInfo crl, SCMUpdateClientInfo clientInfo)
+      throws SCMSecurityException {
+    LOG.debug("Sending client# {} with crl: {} ",
+        clientInfo.getClientId(), crl.getCrlSequenceID());
+    StreamObserver<UpdateResponse> responseObserver =
+        clientInfo.getResponseObserver();
+    SCMUpdateServiceProtos.CRLInfoProto crlInfoProto =
+        SCMUpdateServiceProtos.CRLInfoProto.newBuilder()
+            .setCrlSequenceID(crl.getCrlSequenceID())
+            .setX509CRL(CRLCodec.getPEMEncodedString(crl.getX509CRL()))
+            .setCreationTimestamp(crl.getCreationTimestamp()).build();
+    responseObserver.onNext(
+        UpdateResponse.newBuilder()
+            .setUpdateType(SCMUpdateServiceProtos.Type.CRLUpdate)
+            .setCrlUpdateResponse(CRLUpdateResponse.newBuilder()
+                .setCrlInfo(crlInfoProto).build()).build());
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientManager.java
new file mode 100644
index 0000000..710cd20
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientManager.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.Type;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Class that manages SCM update clients.
+ */
+public class SCMUpdateClientManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMUpdateClientManager.class);
+  private Map<UUID, SCMUpdateClientInfo> clients;
+  private Map<Type, SCMUpdateHandler> handlers;
+
+  public SCMUpdateClientManager() {
+    clients = new ConcurrentHashMap<>();
+    handlers = new ConcurrentHashMap<>();
+  }
+
+  public void registerHandler(SCMUpdateHandler handler) {
+    handlers.put(handler.getType(), handler);
+  }
+
+  public void unRegisterHandler(Type type) {
+    handlers.remove(type);
+  }
+
+  public UUID addClient() throws SCMException {
+    UUID clientId = UUID.randomUUID();
+    int retryCount = 5;
+    while (clients.containsKey(clientId)) {
+      if (retryCount > 0) {
+        clientId = UUID.randomUUID();
+        retryCount--;
+      } else {
+        throw new SCMException("Failed to add CRL client with random clientId" 
+
+            " collision", SCMException.ResultCodes.FAILED_TO_ADD_CRL_CLIENT);
+      }
+    }
+
+    SCMUpdateClientInfo clientInfo = new SCMUpdateClientInfo(clientId);
+    clients.put(clientId, clientInfo);
+    return clientId;
+  }
+
+  // this does not necessarily produce a server response via responseObserver.
+  public void handleClientUpdate(UpdateRequest request,
+      StreamObserver<UpdateResponse> responseObserver) {
+    UUID clientId = SCMUpdateClientInfo.fromClientIdProto(
+        request.getClientId());
+
+    // Unknown client update
+    if (!clients.containsKey(clientId)) {
+      responseObserver.onError(Status.INVALID_ARGUMENT
+          .withDescription("Client must subscribe before it can " +
+              "send/receive updates")
+          .asException());
+    }
+
+    // record the server to client channel
+    SCMUpdateClientInfo clientInfo = clients.get(clientId);
+    if (clientInfo.getResponseObserver() == null) {
+      clientInfo.setResponseObserver(responseObserver);
+    }
+
+    if (handlers.containsKey(request.getUpdateType())) {
+      handlers.get(request.getUpdateType())
+          .handleClientRequest(request, clientInfo);
+    } else {
+      responseObserver.onError(Status.INVALID_ARGUMENT
+          .withDescription("Unknown client update type.")
+          .asException());
+    }
+  }
+
+  /**
+   * Remove client by client Id.
+   * @param clientId - client Id
+   * @return true if client is removed, false otherwise.
+   */
+  public boolean removeClient(UUID clientId) {
+    if (clients.containsKey(clientId)) {
+      SCMUpdateClientInfo clientInfo = clients.remove(clientId);
+      handlers.values().forEach(handler -> handler.onRemoveClient(clientInfo));
+      LOG.info("Client {} removed.", clientId);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Remove client by its responseObserver obj.
+   * @param responseObserver - response observer of the client
+   * @return true if client is removed, false otherwise.
+   */
+  public boolean removeClient(StreamObserver<UpdateResponse> responseObserver) 
{
+    UUID clientId = null;
+    for (SCMUpdateClientInfo client : clients.values()) {
+      if (client.getResponseObserver() == responseObserver) {
+        clientId = client.getClientId();
+        break;
+      }
+    }
+    if (clientId != null) {
+      LOG.debug("Remove client {} by responseObserver", clientId);
+      removeClient(clientId);
+      return true;
+    }
+    LOG.debug("Remove client {} by responseObserver not found!");
+    return false;
+  }
+
+  public void onUpdate(Type type) {
+    if (handlers.containsKey(type)) {
+      handlers.get(type).onUpdate();
+    } else {
+      LOG.warn("Unknown update type to broadcast!");
+    }
+  }
+}
+
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateHandler.java
new file mode 100644
index 0000000..026bd4a
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateHandler.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.Type;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest;
+
+/**
+ * Interface used by server side to handle client update and server publish.
+ */
+public interface SCMUpdateHandler {
+
+  /**
+   * handle client update request.
+   * @param request
+   * @param clientInfo
+   */
+  void handleClientRequest(UpdateRequest request,
+      SCMUpdateClientInfo clientInfo);
+
+  /**
+   * Handle server broadcast to all clients as needed.
+   */
+  void onUpdate();
+
+  /**
+   * Handle server remove client due to error streaming to the client.
+   * @param clientInfo
+   */
+  void onRemoveClient(SCMUpdateClientInfo clientInfo);
+
+  /**
+   * Return the type of Update the handler can handle.
+   * @return the type of Update the handler can handle.
+   */
+  Type getType();
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateServiceGrpcServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateServiceGrpcServer.java
new file mode 100644
index 0000000..fa1b256
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateServiceGrpcServer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos;
+import org.apache.hadoop.hdds.scm.update.client.CRLStore;
+import org.apache.hadoop.hdds.scm.update.client.UpdateServiceConfig;
+import org.apache.ratis.thirdparty.io.grpc.Server;
+import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * gRPC server for SCM update services.
+ */
+public class SCMUpdateServiceGrpcServer {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMUpdateServiceGrpcServer.class);
+
+  private static final String SERVICE_NAME = "SCMUpdateService";
+  private CRLStore crlStore;
+  private int port;
+  private Server server;
+  private SCMUpdateServiceImpl scmUpdateService;
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+  public SCMUpdateServiceGrpcServer(final UpdateServiceConfig updateConf,
+      final CRLStore crlStore) {
+    this.crlStore = crlStore;
+    this.port = updateConf.getPort();
+  }
+
+  public int getPort() {
+    return this.port;
+  }
+
+  public void start() throws IOException {
+    LOG.info("{} starting", SERVICE_NAME);
+    scmUpdateService = new SCMUpdateServiceImpl(crlStore);
+    server = ServerBuilder.forPort(port).
+        addService(scmUpdateService)
+        .build();
+
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.info("Ignoring start() since {} has already started.", SERVICE_NAME);
+      return;
+    } else {
+      server.start();
+    }
+  }
+
+  public void stop() {
+    LOG.info("{} stopping", SERVICE_NAME);
+    if (isStarted.get()) {
+      scmUpdateService = null;
+      server.shutdown();
+      try {
+        server.awaitTermination(5, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        LOG.error("failed to shutdown SCMClientGrpcServer", e);
+      } finally {
+        server.shutdownNow();
+      }
+      LOG.info("{} stopped!", SERVICE_NAME);
+      isStarted.set(false);
+    }
+  }
+
+  public void join() throws InterruptedException {
+    while (isStarted.get()) {
+      wait();
+    }
+  }
+
+  public void notifyCrlUpdate() {
+    scmUpdateService.notifyUpdate(SCMUpdateServiceProtos.Type.CRLUpdate);
+  }
+
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateServiceImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateServiceImpl.java
new file mode 100644
index 0000000..f339c9c
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateServiceImpl.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceGrpc;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.SubscribeRequest;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.SubscribeResponse;
+import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.Type;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UnsubscribeRequest;
+import 
org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UnsubscribeResponse;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.update.client.CRLStore;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+/**
+ * Impl class for SCM update service.
+ * Allows client to subscribe and bi-directional streaming update with server.
+ * Currently used for CRL udpate between SCM and OM.
+ * Can be extended to update SCM pipeline/container change in future.
+ */
+public class SCMUpdateServiceImpl extends
+    SCMUpdateServiceGrpc.SCMUpdateServiceImplBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMUpdateServiceImpl.class);
+
+  private SCMUpdateClientManager clientManager;
+
+  public SCMUpdateServiceImpl(CRLStore crlStore) {
+    clientManager = new SCMUpdateClientManager();
+    clientManager.registerHandler(new SCMCRLUpdateHandler(crlStore));
+  }
+
+  @Override
+  public void subscribe(SubscribeRequest request,
+      StreamObserver<SubscribeResponse> responseObserver) {
+    UUID clientId;
+    try {
+      clientId  = clientManager.addClient();
+    } catch (SCMException ex) {
+      LOG.error("Fail to subscribe for Client.", ex);
+      responseObserver.onError(ex);
+      return;
+    }
+    responseObserver.onNext(SubscribeResponse.newBuilder()
+        .setClientId(SCMUpdateClientInfo.toClientIdProto(clientId))
+        .build());
+    responseObserver.onCompleted();
+    LOG.info("Client {} subscribed.", clientId);
+  }
+
+  @Override
+  public void unsubscribe(UnsubscribeRequest request,
+      StreamObserver<UnsubscribeResponse> responseObserver) {
+    UUID clientId = SCMUpdateClientInfo.fromClientIdProto(
+        request.getClientId());
+    boolean removed = clientManager.removeClient(clientId);
+    if (removed) {
+      LOG.info("Client {} unsubscribed.", clientId);
+    } else {
+      LOG.info("Client {} does not exist, no-op for unsubscribe", clientId);
+    }
+    responseObserver.onNext(UnsubscribeResponse.getDefaultInstance());
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public StreamObserver<UpdateRequest> updateStatus(
+      StreamObserver<UpdateResponse> responseObserver) {
+    return new StreamObserver<UpdateRequest>() {
+      @Override
+      public void onNext(UpdateRequest updateRequest) {
+        LOG.debug("UpdateStatus onNext");
+        clientManager.handleClientUpdate(updateRequest, responseObserver);
+      }
+
+      @Override
+      public void onError(Throwable throwable) {
+        LOG.debug("UpdateStatus onError", throwable);
+        clientManager.removeClient(responseObserver);
+      }
+
+      @Override
+      public void onCompleted() {
+        LOG.debug("UpdateStatus(Client) onComplete");
+        responseObserver.onCompleted();
+        clientManager.removeClient(responseObserver);
+      }
+    };
+  }
+
+  // service prepare a update response and broadcast to all clients subscribed.
+  public void notifyUpdate(Type type) {
+    clientManager.onUpdate(type);
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/package-info.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/package-info.java
new file mode 100644
index 0000000..c3b2fb8
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+/**
+ * CRL server package.
+ */
+package org.apache.hadoop.hdds.scm.update.server;
\ No newline at end of file
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
index 86a8c3e..e1a4e62 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
@@ -26,6 +26,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import java.io.IOException;
+
 /**
  * Test class for {@link SCMSecurityProtocolServer}.
  * */
@@ -55,7 +57,7 @@ public class TestSCMSecurityProtocolServer {
   }
 
   @Test
-  public void testStart() {
+  public void testStart() throws IOException {
     securityProtocolServer.start();
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/update/server/MockCRLStore.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/update/server/MockCRLStore.java
new file mode 100644
index 0000000..6152b39
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/update/server/MockCRLStore.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
+import org.apache.hadoop.hdds.scm.server.SCMCertStore;
+import org.apache.hadoop.hdds.scm.update.client.CRLStore;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.CRLApprover;
+import 
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
+import 
org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCRLApprover;
+import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
+import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.bouncycastle.asn1.x509.CRLReason;
+import org.bouncycastle.cert.X509CertificateHolder;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.file.Files;
+import java.security.KeyPair;
+import java.security.cert.X509Certificate;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Mock CRL Store impl for test.
+ */
+public class MockCRLStore implements CRLStore {
+
+  private static final String COMPONENT_NAME = "scm";
+  private static final Long INITIAL_SEQUENCE_ID = 0L;
+
+  private OzoneConfiguration config;
+  private SCMMetadataStore scmMetadataStore;
+  private CertificateStore scmCertStore;
+  private SecurityConfig securityConfig;
+  private KeyPair keyPair;
+  private CRLApprover crlApprover;
+  private final X509CertificateHolder caCertificateHolder;
+  private final Logger log;
+
+  public MockCRLStore(TemporaryFolder tempDir, Logger log) throws Exception {
+
+    this.log = log;
+    config = new OzoneConfiguration();
+    config.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+        tempDir.newFolder().getAbsolutePath());
+
+    securityConfig = new SecurityConfig(config);
+    keyPair = KeyStoreTestUtil.generateKeyPair("RSA");
+
+    scmMetadataStore = new SCMMetadataStoreImpl(config);
+    scmCertStore = new SCMCertStore.Builder().setRatisServer(null)
+        .setCRLSequenceId(INITIAL_SEQUENCE_ID)
+        .setMetadaStore(scmMetadataStore)
+        .build();
+    crlApprover = new DefaultCRLApprover(securityConfig,
+        keyPair.getPrivate());
+
+    Files.createDirectories(securityConfig.getKeyLocation(COMPONENT_NAME));
+    caCertificateHolder = new X509CertificateHolder(generateX509Cert()
+        .getEncoded());
+  }
+
+  public BigInteger issueCert() throws Exception {
+    X509Certificate cert = generateX509Cert();
+    scmCertStore.storeValidCertificate(cert.getSerialNumber(), cert,
+        HddsProtos.NodeType.SCM);
+    return cert.getSerialNumber();
+  }
+
+  public Optional<Long> revokeCert(List<BigInteger> certs,
+                                   Instant revokeTime) throws IOException {
+    log.debug("Revoke certs: ", certs.toString());
+    Optional<Long> crlId = scmCertStore.revokeCertificates(certs,
+        caCertificateHolder,
+        CRLReason.lookup(CRLReason.keyCompromise),
+        Date.from(revokeTime), crlApprover);
+    List<CRLInfo> crlInfos =
+        scmCertStore.getCrls(ImmutableList.of(crlId.get()));
+
+    if (crlInfos.isEmpty()) {
+      log.debug("CRL[0]: {}", crlInfos.get(0).toString());
+    }
+    return crlId;
+  }
+
+
+  private X509Certificate generateX509Cert() throws Exception {
+    return CertificateCodec.getX509Certificate(
+        CertificateCodec.getPEMEncodedString(
+            KeyStoreTestUtil.generateCertificate("CN=Test", keyPair, 30,
+                "SHA256withRSA")));
+  }
+
+  @Override
+  public long getLatestCrlId() {
+    return scmCertStore.getLatestCrlId();
+  }
+
+  @Override
+  public CRLInfo getCRL(long crlId) throws IOException {
+    return scmCertStore.getCrls(Arrays.asList(crlId)).get(0);
+  }
+
+  public void close() throws Exception {
+    if (scmMetadataStore.getStore() != null) {
+      scmMetadataStore.getStore().close();
+    }
+  }
+}
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/update/server/TestSCMUpdateServiceGrpcServer.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/update/server/TestSCMUpdateServiceGrpcServer.java
new file mode 100644
index 0000000..941f629
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/update/server/TestSCMUpdateServiceGrpcServer.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.update.server;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.update.client.CRLClientUpdateHandler;
+import org.apache.hadoop.hdds.scm.update.client.ClientCRLStore;
+import org.apache.hadoop.hdds.scm.update.client.SCMUpdateClientConfiguration;
+import org.apache.hadoop.hdds.scm.update.client.SCMUpdateServiceGrpcClient;
+import org.apache.hadoop.hdds.scm.update.client.UpdateServiceConfig;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Tests for SCM update Service.
+ */
+public class TestSCMUpdateServiceGrpcServer {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSCMUpdateServiceGrpcServer.class);
+  @Rule
+  public Timeout timeout = Timeout.seconds(300);
+
+  @Rule
+  public ExpectedException thrown= ExpectedException.none();
+
+  @Rule
+  public final TemporaryFolder tempDir = new TemporaryFolder();
+
+  private MockCRLStore mockCRLStore;
+
+  @Before
+  public void setUp() throws Exception {
+    mockCRLStore = new MockCRLStore(tempDir, LOG);
+    GenericTestUtils.setLogLevel(CRLClientUpdateHandler.getLog(), Level.DEBUG);
+  }
+
+  @After
+  public void destroyDbStore() throws Exception {
+    if (mockCRLStore != null) {
+      mockCRLStore.close();
+      mockCRLStore = null;
+    }
+  }
+
+  private UpdateServiceConfig getUpdateServiceConfig(OzoneConfiguration conf) {
+    return conf.getObject(UpdateServiceConfig.class);
+  }
+
+  @Test
+  public void testStartStop() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    SCMUpdateServiceGrpcServer server = new SCMUpdateServiceGrpcServer(
+        getUpdateServiceConfig(conf), mockCRLStore);
+    ClientCRLStore clientCRLStore = new ClientCRLStore();
+    SCMUpdateServiceGrpcClient client =
+        new SCMUpdateServiceGrpcClient("localhost", conf, clientCRLStore);
+
+    try {
+      server.start();
+      client.start();
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      // client need to handle the case when the server is stopped first.
+      client.stop(true);
+      server.stop();
+    }
+  }
+
+
+  @Test
+  public void testClientUpdateWithRevoke() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    SCMUpdateServiceGrpcServer server = new SCMUpdateServiceGrpcServer(
+        getUpdateServiceConfig(conf), mockCRLStore);
+    ClientCRLStore clientCRLStore = new ClientCRLStore();
+    SCMUpdateServiceGrpcClient client =
+        new SCMUpdateServiceGrpcClient("localhost", conf, clientCRLStore);
+    server.start();
+    client.start();
+
+    try {
+      // issue 10 certs
+      List<BigInteger> certIds = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        BigInteger certId = mockCRLStore.issueCert();
+        certIds.add(certId);
+      }
+
+      // revoke 4 certs and broadcast
+      for (int i = 0; i < 4; i++) {
+        revokeCertNow((certIds.get(i)));
+      }
+      server.notifyCrlUpdate();
+
+      GenericTestUtils.waitFor(() -> client.getUpdateCount()==4, 100, 2000);
+      Assert.assertEquals(4, client.getUpdateCount());
+      Assert.assertEquals(0, client.getErrorCount());
+
+      revokeCertNow(certIds.get(5));
+      server.notifyCrlUpdate();
+      GenericTestUtils.waitFor(() -> client.getUpdateCount()>4, 100, 2000);
+      Assert.assertEquals(5, client.getUpdateCount());
+      Assert.assertEquals(0, client.getErrorCount());
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      client.stop(true);
+      server.stop();
+    }
+  }
+
+  @Test
+  public void testClientUpdateWithDelayedRevoke() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    SCMUpdateServiceGrpcServer server = new SCMUpdateServiceGrpcServer(
+        getUpdateServiceConfig(conf), mockCRLStore);
+
+    ClientCRLStore clientCRLStore = new ClientCRLStore();
+
+    // check pending crl every 5 seconds
+    SCMUpdateClientConfiguration updateClientConfiguration =
+        conf.getObject(SCMUpdateClientConfiguration.class);
+    updateClientConfiguration.setClientCrlCheckInterval(Duration.ofSeconds(2));
+    conf.setFromObject(updateClientConfiguration);
+
+    SCMUpdateServiceGrpcClient client =
+        new SCMUpdateServiceGrpcClient("localhost", conf, clientCRLStore);
+    server.start();
+    client.start();
+
+    try {
+      // issue 10 certs
+      List<BigInteger> certIds = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        BigInteger certId = mockCRLStore.issueCert();
+        certIds.add(certId);
+      }
+
+      // revoke cert 0
+      revokeCertNow((certIds.get(0)));
+      server.notifyCrlUpdate();
+
+      GenericTestUtils.waitFor(() -> client.getUpdateCount()==1,
+          100, 2000);
+      Assert.assertEquals(1, client.getUpdateCount());
+      Assert.assertEquals(0, client.getErrorCount());
+
+      // revoke cert 5 with 10 seconds delay
+      revokeCert(certIds.get(5), Instant.now().plus(Duration.ofSeconds(5)));
+      server.notifyCrlUpdate();
+      GenericTestUtils.waitFor(() -> client.getUpdateCount()>1,
+          100, 2000);
+      Assert.assertEquals(2, client.getUpdateCount());
+      Assert.assertEquals(0, client.getErrorCount());
+      Assert.assertEquals(1, client.getClientCRLStore()
+          .getPendingCrlIds().size());
+
+      GenericTestUtils.waitFor(() -> client.getPendingCrlRemoveCount()==1,
+          100, 20_000);
+      Assert.assertTrue(client.getClientCRLStore()
+          .getPendingCrlIds().isEmpty());
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      client.stop(true);
+      server.stop();
+    }
+  }
+
+  private Long revokeCert(BigInteger certId, Instant revokeTime)
+      throws IOException {
+    Optional<Long> crlId =
+        mockCRLStore.revokeCert(Arrays.asList(certId), revokeTime);
+    return crlId.get();
+  }
+
+  private Long revokeCertNow(BigInteger certId) throws IOException {
+    Optional<Long> crlId =
+        mockCRLStore.revokeCert(Arrays.asList(certId), Instant.now());
+    return crlId.get();
+  }
+
+  @Test
+  public void testClientUpdateWithRestart() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    SCMUpdateServiceGrpcServer server = new SCMUpdateServiceGrpcServer(
+        getUpdateServiceConfig(conf), mockCRLStore);
+    ClientCRLStore clientCRLStore = new ClientCRLStore();
+    SCMUpdateServiceGrpcClient client =
+        new SCMUpdateServiceGrpcClient("localhost", conf, clientCRLStore);
+    server.start();
+    client.start();
+
+    try {
+      // issue 10 certs
+      List<BigInteger> certIds = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        BigInteger certId = mockCRLStore.issueCert();
+        certIds.add(certId);
+      }
+
+      // revoke 4 certs and broadcast
+      for (int i = 0; i < 4; i++) {
+        revokeCertNow((certIds.get(i)));
+      }
+      server.notifyCrlUpdate();
+      GenericTestUtils.waitFor(() -> client.getUpdateCount()==4,
+          100, 2000);
+      Assert.assertEquals(4, client.getUpdateCount());
+
+
+      // server restart
+      // client onError->
+      // 1. reconnect
+      // 2. new subscribe resumes from previous state
+      LOG.info("Test server restart begin.");
+      // server shutdown can lead to duplicate message received on client when
+      // client retry connect to the server. The client will handle that.
+      server.stop();
+      server.start();
+      GenericTestUtils.waitFor(() -> client.getErrorCount()==1,
+          100, 2000);
+      Assert.assertEquals(4, client.getUpdateCount());
+      Assert.assertEquals(1, client.getErrorCount());
+      Assert.assertEquals(4, clientCRLStore.getLatestCrlId());
+      LOG.info("Test server restart end.");
+
+      revokeCertNow(certIds.get(5));
+      server.notifyCrlUpdate();
+      GenericTestUtils.waitFor(() -> client.getUpdateCount()>4,
+          100, 5000);
+      Assert.assertEquals(5, client.getUpdateCount());
+      Assert.assertEquals(1, client.getErrorCount());
+      Assert.assertEquals(5, clientCRLStore.getLatestCrlId());
+
+      // client restart
+      // server onError->
+      // 1. remove stale client
+      // 2. new subscribe resumes from previous state.
+      LOG.info("Test client restart begin.");
+      // a full client channel shutdown and create
+      client.stop(true);
+      client.createChannel();
+      client.start();
+      Assert.assertEquals(5, clientCRLStore.getLatestCrlId());
+      GenericTestUtils.waitFor(() -> client.getUpdateCount()>4,
+          100, 2000);
+      revokeCertNow(certIds.get(6));
+      // mostly noop
+      server.notifyCrlUpdate();
+      LOG.info("Test client restart end.");
+
+      GenericTestUtils.waitFor(() -> client.getUpdateCount()>5,
+          100, 2000);
+      Assert.assertTrue(client.getUpdateCount()>=6);
+      Assert.assertEquals(2, client.getErrorCount());
+      Assert.assertEquals(6, clientCRLStore.getLatestCrlId());
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      client.stop(true);
+      server.stop();
+    }
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
index 6bac924..b37f785 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
@@ -225,4 +225,18 @@ public class CertificateClientTestImpl implements 
CertificateClient {
     return 0;
   }
 
+  @Override
+  public long getLocalCrlId() {
+    return 0;
+  }
+
+  @Override
+  public void setLocalCrlId(long crlId) {
+  }
+
+  @Override
+  public boolean processCrl(CRLInfo crl) {
+    return false;
+  }
+
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
index cce9d04..a3c2fd7 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.om;
 
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.update.client.SCMUpdateServiceGrpcClient;
 
 /**
  * Wrapper class for Scm protocol clients.
@@ -27,6 +28,7 @@ public class ScmClient {
 
   private final ScmBlockLocationProtocol blockClient;
   private final StorageContainerLocationProtocol containerClient;
+  private SCMUpdateServiceGrpcClient updateServiceGrpcClient;
 
   ScmClient(ScmBlockLocationProtocol blockClient,
             StorageContainerLocationProtocol containerClient) {
@@ -41,4 +43,13 @@ public class ScmClient {
   public StorageContainerLocationProtocol getContainerClient() {
     return this.containerClient;
   }
+
+  public void setUpdateServiceGrpcClient(
+      SCMUpdateServiceGrpcClient updateClient) {
+    this.updateServiceGrpcClient = updateClient;
+  }
+
+  public SCMUpdateServiceGrpcClient getUpdateServiceGrpcClient() {
+    return updateServiceGrpcClient;
+  }
 }
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
index 4c68ed6..086d715 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import 
org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
-import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.ozone.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to