This is an automated email from the ASF dual-hosted git repository.
bharat pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new 20b8bca HDDS-4876. [SCM HA Security] Add failover proxy to SCM
Security Server Protocol (#1978)
20b8bca is described below
commit 20b8bca51e4e5b3f2aa04d630411cc7f117443b0
Author: Bharat Viswanadham <[email protected]>
AuthorDate: Tue Mar 9 23:21:28 2021 +0530
HDDS-4876. [SCM HA Security] Add failover proxy to SCM Security Server
Protocol (#1978)
---
.../security/exception/SCMSecurityException.java | 39 ++-
.../x509/certificate/utils/CertificateCodec.java | 3 +-
.../SCMSecurityProtocolClientSideTranslatorPB.java | 31 +++
.../SCMSecurityProtocolFailoverProxyProvider.java | 281 +++++++++++++++++++++
.../certificate/authority/DefaultCAServer.java | 4 +-
.../certificates/utils/CertificateSignRequest.java | 5 +-
.../java/org/apache/hadoop/hdds/utils/HAUtils.java | 23 ++
.../apache/hadoop/hdds/utils/HddsServerUtil.java | 38 +--
.../src/main/proto/ScmServerSecurityProtocol.proto | 13 +
.../SCMSecurityProtocolServerSideTranslatorPB.java | 88 ++++---
.../hdds/scm/server/SCMSecurityProtocolServer.java | 46 +++-
.../hadoop/ozone/TestSecureOzoneCluster.java | 5 +-
12 files changed, 494 insertions(+), 82 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
index bbe25a9..95d6064 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
@@ -37,6 +37,16 @@ public class SCMSecurityException extends IOException {
/**
* Ctor.
+ * @param message - Error Message
+ * @param errorCode - Error code
+ */
+ public SCMSecurityException(String message, ErrorCode errorCode) {
+ super(message);
+ this.errorCode = errorCode;
+ }
+
+ /**
+ * Ctor.
* @param message - Message.
* @param cause - Actual cause.
*/
@@ -47,11 +57,23 @@ public class SCMSecurityException extends IOException {
/**
* Ctor.
- * @param message - Message.
+ * @param message - Error Message
+ * @param cause - Actual cause.
+ * @param errorCode - Error code.
+ */
+ public SCMSecurityException(String message, Throwable cause,
+ ErrorCode errorCode) {
+ super(message, cause);
+ this.errorCode = errorCode;
+ }
+
+ /**
+ * Ctor.
+ * @param cause - Actual cause.
* @param error - error code.
*/
- public SCMSecurityException(String message, ErrorCode error) {
- super(message);
+ public SCMSecurityException(Exception cause, ErrorCode error) {
+ super(cause);
this.errorCode = error;
}
@@ -72,6 +94,17 @@ public class SCMSecurityException extends IOException {
* Error codes to make it easy to decode these exceptions.
*/
public enum ErrorCode {
+ OK,
+ INVALID_CSR,
+ UNABLE_TO_ISSUE_CERTIFICATE,
+ GET_DN_CERTIFICATE_FAILED,
+ GET_OM_CERTIFICATE_FAILED,
+ GET_SCM_CERTIFICATE_FAILED,
+ GET_CERTIFICATE_FAILED,
+ GET_CA_CERT_FAILED,
+ CERTIFICATE_NOT_FOUND,
+ PEM_ENCODE_FAILED,
+ INTERNAL_ERROR,
DEFAULT,
MISSING_BLOCK_TOKEN,
BLOCK_TOKEN_VERIFICATION_FAILED
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
index 1abdcc3..53d8e9a 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
@@ -50,6 +50,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.attribute.PosixFilePermission.OWNER_EXECUTE;
import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
+import static
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.PEM_ENCODE_FAILED;
/**
* A class used to read and write X.509 certificates PEM encoded Streams.
@@ -125,7 +126,7 @@ public class CertificateCodec {
LOG.error("Error in encoding certificate." + certificate
.getSubjectDN().toString(), e);
throw new SCMSecurityException("PEM Encoding failed for certificate." +
- certificate.getSubjectDN().toString(), e);
+ certificate.getSubjectDN().toString(), e, PEM_ENCODE_FAILED);
}
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
index 672b95e..f54d228 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
@@ -37,7 +38,10 @@ import
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecuri
import
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityRequest.Builder;
import
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityResponse;
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Type;
+import
org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
@@ -58,12 +62,22 @@ public class SCMSecurityProtocolClientSideTranslatorPB
implements
*/
private static final RpcController NULL_RPC_CONTROLLER = null;
private final SCMSecurityProtocolPB rpcProxy;
+ private SCMSecurityProtocolFailoverProxyProvider failoverProxyProvider;
public SCMSecurityProtocolClientSideTranslatorPB(
SCMSecurityProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy;
}
+ public SCMSecurityProtocolClientSideTranslatorPB(
+ SCMSecurityProtocolFailoverProxyProvider proxyProvider) {
+ Preconditions.checkState(proxyProvider != null);
+ this.failoverProxyProvider = proxyProvider;
+ this.rpcProxy = (SCMSecurityProtocolPB) RetryProxy.create(
+ SCMSecurityProtocolPB.class, failoverProxyProvider,
+ failoverProxyProvider.getRetryPolicy());
+ }
+
/**
* Helper method to wrap the request and send the message.
*/
@@ -80,6 +94,9 @@ public class SCMSecurityProtocolClientSideTranslatorPB
implements
SCMSecurityRequest wrapper = builder.build();
response = rpcProxy.submitRequest(NULL_RPC_CONTROLLER, wrapper);
+
+ handleError(response);
+
} catch (ServiceException ex) {
throw ProtobufHelper.getRemoteException(ex);
}
@@ -87,6 +104,20 @@ public class SCMSecurityProtocolClientSideTranslatorPB
implements
}
/**
+ * If response is not successful, throw exception.
+ * The error code of the thrown exception is selected by the ordinal of the
+ * response status, so this relies on SCMSecurityProtocolProtos.Status and
+ * SCMSecurityException.ErrorCode declaring their constants in the same
+ * order.
+ * @param resp - SCMSecurityResponse
+ * @return if response is success, return response, else throw exception.
+ * @throws SCMSecurityException if the response status is not OK; the
+ * exception carries the message taken from the response.
+ */
+ private SCMSecurityResponse handleError(SCMSecurityResponse resp)
+ throws SCMSecurityException {
+ if (resp.getStatus() != SCMSecurityProtocolProtos.Status.OK) {
+ throw new SCMSecurityException(resp.getMessage(),
+ SCMSecurityException.ErrorCode.values()[resp.getStatus().ordinal()]);
+ }
+ return resp;
+ }
+ /**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
* method has no effect.
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
new file mode 100644
index 0000000..a2d2fb3
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.proxy;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.ConfigurationException;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
+import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;
+
+/**
+ * Failover proxy provider for SCMSecurityProtocol server.
+ */
+public class SCMSecurityProtocolFailoverProxyProvider implements
+ FailoverProxyProvider<SCMSecurityProtocolPB>, Closeable {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(SCMSecurityProtocolFailoverProxyProvider.class);
+
+ // scmNodeId -> ProxyInfo<rpcProxy>
+ private final Map<String,
+ ProxyInfo<SCMSecurityProtocolPB>> scmProxies;
+
+ // scmNodeId -> SCMProxyInfo
+ private final Map<String, SCMProxyInfo> scmProxyInfoMap;
+
+ private List<String> scmNodeIds;
+
+ private String currentProxySCMNodeId;
+ private int currentProxyIndex;
+
+ private final ConfigurationSource conf;
+ private final SCMClientConfig scmClientConfig;
+ private final long scmVersion;
+
+ private String scmServiceId;
+
+ private final int maxRetryCount;
+ private final long retryInterval;
+
+ private final UserGroupInformation ugi;
+
+ /**
+ * Construct fail-over proxy provider for SCMSecurityProtocol Server.
+ * @param conf
+ * @param userGroupInformation
+ */
+ public SCMSecurityProtocolFailoverProxyProvider(ConfigurationSource conf,
+ UserGroupInformation userGroupInformation) {
+ Preconditions.checkNotNull(userGroupInformation);
+ this.ugi = userGroupInformation;
+ this.conf = conf;
+ this.scmVersion = RPC.getProtocolVersion(SCMSecurityProtocolPB.class);
+
+ this.scmProxies = new HashMap<>();
+ this.scmProxyInfoMap = new HashMap<>();
+ loadConfigs();
+
+ this.currentProxyIndex = 0;
+ currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
+ scmClientConfig = conf.getObject(SCMClientConfig.class);
+ this.maxRetryCount = scmClientConfig.getRetryCount();
+ this.retryInterval = scmClientConfig.getRetryInterval();
+ }
+
+ /**
+ * Reads the SCM node list from the configuration and records, for every
+ * node, its node id and the socket address of its security protocol
+ * endpoint. Fills scmNodeIds and scmProxyInfoMap and sets scmServiceId.
+ *
+ * @throws ConfigurationException if a node has no security address.
+ */
+ protected void loadConfigs() {
+ List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
+ scmNodeIds = new ArrayList<>();
+
+ for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
+ if (scmNodeInfo.getScmSecurityAddress() == null) {
+ // The value being validated is the security protocol address, so the
+ // error names that address rather than the client address.
+ throw new ConfigurationException("SCM Security Address could not " +
+ "be obtained from config. Config is not properly defined");
+ } else {
+ InetSocketAddress scmSecurityAddress =
+ NetUtils.createSocketAddr(scmNodeInfo.getScmSecurityAddress());
+
+ scmServiceId = scmNodeInfo.getServiceId();
+ String scmNodeId = scmNodeInfo.getNodeId();
+
+ scmNodeIds.add(scmNodeId);
+ SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId,
+ scmSecurityAddress);
+ scmProxyInfoMap.put(scmNodeId, scmProxyInfo);
+ }
+ }
+ }
+
+ @Override
+ public synchronized ProxyInfo<SCMSecurityProtocolPB> getProxy() {
+ ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId());
+ if (currentProxyInfo == null) {
+ currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId());
+ }
+ return currentProxyInfo;
+ }
+
+ /**
+ * Creates proxy object.
+ */
+ private ProxyInfo createSCMProxy(String nodeId) {
+ ProxyInfo proxyInfo;
+ SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId);
+ InetSocketAddress address = scmProxyInfo.getAddress();
+ try {
+ SCMSecurityProtocolPB scmProxy = createSCMProxy(address);
+ // Create proxyInfo here, to make it work with all Hadoop versions.
+ proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString());
+ scmProxies.put(nodeId, proxyInfo);
+ return proxyInfo;
+ } catch (IOException ioe) {
+ LOG.error("{} Failed to create RPC proxy to SCM at {}",
+ this.getClass().getSimpleName(), address, ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ private SCMSecurityProtocolPB createSCMProxy(InetSocketAddress scmAddress)
+ throws IOException {
+ Configuration hadoopConf =
+ LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
+ RPC.setProtocolEngine(hadoopConf, SCMSecurityProtocolPB.class,
+ ProtobufRpcEngine.class);
+
+ // FailoverOnNetworkException ensures that the IPC layer does not attempt
+ // retries on the same SCM in case of connection exception. This retry
+ // policy essentially results in TRY_ONCE_THEN_FAIL.
+
+ RetryPolicy connectionRetryPolicy = RetryPolicies
+ .failoverOnNetworkException(0);
+
+ return RPC.getProtocolProxy(SCMSecurityProtocolPB.class,
+ scmVersion, scmAddress, ugi,
+ hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf),
+ (int)scmClientConfig.getRpcTimeOut(),
connectionRetryPolicy).getProxy();
+ }
+
+
+ /**
+ * {@link FailoverProxyProvider} fail-over callback. This implementation
+ * only writes a debug log with the current proxy index and node id; it
+ * does not change the current proxy. The index is advanced by
+ * {@link #performFailoverToNextProxy()}, which the policy returned by
+ * {@link #getRetryPolicy()} calls before asking for a fail-over.
+ *
+ * @param currentProxy the proxy in use; not read by this method.
+ */
+ @Override
+ public void performFailover(SCMSecurityProtocolPB currentProxy) {
+ if (LOG.isDebugEnabled()) {
+ int currentIndex = getCurrentProxyIndex();
+ LOG.debug("Failing over SCM Security proxy to index: {}, nodeId: {}",
+ currentIndex, scmNodeIds.get(currentIndex));
+ }
+ }
+
+ /**
+ * Performs fail-over to the next proxy.
+ */
+ public void performFailoverToNextProxy() {
+ int newProxyIndex = incrementProxyIndex();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Incrementing SCM Security proxy index to {}, nodeId: {}",
+ newProxyIndex, scmNodeIds.get(newProxyIndex));
+ }
+ }
+
+ /**
+ * Update the proxy index to the next proxy in the list, wrapping around to
+ * the first node after the last one.
+ * @return the new proxy index
+ */
+ private synchronized int incrementProxyIndex() {
+ // Wrap on the number of configured SCM nodes. scmProxies is filled lazily
+ // by getProxy(), so its size only counts the proxies created so far and
+ // would keep the index from reaching nodes that were never contacted.
+ currentProxyIndex = (currentProxyIndex + 1) % scmNodeIds.size();
+ currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
+ return currentProxyIndex;
+ }
+
+ /**
+ * Builds the retry policy used together with this provider. The policy
+ * fails immediately on access control errors. For any other exception it
+ * moves the provider to the next SCM node and returns a fail-over-and-retry
+ * action created with the configured retry interval, until the number of
+ * failovers reaches maxRetryCount, after which it fails.
+ *
+ * @return the retry policy.
+ */
+ public RetryPolicy getRetryPolicy() {
+ // Client will attempt up to maxRetryCount number of failovers between
+ // available SCMs before throwing exception.
+ RetryPolicy retryPolicy = new RetryPolicy() {
+ @Override
+ public RetryAction shouldRetry(Exception exception, int retries,
+ int failovers, boolean isIdempotentOrAtMostOnce)
+ throws Exception {
+
+ if (LOG.isDebugEnabled()) {
+ if (exception.getCause() != null) {
+ LOG.debug("RetryProxy: SCM Security Server {}: {}: {}",
+ getCurrentProxySCMNodeId(),
+ exception.getCause().getClass().getSimpleName(),
+ exception.getCause().getMessage());
+ } else {
+ LOG.debug("RetryProxy: SCM {}: {}", getCurrentProxySCMNodeId(),
+ exception.getMessage());
+ }
+ }
+
+ // For AccessControl Exception where Client is not authenticated, fail
+ // without retrying.
+ if (HAUtils.isAccessControlException(exception)) {
+ return RetryAction.FAIL;
+ }
+
+ // Perform fail over to next proxy, as right now we don't have any
+ // suggested leader ID from server, we fail over to next one.
+ // TODO: Act based on server response if leader id is passed.
+ performFailoverToNextProxy();
+ return getRetryAction(FAILOVER_AND_RETRY, failovers);
+ }
+
+ // Returns the given action with the retry interval while the failover
+ // budget is not used up, otherwise FAIL.
+ private RetryAction getRetryAction(RetryDecision fallbackAction,
+ int failovers) {
+ if (failovers < maxRetryCount) {
+ return new RetryAction(fallbackAction, getRetryInterval());
+ } else {
+ return RetryAction.FAIL;
+ }
+ }
+ };
+
+ return retryPolicy;
+ }
+
+
+ @Override
+ public Class< SCMSecurityProtocolPB > getInterface() {
+ return SCMSecurityProtocolPB.class;
+ }
+
+ /**
+ * Stops every RPC proxy created by this provider and empties the proxy
+ * cache.
+ *
+ * @throws IOException declared by {@link Closeable#close()}.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ // Walk the values and clear the map afterwards: removing entries while
+ // iterating over entrySet() can throw ConcurrentModificationException.
+ for (ProxyInfo<SCMSecurityProtocolPB> proxyInfo : scmProxies.values()) {
+ // RPC.stopProxy expects the RPC proxy object itself, not the ProxyInfo
+ // wrapper that holds it.
+ if (proxyInfo != null && proxyInfo.proxy != null) {
+ RPC.stopProxy(proxyInfo.proxy);
+ }
+ }
+ scmProxies.clear();
+ }
+
+ public synchronized String getCurrentProxySCMNodeId() {
+ return currentProxySCMNodeId;
+ }
+
+ public synchronized int getCurrentProxyIndex() {
+ return currentProxyIndex;
+ }
+
+ private long getRetryInterval() {
+ return retryInterval;
+ }
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
index 440bd4c..2cd8993 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
@@ -63,6 +63,7 @@ import java.util.concurrent.Future;
import java.util.function.Consumer;
import static
org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getCertificationRequest;
+import static
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.UNABLE_TO_ISSUE_CERTIFICATE;
import static
org.apache.hadoop.hdds.security.x509.exceptions.CertificateException.ErrorCode.CSR_ERROR;
/**
@@ -254,7 +255,8 @@ public class DefaultCAServer implements CertificateServer {
}
} catch (CertificateException | IOException | OperatorCreationException e)
{
LOG.error("Unable to issue a certificate.", e);
- xcertHolder.completeExceptionally(new SCMSecurityException(e));
+ xcertHolder.completeExceptionally(
+ new SCMSecurityException(e, UNABLE_TO_ISSUE_CERTIFICATE));
}
return xcertHolder;
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java
index b26ad2c..b8d2859 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java
@@ -59,6 +59,8 @@ import
org.bouncycastle.pkcs.jcajce.JcaPKCS10CertificationRequestBuilder;
import org.bouncycastle.util.io.pem.PemObject;
import org.bouncycastle.util.io.pem.PemReader;
+import static
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.INVALID_CSR;
+
/**
* A certificate sign request object that wraps operations to build a
* PKCS10CertificationRequest to CertificateServer.
@@ -134,7 +136,8 @@ public final class CertificateSignRequest {
try (PemReader reader = new PemReader(new StringReader(csr))) {
PemObject pemObject = reader.readPemObject();
if(pemObject.getContent() == null) {
- throw new SCMSecurityException("Invalid Certificate signing request");
+ throw new SCMSecurityException("Invalid Certificate signing request",
+ INVALID_CSR);
}
return new PKCS10CertificationRequest(pemObject.getContent());
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
index 4632b36..36d6bab 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.utils;
import org.apache.hadoop.hdds.HddsConfigKeys;
+import com.google.protobuf.ServiceException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.ScmInfo;
@@ -35,6 +36,8 @@ import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.AccessControlException;
import java.io.File;
import java.io.IOException;
@@ -285,4 +288,24 @@ public final class HAUtils {
}
return metadataDir;
}
+
+ /**
+ * Unwrap exception to check if it is some kind of access control problem.
+ * Only a {@link ServiceException} is inspected: if its cause is a
+ * {@link RemoteException} that cause is unwrapped first, and the cause
+ * chain is then searched for an {@link AccessControlException}.
+ *
+ * @param ex exception to inspect.
+ * @return true if an {@link AccessControlException} is found in the cause
+ * chain of a {@link ServiceException}; false otherwise, including for any
+ * exception that is not a {@link ServiceException}.
+ */
+ public static boolean isAccessControlException(Exception ex) {
+ if (ex instanceof ServiceException) {
+ Throwable t = ex.getCause();
+ if (t instanceof RemoteException) {
+ t = ((RemoteException) t).unwrapRemoteException();
+ }
+ while (t != null) {
+ if (t instanceof AccessControlException) {
+ return true;
+ }
+ t = t.getCause();
+ }
+ }
+ return false;
+ }
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index 9e6ef22..ddc7e04 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -46,16 +46,13 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
import
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
import org.apache.hadoop.hdds.recon.ReconConfigKeys;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import
org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.MetricsException;
@@ -435,20 +432,9 @@ public final class HddsServerUtil {
*/
public static SCMSecurityProtocolClientSideTranslatorPB getScmSecurityClient(
OzoneConfiguration conf) throws IOException {
- RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
- ProtobufRpcEngine.class);
- long scmVersion =
- RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
- InetSocketAddress address =
- getScmAddressForSecurityProtocol(conf);
- RetryPolicy retryPolicy =
- RetryPolicies.retryForeverWithFixedSleep(
- 1000, TimeUnit.MILLISECONDS);
return new SCMSecurityProtocolClientSideTranslatorPB(
- RPC.getProtocolProxy(SCMSecurityProtocolPB.class, scmVersion,
- address, UserGroupInformation.getCurrentUser(),
- conf, NetUtils.getDefaultSocketFactory(conf),
- Client.getRpcTimeout(conf), retryPolicy).getProxy());
+ new SCMSecurityProtocolFailoverProxyProvider(conf,
+ UserGroupInformation.getCurrentUser()));
}
@@ -489,17 +475,11 @@ public final class HddsServerUtil {
*/
public static SCMSecurityProtocol getScmSecurityClient(
OzoneConfiguration conf, UserGroupInformation ugi) throws IOException {
- RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
- ProtobufRpcEngine.class);
- long scmVersion =
- RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
- InetSocketAddress scmSecurityProtoAdd =
- getScmAddressForSecurityProtocol(conf);
- return new SCMSecurityProtocolClientSideTranslatorPB(
- RPC.getProxy(SCMSecurityProtocolPB.class, scmVersion,
- scmSecurityProtoAdd, ugi, conf,
- NetUtils.getDefaultSocketFactory(conf),
- Client.getRpcTimeout(conf)));
+ SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient =
+ new SCMSecurityProtocolClientSideTranslatorPB(
+ new SCMSecurityProtocolFailoverProxyProvider(conf, ugi));
+ return TracingUtil.createProxy(scmSecurityClient,
+ SCMSecurityProtocol.class, conf);
}
/**
diff --git
a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
index 0455952..48c6cf9 100644
---
a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
+++
b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
@@ -83,6 +83,19 @@ enum Type {
enum Status {
OK = 1;
+ INVALID_CSR = 2;
+ UNABLE_TO_ISSUE_CERTIFICATE = 3;
+ GET_DN_CERTIFICATE_FAILED = 4;
+ GET_OM_CERTIFICATE_FAILED = 5;
+ GET_SCM_CERTIFICATE_FAILED = 6;
+ GET_CERTIFICATE_FAILED = 7;
+ GET_CA_CERT_FAILED = 8;
+ CERTIFICATE_NOT_FOUND = 9;
+ PEM_ENCODE_FAILED = 10;
+ INTERNAL_ERROR = 11;
+ DEFAULT = 12;
+ MISSING_BLOCK_TOKEN = 13;
+ BLOCK_TOKEN_VERIFICATION_FAILED = 14;
}
/**
* This message is send by data node to prove its identity and get an SCM
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
index babc87b..06da6e4 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
@@ -33,6 +33,8 @@ import
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecuri
import
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityResponse;
import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Status;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
@@ -54,14 +56,17 @@ public class SCMSecurityProtocolServerSideTranslatorPB
LoggerFactory.getLogger(SCMSecurityProtocolServerSideTranslatorPB.class);
private final SCMSecurityProtocol impl;
+ private final StorageContainerManager scm;
private OzoneProtocolMessageDispatcher<SCMSecurityRequest,
SCMSecurityResponse, ProtocolMessageEnum>
dispatcher;
public SCMSecurityProtocolServerSideTranslatorPB(SCMSecurityProtocol impl,
+ StorageContainerManager storageContainerManager,
ProtocolMessageMetrics messageMetrics) {
this.impl = impl;
+ this.scm = storageContainerManager;
this.dispatcher =
new OzoneProtocolMessageDispatcher<>("ScmSecurityProtocol",
messageMetrics, LOG);
@@ -70,62 +75,73 @@ public class SCMSecurityProtocolServerSideTranslatorPB
@Override
public SCMSecurityResponse submitRequest(RpcController controller,
SCMSecurityRequest request) throws ServiceException {
+ if (!scm.checkLeader()) {
+ throw new ServiceException(scm.getScmHAManager()
+ .getRatisServer()
+ .triggerNotLeaderException());
+ }
return dispatcher.processRequest(request, this::processRequest,
request.getCmdType(), request.getTraceID());
}
- public SCMSecurityResponse processRequest(SCMSecurityRequest request)
- throws ServiceException {
+ public SCMSecurityResponse processRequest(SCMSecurityRequest request) {
+ SCMSecurityResponse.Builder scmSecurityResponse =
+ SCMSecurityResponse.newBuilder().setCmdType(request.getCmdType())
+ .setStatus(Status.OK);
try {
switch (request.getCmdType()) {
case GetCertificate:
- return SCMSecurityResponse.newBuilder()
- .setCmdType(request.getCmdType())
- .setStatus(Status.OK)
- .setGetCertResponseProto(
- getCertificate(request.getGetCertificateRequest()))
- .build();
+ return scmSecurityResponse.setGetCertResponseProto(
+ getCertificate(request.getGetCertificateRequest())).build();
case GetCACertificate:
- return SCMSecurityResponse.newBuilder()
- .setCmdType(request.getCmdType())
- .setStatus(Status.OK)
- .setGetCertResponseProto(
- getCACertificate(request.getGetCACertificateRequest()))
- .build();
+ return scmSecurityResponse.setGetCertResponseProto(
+ getCACertificate(request.getGetCACertificateRequest())).build();
case GetOMCertificate:
- return SCMSecurityResponse.newBuilder()
- .setCmdType(request.getCmdType())
- .setStatus(Status.OK)
- .setGetCertResponseProto(
- getOMCertificate(request.getGetOMCertRequest()))
+ return scmSecurityResponse.setGetCertResponseProto(
+ getOMCertificate(request.getGetOMCertRequest()))
.build();
case GetDataNodeCertificate:
- return SCMSecurityResponse.newBuilder()
- .setCmdType(request.getCmdType())
- .setStatus(Status.OK)
- .setGetCertResponseProto(
- getDataNodeCertificate(request.getGetDataNodeCertRequest()))
+ return scmSecurityResponse.setGetCertResponseProto(
+ getDataNodeCertificate(request.getGetDataNodeCertRequest()))
.build();
case ListCertificate:
- return SCMSecurityResponse.newBuilder()
- .setCmdType(request.getCmdType())
- .setStatus(Status.OK)
- .setListCertificateResponseProto(
- listCertificate(request.getListCertificateRequest()))
+ return scmSecurityResponse.setListCertificateResponseProto(
+ listCertificate(request.getListCertificateRequest()))
.build();
case GetSCMCertificate:
- return SCMSecurityResponse.newBuilder()
- .setCmdType(request.getCmdType())
- .setStatus(Status.OK)
- .setGetCertResponseProto(getSCMCertificate(
- request.getGetSCMCertificateRequest()))
- .build();
+ return scmSecurityResponse.setGetCertResponseProto(getSCMCertificate(
+ request.getGetSCMCertificateRequest())).build();
default:
throw new IllegalArgumentException(
"Unknown request type: " + request.getCmdType());
}
} catch (IOException e) {
- throw new ServiceException(e);
+ scmSecurityResponse.setSuccess(false);
+ scmSecurityResponse.setStatus(exceptionToResponseStatus(e));
+ // If actual cause is set in SCMSecurityException, set message with
+ // actual cause message.
+ if (e.getMessage() != null) {
+ scmSecurityResponse.setMessage(e.getMessage());
+ } else {
+ if (e.getCause() != null && e.getCause().getMessage() != null) {
+ scmSecurityResponse.setMessage(e.getCause().getMessage());
+ }
+ }
+ return scmSecurityResponse.build();
+ }
+ }
+
+ /**
+ * Convert exception to corresponding status.
+ * For an {@link SCMSecurityException} the status is looked up by the
+ * ordinal of its error code, so this relies on Status and
+ * SCMSecurityException.ErrorCode declaring their constants in the same
+ * order.
+ * @param ex exception raised while processing the request.
+ * @return SCMSecurityProtocolProtos.Status code of the error;
+ * INTERNAL_ERROR when ex is not an SCMSecurityException.
+ */
+ private Status exceptionToResponseStatus(IOException ex) {
+ if (ex instanceof SCMSecurityException) {
+ return Status.values()[
+ ((SCMSecurityException) ex).getErrorCode().ordinal()];
+ } else {
+ return Status.INTERNAL_ERROR;
+ }
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
index 3f3b360..5df3aa7 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
@@ -54,6 +54,9 @@ import org.bouncycastle.cert.X509CertificateHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.CERTIFICATE_NOT_FOUND;
+import static
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_CA_CERT_FAILED;
+import static
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_CERTIFICATE_FAILED;
import static
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateApprover.ApprovalType.KERBEROS_TRUSTED;
/**
@@ -91,7 +94,8 @@ public class SCMSecurityProtocolServer implements
SCMSecurityProtocol {
BlockingService secureProtoPbService =
SCMSecurityProtocolProtos.SCMSecurityProtocolService
.newReflectiveBlockingService(
- new SCMSecurityProtocolServerSideTranslatorPB(this, metrics));
+ new SCMSecurityProtocolServerSideTranslatorPB(this,
+ scm, metrics));
this.rpcServer =
StorageContainerManager.startRpcServer(
conf,
@@ -181,14 +185,34 @@ public class SCMSecurityProtocolServer implements
SCMSecurityProtocol {
return CertificateCodec.getPEMEncodedString(future.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new IOException("generate" + nodeType.toString() + "Certificate " +
- "operation failed. ", e);
+ throw generateException(e, nodeType);
} catch (ExecutionException e) {
- throw new IOException("generate" + nodeType.toString() + "Certificate " +
- "operation failed.", e);
+ if (e.getCause() != null) {
+ if (e.getCause() instanceof SCMSecurityException) {
+ throw (SCMSecurityException) e.getCause();
+ } else {
+ throw generateException(e, nodeType);
+ }
+ } else {
+ throw generateException(e, nodeType);
+ }
}
}
+ private SCMSecurityException generateException(Exception ex, NodeType role) {
+ SCMSecurityException.ErrorCode errorCode;
+ if (role == NodeType.SCM) {
+ errorCode = SCMSecurityException.ErrorCode.GET_SCM_CERTIFICATE_FAILED;
+ } else if (role == NodeType.OM) {
+ errorCode = SCMSecurityException.ErrorCode.GET_OM_CERTIFICATE_FAILED;
+ } else {
+ errorCode = SCMSecurityException.ErrorCode.GET_DN_CERTIFICATE_FAILED;
+ }
+ return new SCMSecurityException("generate " + role.toString() +
+ " Certificate operation failed", ex, errorCode);
+
+ }
+
/**
* Get SCM signed certificate with given serial id.
*
@@ -206,10 +230,12 @@ public class SCMSecurityProtocolServer implements
SCMSecurityProtocol {
return CertificateCodec.getPEMEncodedString(certificate);
}
} catch (CertificateException e) {
- throw new IOException("getCertificate operation failed. ", e);
+ throw new SCMSecurityException("getCertificate operation failed. ", e,
+ GET_CERTIFICATE_FAILED);
}
LOGGER.debug("Certificate with serial id {} not found.", certSerialId);
- throw new IOException("Certificate not found");
+ throw new SCMSecurityException("Certificate not found",
+ CERTIFICATE_NOT_FOUND);
}
/**
@@ -224,7 +250,8 @@ public class SCMSecurityProtocolServer implements
SCMSecurityProtocol {
return CertificateCodec.getPEMEncodedString(
certificateServer.getCACertificate());
} catch (CertificateException e) {
- throw new IOException("getRootCertificate operation failed. ", e);
+ throw new SCMSecurityException("getRootCertificate operation failed. ",
+ e, GET_CA_CERT_FAILED);
}
}
@@ -249,7 +276,8 @@ public class SCMSecurityProtocolServer implements
SCMSecurityProtocol {
String certStr = CertificateCodec.getPEMEncodedString(cert);
results.add(certStr);
} catch (SCMSecurityException e) {
- throw new IOException("listCertificate operation failed. ", e);
+ throw new SCMSecurityException("listCertificate operation failed.",
+ e, e.getErrorCode());
}
}
return results;
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index 65c70b8..f0adadd 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
@@ -52,7 +53,6 @@ import org.apache.hadoop.hdds.security.x509.keys.KeyCodec;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.minikdc.MiniKdc;
@@ -302,7 +302,8 @@ public final class TestSecureOzoneCluster {
assertNotNull(scmSecurityProtocolClient);
String caCert = scmSecurityProtocolClient.getCACertificate();
assertNotNull(caCert);
- LambdaTestUtils.intercept(RemoteException.class, "Certificate not found",
+ LambdaTestUtils.intercept(SCMSecurityException.class,
+ "Certificate not found",
() -> scmSecurityProtocolClient.getCertificate("1"));
// Case 2: User without Kerberos credentials should fail.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]