vivekratnavel commented on a change in pull request #2216: URL: https://github.com/apache/ozone/pull/2216#discussion_r641019737
########## File path: hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLClientUpdateHandler.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.client; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceGrpc.SCMUpdateServiceStub; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.CRLUpdateRequest; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse; +import org.apache.hadoop.hdds.scm.update.server.SCMUpdateClientInfo; +import org.apache.hadoop.hdds.security.x509.crl.CRLInfo; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * CRL client update handler that handles local CRL update and pending CRLs. + */ +public class CRLClientUpdateHandler implements ClientUpdateHandler { + + private static final Logger LOG = LoggerFactory.getLogger( + CRLClientUpdateHandler.class); + private static final String NAME = "CRLClientUpdateHandler"; + + private final SCMUpdateServiceStub updateStub; + private final ClientCRLStore clientStore; + + // Used to update server about local pending crl id list + private StreamObserver<UpdateRequest> requestObserver; + private UUID clientUuid; + private SCMUpdateServiceProtos.ClientId clientIdProto; + + // periodically process pending crls + private ScheduledExecutorService executorService; + private final SCMUpdateServiceGrpcClient serviceGrpcClient; + private long crlCheckInterval; + + CRLClientUpdateHandler(UUID clientId, + SCMUpdateServiceStub updateStub, + SCMUpdateServiceGrpcClient serviceGrpcClient, + long crlCheckInterval) { + this.clientUuid = clientId; + this.updateStub = updateStub; + this.serviceGrpcClient = serviceGrpcClient; + + this.clientStore = serviceGrpcClient.getClientCRLStore(); + this.crlCheckInterval = crlCheckInterval; + LOG.info("Pending CRL check interval : {}s", crlCheckInterval/1000); + this.executorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("CRLUpdateHandler Thread - %d").build()); + } + + public static Logger getLog() { + return LOG; + } + + @Override + public void handleServerUpdate(UpdateResponse updateResponse) { + SCMUpdateServiceProtos.CRLInfoProto crlIfo = Review comment: nit: typo in crlIfo ```suggestion SCMUpdateServiceProtos.CRLInfoProto crlInfo = ``` ########## File path: hadoop-hdds/interface-client/src/main/proto/SCMUpdateProtocol.proto ########## @@ -0,0 +1,102 @@ +/** Review comment: nit: ```suggestion /* ``` ########## File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientManager.java ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.server; + +import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.Type; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +public class SCMUpdateClientManager { + private static final Logger LOG = + LoggerFactory.getLogger(SCMUpdateClientManager.class); + private Map<UUID, SCMUpdateClientInfo> clients; + private Map<Type, SCMUpdateHandler> handlers; + + public SCMUpdateClientManager() { + clients = new ConcurrentHashMap<>(); + handlers = new ConcurrentHashMap<>(); + } + + public void registerHandler(SCMUpdateHandler handler) { + handlers.put(handler.getType(), handler); + } + + public void unRegisterHandler(Type type) { + handlers.remove(type); + } + + public UUID addClient() { + UUID clientId = UUID.randomUUID(); + SCMUpdateClientInfo clientInfo = new SCMUpdateClientInfo(clientId); + clients.put(clientId, clientInfo); + return clientId; + } + + // this does not necessary produce a server response via responseObserver. + public void handleClientUpdate(UpdateRequest request, + StreamObserver<UpdateResponse> responseObserver) { + UUID clientId = SCMUpdateClientInfo.fromClientIdProto( + request.getClientId()); + + // Unknown client update + if (!clients.containsKey(clientId)) { + responseObserver.onError(Status.INVALID_ARGUMENT + .withDescription("Client must subscribe before send/receive update") Review comment: ```suggestion .withDescription("Client must subscribe before it can send/receive updates") ``` ########## File path: .github/workflows/post-commit.yml ########## @@ -224,7 +224,7 @@ jobs: continue-on-error: true integration: runs-on: ubuntu-18.04 - timeout-minutes: 120 + timeout-minutes: 150 Review comment: Are we sure we want to increase to 150? ########## File path: hadoop-hdds/interface-client/src/main/proto/SCMUpdateProtocol.proto ########## @@ -0,0 +1,102 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +/** + * These .proto interfaces are private and unstable. + * Please see http://wiki.apache.org/hadoop/Compatibility + * for what changes are allowed for a *unstable* .proto interface. + */ +syntax = "proto2"; +option java_package = "org.apache.hadoop.hdds.protocol.scm.proto"; +option java_outer_classname = "SCMUpdateServiceProtos"; +option java_generate_equals_and_hash = true; +option java_generic_services = true; +package hadoop.hdds.scm; + +/** + * Information for Certificate Revocation List. + */ +message CRLInfoProto { + required string x509CRL = 1; + required uint64 creationTimestamp = 2; + required int64 crlSequenceID = 3; +} +message ClientId { + required int64 msb = 1; + required int64 lsb = 2; +} + +message SubscribeRequest { + optional string hostname = 1; +} + +message SubscribeResponse { + required ClientId clientId = 1; +} + +enum Type { + CRLUpdate = 1; + PipelineUpdate = 2; + } + +message UpdateRequest { + required Type updateType = 1; // Type of the update + + optional string traceID = 2; + required ClientId clientId = 3; + + optional CRLUpdateRequest crlUpdateRequest= 4; +} + +message UpdateResponse { + required Type updateType = 1; // Type of the update + + optional string traceID = 2; + + optional CRLUpdateResponse crlUpdateResponse = 3; +} + +message CRLUpdateRequest { + required int64 receivedCrlId = 2; + repeated int64 pendingCrlIds = 3; Review comment: ```suggestion required int64 receivedCrlId = 1; repeated int64 pendingCrlIds = 2; ``` ########## File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLUpdateHandler.java ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.server; + +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.CRLUpdateResponse; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse; +import org.apache.hadoop.hdds.scm.update.client.CRLStore; +import org.apache.hadoop.hdds.security.exception.SCMSecurityException; +import org.apache.hadoop.hdds.security.x509.crl.CRLCodec; +import org.apache.hadoop.hdds.security.x509.crl.CRLInfo; +import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Class handle the CRL client update and response. + */ +public class SCMCRLUpdateHandler implements SCMUpdateHandler { + private static final Logger LOG = + LoggerFactory.getLogger(SCMCRLUpdateHandler.class); + private final CRLStore crlStore; + + private static final SCMUpdateServiceProtos.Type TYPE = + SCMUpdateServiceProtos.Type.CRLUpdate; + + private final Map<UUID, CRLClientInfo> clients; + + SCMCRLUpdateHandler(CRLStore crlStore) { + this.crlStore = crlStore; + clients = new ConcurrentHashMap<>(); + } + + public SCMUpdateServiceProtos.Type getType() { + return TYPE; + } + + @Override + public void handleClientRequest(SCMUpdateServiceProtos.UpdateRequest request, + SCMUpdateClientInfo clientInfo) { + SCMUpdateServiceProtos.CRLUpdateRequest updateStatusRequest = + request.getCrlUpdateRequest(); + long clientCrlId = updateStatusRequest.getReceivedCrlId(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Client {} updateStatus \nclientCrlId {} \npendingCrls {}", + clientInfo.getClientId(), clientCrlId, + updateStatusRequest.getPendingCrlIdsList().toString()); + } + + CRLClientInfo crlClientInfo; + if (!clients.containsKey(clientInfo.getClientId())) { + crlClientInfo = new CRLClientInfo(clientInfo); + clients.put(clientInfo.getClientId(), crlClientInfo); + } else { + crlClientInfo = clients.get(clientInfo.getClientId()); + } + + crlClientInfo.setPendingCrlIds( + request.getCrlUpdateRequest().getPendingCrlIdsList()); + crlClientInfo.setReceivedCrlId( + request.getCrlUpdateRequest().getReceivedCrlId()); + + sendCrlUpdateToClient(crlClientInfo); + } + + @Override + public void onUpdate() { + LOG.debug("Update due to certificate revoke"); + // server crl id is usually > client crl id when this is invoked. + clients.values().forEach(client -> { + sendCrlUpdateToClient(client); + }); + } + + @Override + public void onRemoveClient(SCMUpdateClientInfo clientInfo) { + clients.remove(clientInfo.getClientId()); + } + + private void sendCrlUpdateToClient(CRLClientInfo client) { + long clientCrlId = client.getReceivedCrlId(); + long serverCrlId = crlStore.getLatestCrlId(); + + if (clientCrlId >= serverCrlId) { + return; + } + + LOG.debug("## Server: clientCrlId {} serverCrlId {}", Review comment: Should this be ```suggestion LOG.debug("## clientCrlId: clientCrlId {} serverCrlId {}", ``` ########## File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLUpdateHandler.java ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.server; + +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.CRLUpdateResponse; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse; +import org.apache.hadoop.hdds.scm.update.client.CRLStore; +import org.apache.hadoop.hdds.security.exception.SCMSecurityException; +import org.apache.hadoop.hdds.security.x509.crl.CRLCodec; +import org.apache.hadoop.hdds.security.x509.crl.CRLInfo; +import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Class handle the CRL client update and response. + */ +public class SCMCRLUpdateHandler implements SCMUpdateHandler { + private static final Logger LOG = + LoggerFactory.getLogger(SCMCRLUpdateHandler.class); + private final CRLStore crlStore; + + private static final SCMUpdateServiceProtos.Type TYPE = + SCMUpdateServiceProtos.Type.CRLUpdate; + + private final Map<UUID, CRLClientInfo> clients; + + SCMCRLUpdateHandler(CRLStore crlStore) { + this.crlStore = crlStore; + clients = new ConcurrentHashMap<>(); + } + + public SCMUpdateServiceProtos.Type getType() { + return TYPE; + } + + @Override + public void handleClientRequest(SCMUpdateServiceProtos.UpdateRequest request, + SCMUpdateClientInfo clientInfo) { + SCMUpdateServiceProtos.CRLUpdateRequest updateStatusRequest = + request.getCrlUpdateRequest(); + long clientCrlId = updateStatusRequest.getReceivedCrlId(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Client {} updateStatus \nclientCrlId {} \npendingCrls {}", + clientInfo.getClientId(), clientCrlId, + updateStatusRequest.getPendingCrlIdsList().toString()); + } + + CRLClientInfo crlClientInfo; + if (!clients.containsKey(clientInfo.getClientId())) { + crlClientInfo = new CRLClientInfo(clientInfo); + clients.put(clientInfo.getClientId(), crlClientInfo); + } else { + crlClientInfo = clients.get(clientInfo.getClientId()); + } + + crlClientInfo.setPendingCrlIds( + request.getCrlUpdateRequest().getPendingCrlIdsList()); + crlClientInfo.setReceivedCrlId( + request.getCrlUpdateRequest().getReceivedCrlId()); + + sendCrlUpdateToClient(crlClientInfo); + } + + @Override + public void onUpdate() { + LOG.debug("Update due to certificate revoke"); + // server crl id is usually > client crl id when this is invoked. + clients.values().forEach(client -> { + sendCrlUpdateToClient(client); + }); + } + + @Override + public void onRemoveClient(SCMUpdateClientInfo clientInfo) { + clients.remove(clientInfo.getClientId()); + } + + private void sendCrlUpdateToClient(CRLClientInfo client) { + long clientCrlId = client.getReceivedCrlId(); + long serverCrlId = crlStore.getLatestCrlId(); + + if (clientCrlId >= serverCrlId) { + return; + } + + LOG.debug("## Server: clientCrlId {} serverCrlId {}", + clientCrlId, serverCrlId); + + long nextCrlId = clientCrlId + 1; + try { + CRLInfo crlInfo = null; + while (crlInfo == null && nextCrlId <= serverCrlId) { + crlInfo = crlStore.getCRL(nextCrlId); + nextCrlId++; + } + if (crlInfo == null) { + LOG.debug("Nothing to send to client"); + return; + } + sendCrlToClient(crlInfo, client.getUpdateClientInfo()); + } catch (Exception e) { + LOG.error("Fail handling client update.", e); + client.getUpdateClientInfo().getResponseObserver().onError(Status.INTERNAL + .withDescription("Fail to send crl" + nextCrlId + Review comment: nit: ```suggestion .withDescription("Failed to send crl" + nextCrlId + ``` ########## File path: hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/SCMUpdateClientConfiguration.java ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.client; + +import org.apache.hadoop.hdds.conf.Config; +import org.apache.hadoop.hdds.conf.ConfigGroup; +import org.apache.hadoop.hdds.conf.ConfigType; + +import java.time.Duration; + +import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE; +import static org.apache.hadoop.hdds.conf.ConfigTag.SCM; +import static org.apache.hadoop.hdds.conf.ConfigTag.SECURITY; + +/** + * Configuration used by SCM CRL update client. + */ +@ConfigGroup(prefix = "ozone.scm.update") +public class SCMUpdateClientConfiguration { + @Config(key = "client.crl.check.interval", + type = ConfigType.TIME, + defaultValue = "600s", + tags = {SCM, OZONE, SECURITY}, + description = "The interval that the scm update service client use to" + + "check its pending revoked crl." Review comment: nit: ```suggestion description = "The interval that the scm update service client uses to" + "check its pending CRLs." ``` ########## File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLUpdateHandler.java ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.server; + +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.CRLUpdateResponse; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse; +import org.apache.hadoop.hdds.scm.update.client.CRLStore; +import org.apache.hadoop.hdds.security.exception.SCMSecurityException; +import org.apache.hadoop.hdds.security.x509.crl.CRLCodec; +import org.apache.hadoop.hdds.security.x509.crl.CRLInfo; +import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Class handle the CRL client update and response. + */ +public class SCMCRLUpdateHandler implements SCMUpdateHandler { + private static final Logger LOG = + LoggerFactory.getLogger(SCMCRLUpdateHandler.class); + private final CRLStore crlStore; + + private static final SCMUpdateServiceProtos.Type TYPE = + SCMUpdateServiceProtos.Type.CRLUpdate; + + private final Map<UUID, CRLClientInfo> clients; + + SCMCRLUpdateHandler(CRLStore crlStore) { + this.crlStore = crlStore; + clients = new ConcurrentHashMap<>(); + } + + public SCMUpdateServiceProtos.Type getType() { + return TYPE; + } + + @Override + public void handleClientRequest(SCMUpdateServiceProtos.UpdateRequest request, + SCMUpdateClientInfo clientInfo) { + SCMUpdateServiceProtos.CRLUpdateRequest updateStatusRequest = + request.getCrlUpdateRequest(); + long clientCrlId = updateStatusRequest.getReceivedCrlId(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Client {} updateStatus \nclientCrlId {} \npendingCrls {}", + clientInfo.getClientId(), clientCrlId, + updateStatusRequest.getPendingCrlIdsList().toString()); + } + + CRLClientInfo crlClientInfo; + if (!clients.containsKey(clientInfo.getClientId())) { + crlClientInfo = new CRLClientInfo(clientInfo); + clients.put(clientInfo.getClientId(), crlClientInfo); + } else { + crlClientInfo = clients.get(clientInfo.getClientId()); + } + + crlClientInfo.setPendingCrlIds( + request.getCrlUpdateRequest().getPendingCrlIdsList()); + crlClientInfo.setReceivedCrlId( + request.getCrlUpdateRequest().getReceivedCrlId()); + + sendCrlUpdateToClient(crlClientInfo); + } + + @Override + public void onUpdate() { + LOG.debug("Update due to certificate revoke"); Review comment: nit: ```suggestion LOG.debug("Update due to certificate revocation"); ``` ########## File path: hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLClientUpdateHandler.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.client; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceGrpc.SCMUpdateServiceStub; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.CRLUpdateRequest; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse; +import org.apache.hadoop.hdds.scm.update.server.SCMUpdateClientInfo; +import org.apache.hadoop.hdds.security.x509.crl.CRLInfo; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * CRL client update handler that handles local CRL update and pending CRLs. + */ +public class CRLClientUpdateHandler implements ClientUpdateHandler { + + private static final Logger LOG = LoggerFactory.getLogger( + CRLClientUpdateHandler.class); + private static final String NAME = "CRLClientUpdateHandler"; + + private final SCMUpdateServiceStub updateStub; + private final ClientCRLStore clientStore; + + // Used to update server about local pending crl id list + private StreamObserver<UpdateRequest> requestObserver; + private UUID clientUuid; + private SCMUpdateServiceProtos.ClientId clientIdProto; + + // periodically process pending crls + private ScheduledExecutorService executorService; + private final SCMUpdateServiceGrpcClient serviceGrpcClient; + private long crlCheckInterval; + + CRLClientUpdateHandler(UUID clientId, + SCMUpdateServiceStub updateStub, + SCMUpdateServiceGrpcClient serviceGrpcClient, + long crlCheckInterval) { + this.clientUuid = clientId; + this.updateStub = updateStub; + this.serviceGrpcClient = serviceGrpcClient; + + this.clientStore = serviceGrpcClient.getClientCRLStore(); + this.crlCheckInterval = crlCheckInterval; + LOG.info("Pending CRL check interval : {}s", crlCheckInterval/1000); + this.executorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("CRLUpdateHandler Thread - %d").build()); + } + + public static Logger getLog() { + return LOG; + } + + @Override + public void handleServerUpdate(UpdateResponse updateResponse) { + SCMUpdateServiceProtos.CRLInfoProto crlIfo = + updateResponse.getCrlUpdateResponse().getCrlInfo(); + + long receivedCrlId = crlIfo.getCrlSequenceID(); + long localCrlId = clientStore.getLatestCrlId(); + + LOG.debug("## Client: clientId {} clientCrlId {} receivedCrlId {}", + clientUuid, localCrlId, receivedCrlId); + if (localCrlId == receivedCrlId) { + return; + } + // send a client update to refresh stale server + if (localCrlId > receivedCrlId) { + LOG.warn("Receive stale crlId {} lower than client crlId {}", + receivedCrlId, localCrlId); + sendClientUpdate(); + return; + } + + CRLInfo crl; + try { + crl = CRLInfo.fromCRLProto3(crlIfo); + } catch (Exception e) { + LOG.error("Can't parse server CRL update, skip...", e); + return; + } + clientStore.onRevokeCerts(crl); + // send client update. + sendClientUpdate(); + } + + public void start() { + // send initial update request to get a request observer handle + UpdateRequest updateReq = getUpdateRequest(); + requestObserver = updateStub.withWaitForReady() + .updateStatus(new StreamObserver<UpdateResponse>() { + @Override + public void onNext(UpdateResponse updateResponse) { + LOG.debug("Receive server response: {}", updateResponse.toString()); + serviceGrpcClient.incrUpdateCount(); + handleServerUpdate(updateResponse); + } + + @Override + public void onError(Throwable throwable) { + LOG.debug("Receive server error ", throwable); + serviceGrpcClient.incrErrorCount(); + if (serviceGrpcClient.getIsRunning().get()) { + // TODO: not all server error needs client restart. + LOG.warn("Restart client on server error: ", throwable); + serviceGrpcClient.restart(); + } + } + + @Override + public void onCompleted() { + LOG.debug("Receive server complete"); Review comment: nit: "Server response completed" ########## File path: hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLClientUpdateHandler.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.client; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceGrpc.SCMUpdateServiceStub; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.CRLUpdateRequest; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse; +import org.apache.hadoop.hdds.scm.update.server.SCMUpdateClientInfo; +import org.apache.hadoop.hdds.security.x509.crl.CRLInfo; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * CRL client update handler that handles local CRL update and pending CRLs. + */ +public class CRLClientUpdateHandler implements ClientUpdateHandler { + + private static final Logger LOG = LoggerFactory.getLogger( + CRLClientUpdateHandler.class); + private static final String NAME = "CRLClientUpdateHandler"; + + private final SCMUpdateServiceStub updateStub; + private final ClientCRLStore clientStore; + + // Used to update server about local pending crl id list + private StreamObserver<UpdateRequest> requestObserver; + private UUID clientUuid; + private SCMUpdateServiceProtos.ClientId clientIdProto; + + // periodically process pending crls + private ScheduledExecutorService executorService; + private final SCMUpdateServiceGrpcClient serviceGrpcClient; + private long crlCheckInterval; + + CRLClientUpdateHandler(UUID clientId, + SCMUpdateServiceStub updateStub, + SCMUpdateServiceGrpcClient serviceGrpcClient, + long crlCheckInterval) { + this.clientUuid = clientId; + this.updateStub = updateStub; + this.serviceGrpcClient = serviceGrpcClient; + + this.clientStore = serviceGrpcClient.getClientCRLStore(); + this.crlCheckInterval = crlCheckInterval; + LOG.info("Pending CRL check interval : {}s", crlCheckInterval/1000); + this.executorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("CRLUpdateHandler Thread - %d").build()); + } + + public static Logger getLog() { + return LOG; + } + + @Override + public void handleServerUpdate(UpdateResponse updateResponse) { + SCMUpdateServiceProtos.CRLInfoProto crlIfo = + updateResponse.getCrlUpdateResponse().getCrlInfo(); + + long receivedCrlId = crlIfo.getCrlSequenceID(); + long localCrlId = clientStore.getLatestCrlId(); + + LOG.debug("## Client: clientId {} clientCrlId {} receivedCrlId {}", + clientUuid, localCrlId, receivedCrlId); + if (localCrlId == receivedCrlId) { + return; + } + // send a client update to refresh stale server + if (localCrlId > receivedCrlId) { + LOG.warn("Receive stale crlId {} lower than client crlId {}", + receivedCrlId, localCrlId); + sendClientUpdate(); + return; + } + + CRLInfo crl; + try { + crl = CRLInfo.fromCRLProto3(crlIfo); + } catch (Exception e) { + LOG.error("Can't parse server CRL update, skip...", e); + return; + } + clientStore.onRevokeCerts(crl); + // send client update. + sendClientUpdate(); + } + + public void start() { + // send initial update request to get a request observer handle + UpdateRequest updateReq = getUpdateRequest(); + requestObserver = updateStub.withWaitForReady() + .updateStatus(new StreamObserver<UpdateResponse>() { + @Override + public void onNext(UpdateResponse updateResponse) { + LOG.debug("Receive server response: {}", updateResponse.toString()); + serviceGrpcClient.incrUpdateCount(); + handleServerUpdate(updateResponse); + } + + @Override + public void onError(Throwable throwable) { + LOG.debug("Receive server error ", throwable); + serviceGrpcClient.incrErrorCount(); + if (serviceGrpcClient.getIsRunning().get()) { + // TODO: not all server error needs client restart. + LOG.warn("Restart client on server error: ", throwable); + serviceGrpcClient.restart(); + } + } + + @Override + public void onCompleted() { + LOG.debug("Receive server complete"); + } + }); + requestObserver.onNext(updateReq); + startPendingCrlChecker(); + } + + public void stop() { + stopPendingCrlCheck(); + } + + private void stopPendingCrlCheck() { + executorService.shutdown(); + try { + executorService.awaitTermination(10, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error("Unexpected exception while waiting executor service" + + " shutdown", e); Review comment: nit: ```suggestion LOG.error("Unexpected exception while waiting for executor service" + " to shutdown", e); ``` ########## File path: hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLClientUpdateHandler.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.client; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceGrpc.SCMUpdateServiceStub; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.CRLUpdateRequest; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse; +import org.apache.hadoop.hdds.scm.update.server.SCMUpdateClientInfo; +import org.apache.hadoop.hdds.security.x509.crl.CRLInfo; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * CRL client update handler that handles local CRL update and pending CRLs. + */ +public class CRLClientUpdateHandler implements ClientUpdateHandler { + + private static final Logger LOG = LoggerFactory.getLogger( + CRLClientUpdateHandler.class); + private static final String NAME = "CRLClientUpdateHandler"; + + private final SCMUpdateServiceStub updateStub; + private final ClientCRLStore clientStore; + + // Used to update server about local pending crl id list + private StreamObserver<UpdateRequest> requestObserver; + private UUID clientUuid; + private SCMUpdateServiceProtos.ClientId clientIdProto; + + // periodically process pending crls + private ScheduledExecutorService executorService; + private final SCMUpdateServiceGrpcClient serviceGrpcClient; + private long crlCheckInterval; + + CRLClientUpdateHandler(UUID clientId, + SCMUpdateServiceStub updateStub, + SCMUpdateServiceGrpcClient serviceGrpcClient, + long crlCheckInterval) { + this.clientUuid = clientId; + this.updateStub = updateStub; + this.serviceGrpcClient = serviceGrpcClient; + + this.clientStore = serviceGrpcClient.getClientCRLStore(); + this.crlCheckInterval = crlCheckInterval; + LOG.info("Pending CRL check interval : {}s", crlCheckInterval/1000); + this.executorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("CRLUpdateHandler Thread - %d").build()); + } + + public static Logger getLog() { + return LOG; + } + + @Override + public void handleServerUpdate(UpdateResponse updateResponse) { + SCMUpdateServiceProtos.CRLInfoProto crlIfo = + updateResponse.getCrlUpdateResponse().getCrlInfo(); + + long receivedCrlId = crlIfo.getCrlSequenceID(); + long localCrlId = clientStore.getLatestCrlId(); + + LOG.debug("## Client: clientId {} clientCrlId {} receivedCrlId {}", + clientUuid, localCrlId, receivedCrlId); + if (localCrlId == receivedCrlId) { + return; + } + // send a client update to refresh stale server + if (localCrlId > receivedCrlId) { + LOG.warn("Receive stale crlId {} lower than client crlId {}", Review comment: nit ```suggestion LOG.warn("Received stale crlId {} lower than client crlId {}", ``` ########## File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLUpdateHandler.java ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.server; + +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.CRLUpdateResponse; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse; +import org.apache.hadoop.hdds.scm.update.client.CRLStore; +import org.apache.hadoop.hdds.security.exception.SCMSecurityException; +import org.apache.hadoop.hdds.security.x509.crl.CRLCodec; +import org.apache.hadoop.hdds.security.x509.crl.CRLInfo; +import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Class handle the CRL client update and response. + */ +public class SCMCRLUpdateHandler implements SCMUpdateHandler { + private static final Logger LOG = + LoggerFactory.getLogger(SCMCRLUpdateHandler.class); + private final CRLStore crlStore; + + private static final SCMUpdateServiceProtos.Type TYPE = + SCMUpdateServiceProtos.Type.CRLUpdate; + + private final Map<UUID, CRLClientInfo> clients; + + SCMCRLUpdateHandler(CRLStore crlStore) { + this.crlStore = crlStore; + clients = new ConcurrentHashMap<>(); + } + + public SCMUpdateServiceProtos.Type getType() { + return TYPE; + } + + @Override + public void handleClientRequest(SCMUpdateServiceProtos.UpdateRequest request, + SCMUpdateClientInfo clientInfo) { + SCMUpdateServiceProtos.CRLUpdateRequest updateStatusRequest = + request.getCrlUpdateRequest(); + long clientCrlId = updateStatusRequest.getReceivedCrlId(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Client {} updateStatus \nclientCrlId {} \npendingCrls {}", + clientInfo.getClientId(), clientCrlId, + updateStatusRequest.getPendingCrlIdsList().toString()); + } + + CRLClientInfo crlClientInfo; + if (!clients.containsKey(clientInfo.getClientId())) { + crlClientInfo = new CRLClientInfo(clientInfo); + clients.put(clientInfo.getClientId(), crlClientInfo); + } else { + crlClientInfo = clients.get(clientInfo.getClientId()); + } + + crlClientInfo.setPendingCrlIds( + request.getCrlUpdateRequest().getPendingCrlIdsList()); + crlClientInfo.setReceivedCrlId( + request.getCrlUpdateRequest().getReceivedCrlId()); + + sendCrlUpdateToClient(crlClientInfo); + } + + @Override + public void onUpdate() { + LOG.debug("Update due to certificate revoke"); + // server crl id is usually > client crl id when this is invoked. + clients.values().forEach(client -> { + sendCrlUpdateToClient(client); + }); + } + + @Override + public void onRemoveClient(SCMUpdateClientInfo clientInfo) { + clients.remove(clientInfo.getClientId()); + } + + private void sendCrlUpdateToClient(CRLClientInfo client) { + long clientCrlId = client.getReceivedCrlId(); + long serverCrlId = crlStore.getLatestCrlId(); + + if (clientCrlId >= serverCrlId) { + return; + } + + LOG.debug("## Server: clientCrlId {} serverCrlId {}", + clientCrlId, serverCrlId); + + long nextCrlId = clientCrlId + 1; + try { + CRLInfo crlInfo = null; + while (crlInfo == null && nextCrlId <= serverCrlId) { + crlInfo = crlStore.getCRL(nextCrlId); + nextCrlId++; + } + if (crlInfo == null) { + LOG.debug("Nothing to send to client"); + return; + } + sendCrlToClient(crlInfo, client.getUpdateClientInfo()); + } catch (Exception e) { + LOG.error("Fail handling client update.", e); Review comment: nit: ```suggestion LOG.error("Failed to handle client update.", e); ``` ########## File path: hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/CRLClientUpdateHandler.java ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.client; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceGrpc.SCMUpdateServiceStub; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.CRLUpdateRequest; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse; +import org.apache.hadoop.hdds.scm.update.server.SCMUpdateClientInfo; +import org.apache.hadoop.hdds.security.x509.crl.CRLInfo; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * CRL client update handler that handles local CRL update and pending CRLs. + */ +public class CRLClientUpdateHandler implements ClientUpdateHandler { + + private static final Logger LOG = LoggerFactory.getLogger( + CRLClientUpdateHandler.class); + private static final String NAME = "CRLClientUpdateHandler"; + + private final SCMUpdateServiceStub updateStub; + private final ClientCRLStore clientStore; + + // Used to update server about local pending crl id list + private StreamObserver<UpdateRequest> requestObserver; + private UUID clientUuid; + private SCMUpdateServiceProtos.ClientId clientIdProto; + + // periodically process pending crls + private ScheduledExecutorService executorService; + private final SCMUpdateServiceGrpcClient serviceGrpcClient; + private long crlCheckInterval; + + CRLClientUpdateHandler(UUID clientId, + SCMUpdateServiceStub updateStub, + SCMUpdateServiceGrpcClient serviceGrpcClient, + long crlCheckInterval) { + this.clientUuid = clientId; + this.updateStub = updateStub; + this.serviceGrpcClient = serviceGrpcClient; + + this.clientStore = serviceGrpcClient.getClientCRLStore(); + this.crlCheckInterval = crlCheckInterval; + LOG.info("Pending CRL check interval : {}s", crlCheckInterval/1000); + this.executorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("CRLUpdateHandler Thread - %d").build()); + } + + public static Logger getLog() { + return LOG; + } + + @Override + public void handleServerUpdate(UpdateResponse updateResponse) { + SCMUpdateServiceProtos.CRLInfoProto crlIfo = + updateResponse.getCrlUpdateResponse().getCrlInfo(); + + long receivedCrlId = crlIfo.getCrlSequenceID(); + long localCrlId = clientStore.getLatestCrlId(); + + LOG.debug("## Client: clientId {} clientCrlId {} receivedCrlId {}", + clientUuid, localCrlId, receivedCrlId); + if (localCrlId == receivedCrlId) { + return; + } + // send a client update to refresh stale server + if (localCrlId > receivedCrlId) { + LOG.warn("Receive stale crlId {} lower than client crlId {}", + receivedCrlId, localCrlId); + sendClientUpdate(); + return; + } + + CRLInfo crl; + try { + crl = CRLInfo.fromCRLProto3(crlIfo); + } catch (Exception e) { + LOG.error("Can't parse server CRL update, skip...", e); + return; + } + clientStore.onRevokeCerts(crl); + // send client update. + sendClientUpdate(); + } + + public void start() { + // send initial update request to get a request observer handle + UpdateRequest updateReq = getUpdateRequest(); + requestObserver = updateStub.withWaitForReady() + .updateStatus(new StreamObserver<UpdateResponse>() { + @Override + public void onNext(UpdateResponse updateResponse) { + LOG.debug("Receive server response: {}", updateResponse.toString()); + serviceGrpcClient.incrUpdateCount(); + handleServerUpdate(updateResponse); + } + + @Override + public void onError(Throwable throwable) { + LOG.debug("Receive server error ", throwable); + serviceGrpcClient.incrErrorCount(); + if (serviceGrpcClient.getIsRunning().get()) { + // TODO: not all server error needs client restart. + LOG.warn("Restart client on server error: ", throwable); + serviceGrpcClient.restart(); + } Review comment: Do we need to handle the "else" case here when the serviceGrpcClient is not running? ########## File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientManager.java ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.server; + +import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.Type; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +public class SCMUpdateClientManager { + private static final Logger LOG = + LoggerFactory.getLogger(SCMUpdateClientManager.class); + private Map<UUID, SCMUpdateClientInfo> clients; + private Map<Type, SCMUpdateHandler> handlers; + + public SCMUpdateClientManager() { + clients = new ConcurrentHashMap<>(); + handlers = new ConcurrentHashMap<>(); + } + + public void registerHandler(SCMUpdateHandler handler) { + handlers.put(handler.getType(), handler); + } + + public void unRegisterHandler(Type type) { + handlers.remove(type); + } + + public UUID addClient() { + UUID clientId = UUID.randomUUID(); + SCMUpdateClientInfo clientInfo = new SCMUpdateClientInfo(clientId); + clients.put(clientId, clientInfo); + return clientId; + } + + // this does not necessary produce a server response via responseObserver. + public void handleClientUpdate(UpdateRequest request, + StreamObserver<UpdateResponse> responseObserver) { + UUID clientId = SCMUpdateClientInfo.fromClientIdProto( + request.getClientId()); + + // Unknown client update + if (!clients.containsKey(clientId)) { + responseObserver.onError(Status.INVALID_ARGUMENT + .withDescription("Client must subscribe before send/receive update") + .asException()); + } + + // record the the server to client chanel Review comment: ```suggestion // record the server to client channel ``` ########## File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientManager.java ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.server; + +import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.Type; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +public class SCMUpdateClientManager { + private static final Logger LOG = + LoggerFactory.getLogger(SCMUpdateClientManager.class); + private Map<UUID, SCMUpdateClientInfo> clients; + private Map<Type, SCMUpdateHandler> handlers; + + public SCMUpdateClientManager() { + clients = new ConcurrentHashMap<>(); + handlers = new ConcurrentHashMap<>(); + } + + public void registerHandler(SCMUpdateHandler handler) { + handlers.put(handler.getType(), handler); + } + + public void unRegisterHandler(Type type) { + handlers.remove(type); + } + + public UUID addClient() { + UUID clientId = UUID.randomUUID(); Review comment: Do we need to check if the random id collides with an already existing clientID? ########## File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateClientManager.java ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.server; + +import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.Type; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateRequest; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +public class SCMUpdateClientManager { + private static final Logger LOG = + LoggerFactory.getLogger(SCMUpdateClientManager.class); + private Map<UUID, SCMUpdateClientInfo> clients; + private Map<Type, SCMUpdateHandler> handlers; + + public SCMUpdateClientManager() { + clients = new ConcurrentHashMap<>(); + handlers = new ConcurrentHashMap<>(); + } + + public void registerHandler(SCMUpdateHandler handler) { + handlers.put(handler.getType(), handler); + } + + public void unRegisterHandler(Type type) { + handlers.remove(type); + } + + public UUID addClient() { + UUID clientId = UUID.randomUUID(); + SCMUpdateClientInfo clientInfo = new SCMUpdateClientInfo(clientId); + clients.put(clientId, clientInfo); + return clientId; + } + + // this does not necessary produce a server response via responseObserver. Review comment: nit: ```suggestion // this does not necessarily produce a server response via responseObserver. ``` ########## File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMCRLUpdateHandler.java ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.server; + +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.CRLUpdateResponse; +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos.UpdateResponse; +import org.apache.hadoop.hdds.scm.update.client.CRLStore; +import org.apache.hadoop.hdds.security.exception.SCMSecurityException; +import org.apache.hadoop.hdds.security.x509.crl.CRLCodec; +import org.apache.hadoop.hdds.security.x509.crl.CRLInfo; +import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Class handle the CRL client update and response. + */ +public class SCMCRLUpdateHandler implements SCMUpdateHandler { + private static final Logger LOG = + LoggerFactory.getLogger(SCMCRLUpdateHandler.class); + private final CRLStore crlStore; + + private static final SCMUpdateServiceProtos.Type TYPE = + SCMUpdateServiceProtos.Type.CRLUpdate; + + private final Map<UUID, CRLClientInfo> clients; + + SCMCRLUpdateHandler(CRLStore crlStore) { + this.crlStore = crlStore; + clients = new ConcurrentHashMap<>(); + } + + public SCMUpdateServiceProtos.Type getType() { + return TYPE; + } + + @Override + public void handleClientRequest(SCMUpdateServiceProtos.UpdateRequest request, + SCMUpdateClientInfo clientInfo) { + SCMUpdateServiceProtos.CRLUpdateRequest updateStatusRequest = + request.getCrlUpdateRequest(); + long clientCrlId = updateStatusRequest.getReceivedCrlId(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Client {} updateStatus \nclientCrlId {} \npendingCrls {}", + clientInfo.getClientId(), clientCrlId, + updateStatusRequest.getPendingCrlIdsList().toString()); + } + + CRLClientInfo crlClientInfo; + if (!clients.containsKey(clientInfo.getClientId())) { + crlClientInfo = new CRLClientInfo(clientInfo); + clients.put(clientInfo.getClientId(), crlClientInfo); + } else { + crlClientInfo = clients.get(clientInfo.getClientId()); + } + + crlClientInfo.setPendingCrlIds( + request.getCrlUpdateRequest().getPendingCrlIdsList()); + crlClientInfo.setReceivedCrlId( + request.getCrlUpdateRequest().getReceivedCrlId()); + + sendCrlUpdateToClient(crlClientInfo); + } + + @Override + public void onUpdate() { + LOG.debug("Update due to certificate revoke"); + // server crl id is usually > client crl id when this is invoked. + clients.values().forEach(client -> { + sendCrlUpdateToClient(client); + }); + } + + @Override + public void onRemoveClient(SCMUpdateClientInfo clientInfo) { + clients.remove(clientInfo.getClientId()); + } + + private void sendCrlUpdateToClient(CRLClientInfo client) { + long clientCrlId = client.getReceivedCrlId(); + long serverCrlId = crlStore.getLatestCrlId(); + + if (clientCrlId >= serverCrlId) { + return; + } + + LOG.debug("## Server: clientCrlId {} serverCrlId {}", + clientCrlId, serverCrlId); + + long nextCrlId = clientCrlId + 1; + try { + CRLInfo crlInfo = null; + while (crlInfo == null && nextCrlId <= serverCrlId) { Review comment: Why do we need the null check here? This can be replaced with if ```suggestion if (nextCrlId <= serverCrlId) { ``` ########## File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateServiceGrpcServer.java ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.server; + +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos; +import org.apache.hadoop.hdds.scm.update.client.CRLStore; +import org.apache.hadoop.hdds.scm.update.client.UpdateServiceConfig; +import org.apache.ratis.thirdparty.io.grpc.Server; +import org.apache.ratis.thirdparty.io.grpc.ServerBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * gRPC server for SCM update services. + */ +public class SCMUpdateServiceGrpcServer { + private static final Logger LOG = + LoggerFactory.getLogger(SCMUpdateServiceGrpcServer.class); + + private static final String SERVICE_NAME = "SCMUpdateService"; + private CRLStore crlStore; + private int port; + private Server server; + private SCMUpdateServiceImpl scmUpdateService; + private final AtomicBoolean isStarted = new AtomicBoolean(false); + + public SCMUpdateServiceGrpcServer(final UpdateServiceConfig updateConf, + final CRLStore crlStore) { + this.crlStore = crlStore; + this.port = updateConf.getPort(); + } + + public int getPort() { + return this.port; + } + + public void start() throws IOException { + LOG.info("{} starting", SERVICE_NAME); + scmUpdateService = new SCMUpdateServiceImpl(crlStore); + server = ServerBuilder.forPort(port). + addService(scmUpdateService) + .build(); + + if (!isStarted.compareAndSet(false, true)) { + LOG.info("Ignore. already started."); + return; + } else { + server.start(); + } + } + + public void stop() { + LOG.info("{} stopping", SERVICE_NAME); + if (isStarted.get()) { + scmUpdateService = null; + server.shutdown(); + try { + server.awaitTermination(5, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error("failed to shutdown SCMClientGrpcServer", e); + } finally { + server.shutdownNow(); + } + LOG.info("{}} stopped!", SERVICE_NAME); Review comment: ```suggestion LOG.info("{} stopped!", SERVICE_NAME); ``` ########## File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/update/server/SCMUpdateServiceGrpcServer.java ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.hdds.scm.update.server; + +import org.apache.hadoop.hdds.protocol.scm.proto.SCMUpdateServiceProtos; +import org.apache.hadoop.hdds.scm.update.client.CRLStore; +import org.apache.hadoop.hdds.scm.update.client.UpdateServiceConfig; +import org.apache.ratis.thirdparty.io.grpc.Server; +import org.apache.ratis.thirdparty.io.grpc.ServerBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * gRPC server for SCM update services. + */ +public class SCMUpdateServiceGrpcServer { + private static final Logger LOG = + LoggerFactory.getLogger(SCMUpdateServiceGrpcServer.class); + + private static final String SERVICE_NAME = "SCMUpdateService"; + private CRLStore crlStore; + private int port; + private Server server; + private SCMUpdateServiceImpl scmUpdateService; + private final AtomicBoolean isStarted = new AtomicBoolean(false); + + public SCMUpdateServiceGrpcServer(final UpdateServiceConfig updateConf, + final CRLStore crlStore) { + this.crlStore = crlStore; + this.port = updateConf.getPort(); + } + + public int getPort() { + return this.port; + } + + public void start() throws IOException { + LOG.info("{} starting", SERVICE_NAME); + scmUpdateService = new SCMUpdateServiceImpl(crlStore); + server = ServerBuilder.forPort(port). + addService(scmUpdateService) + .build(); + + if (!isStarted.compareAndSet(false, true)) { + LOG.info("Ignore. already started."); Review comment: nit: ```suggestion LOG.info("Ignoring start() since {} is already started.", SERVICE_NAME); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
