Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign 97c9f7398 -> d8425a8d8
SENTRY-1477: Sentry clients should retry with another server when they get connection errors (Li Li, reviewed by Alexander Kolbasov, Hao Hao) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/d8425a8d Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/d8425a8d Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/d8425a8d Branch: refs/heads/sentry-ha-redesign Commit: d8425a8d89ad7b9a8d9e597322b456909d60cfa8 Parents: 97c9f73 Author: Li Li <[email protected]> Authored: Tue Nov 8 21:11:13 2016 -0800 Committer: Li Li <[email protected]> Committed: Tue Nov 8 21:11:13 2016 -0800 ---------------------------------------------------------------------- .../SentryPolicyServiceClientDefaultImpl.java | 124 +++++++++++++--- .../provider/db/service/thrift/ThriftUtil.java | 15 ++ .../thrift/PoolClientInvocationHandler.java | 87 +++--------- .../thrift/RetryClientInvocationHandler.java | 142 +++++++++++++++++++ .../thrift/SentryClientInvocationHandler.java | 2 +- .../thrift/SentryServiceClientFactory.java | 5 +- .../sentry/service/thrift/ServiceConstants.java | 15 ++ .../thrift/TestPoolClientInvocationHandler.java | 20 ++- .../tests/e2e/hive/TestPolicyImportExport.java | 4 +- 9 files changed, 319 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java index 4f42a51..2dc8af8 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java @@ -21,17 +21,19 @@ package org.apache.sentry.provider.db.service.thrift; import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import javax.security.auth.callback.CallbackHandler; +import com.google.common.net.HostAndPort; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SaslRpcServer; -import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.sentry.core.common.exception.SentryUserException; @@ -73,15 +75,17 @@ import com.google.common.collect.Sets; public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyServiceClient { private final Configuration conf; - private final InetSocketAddress serverAddress; private final boolean kerberos; - private final String[] serverPrincipalParts; + private String[] serverPrincipalParts; private SentryPolicyService.Client client; private TTransport transport; private int connectionTimeout; private static final Logger LOGGER = LoggerFactory .getLogger(SentryPolicyServiceClient.class); private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occurred "; + // configs for connection retry + private int connectionFullRetryTotal; + private List<InetSocketAddress> endpoints; /** * This transport wraps the Sasl transports to set up the right UGI context for open(). @@ -131,40 +135,121 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } } - public SentryPolicyServiceClientDefaultImpl(Configuration conf) throws IOException { - this(Preconditions.checkNotNull(conf.get(ClientConfig.SERVER_RPC_ADDRESS), "Config key " - + ClientConfig.SERVER_RPC_ADDRESS + " is required"), conf.getInt( - ClientConfig.SERVER_RPC_PORT, ClientConfig.SERVER_RPC_PORT_DEFAULT), conf); + /** + * Initialize the sentry configurations. + */ + public SentryPolicyServiceClientDefaultImpl(Configuration conf) + throws IOException { + this.conf = conf; + Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); + this.connectionTimeout = conf.getInt(ServiceConstants.ClientConfig.SERVER_RPC_CONN_TIMEOUT, + ServiceConstants.ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT); + this.connectionFullRetryTotal = conf.getInt(ServiceConstants.ClientConfig.SENTRY_FULL_RETRY_TOTAL, + ServiceConstants.ClientConfig.SENTRY_FULL_RETRY_TOTAL_DEFAULT); + this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT, + ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT); + this.kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase( + conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim()); + + String hostsAndPortsStr = conf.get(ServiceConstants.ClientConfig.SERVER_RPC_ADDRESS); + if (hostsAndPortsStr == null) { + throw new RuntimeException("Config key " + + ServiceConstants.ClientConfig.SERVER_RPC_ADDRESS + " is required"); + } + int defaultPort = conf.getInt(ServiceConstants.ClientConfig.SERVER_RPC_PORT, + ServiceConstants.ClientConfig.SERVER_RPC_PORT_DEFAULT); + String[] hostsAndPortsStrArr = hostsAndPortsStr.split(","); + HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, defaultPort); + this.endpoints = new ArrayList(hostsAndPortsStrArr.length); + for (int i = hostsAndPortsStrArr.length - 1; i >= 0 ; i--) { + this.endpoints.add( + new InetSocketAddress(hostsAndPorts[i].getHostText(),hostsAndPorts[i].getPort())); + LOGGER.debug("Added server endpoint: " + hostsAndPorts[i].toString()); + } } public SentryPolicyServiceClientDefaultImpl(String addr, int port, Configuration conf) throws IOException { this.conf = conf; Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); - this.serverAddress = NetUtils.createSocketAddr(Preconditions.checkNotNull( + InetSocketAddress serverAddress = NetUtils.createSocketAddr(Preconditions.checkNotNull( addr, "Config key " + ClientConfig.SERVER_RPC_ADDRESS + " is required"), port); this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT, ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT); - kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase( + this.kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase( conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim()); + connect(serverAddress); + } + + /** + * This is a no-op when already connected. + * When there is a connection error, it will retry with another sentry server. It will + * first cycle through all the available sentry servers, and then retry the whole server + * list no more than connectionFullRetryTotal times. In this case, it won't introduce + * more latency when some server fails. Also to prevent all clients connecting to the + * same server, it will reorder the endpoints randomly after a full retry. + * <p> + * TODO: Have a small random sleep after a full retry to prevent all clients connecting to the same server. + * <p> + * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries. + */ + public synchronized void connectWithRetry() throws IOException { + if (isConnected()) { + return; + } + IOException currentException = null; + // Here for each full connectWithRetry it will cycle through all available sentry + // servers. Before each full connectWithRetry, it will shuffle the server list. + for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) { + // Reorder endpoints randomly to prevent all clients connecting to the same endpoint + // at the same time after a node failure. + Collections.shuffle(endpoints); + for (InetSocketAddress addr : endpoints) { + try { + connect(addr); + LOGGER.info(String.format("Connected to SentryServer: %s", addr.toString())); + return; + } catch (IOException e) { + LOGGER.debug(String.format("Failed connection to %s: %s", + addr.toString(), e.getMessage()), e); + currentException = e; + } + } + } + + // Throw exception as reaching the max full connectWithRetry number. + LOGGER.error( + String.format("Reach the max connection retry num %d ", connectionFullRetryTotal), + currentException); + throw currentException; + } + + /** + * Connect to the specified socket address and throw IOException if failed. + */ + private void connect(InetSocketAddress serverAddress) throws IOException { transport = new TSocket(serverAddress.getHostName(), serverAddress.getPort(), connectionTimeout); if (kerberos) { - String serverPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL), ServerConfig.PRINCIPAL + " is required"); + String serverPrincipal = Preconditions.checkNotNull( + conf.get(ServiceConstants.ServerConfig.PRINCIPAL), + ServiceConstants.ServerConfig.PRINCIPAL + " is required"); // Resolve server host in the same way as we are doing on server side - serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); + serverPrincipal = + SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); LOGGER.debug("Using server kerberos principal: " + serverPrincipal); serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal); Preconditions.checkArgument(serverPrincipalParts.length == 3, - "Kerberos principal should have 3 parts: " + serverPrincipal); + "Kerberos principal should have 3 parts: " + serverPrincipal); boolean wrapUgi = "true".equalsIgnoreCase(conf - .get(ServerConfig.SECURITY_USE_UGI_TRANSPORT, "true")); - transport = new UgiSaslClientTransport(AuthMethod.KERBEROS.getMechanismName(), + .get(ServiceConstants.ServerConfig.SECURITY_USE_UGI_TRANSPORT, "true")); + transport = new SentryPolicyServiceClientDefaultImpl.UgiSaslClientTransport( + SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(), null, serverPrincipalParts[0], serverPrincipalParts[1], - ClientConfig.SASL_PROPERTIES, null, transport, wrapUgi); + ServiceConstants.ClientConfig.SASL_PROPERTIES, null, transport, wrapUgi); } else { serverPrincipalParts = null; } @@ -174,7 +259,8 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService throw new IOException("Transport exception while opening transport: " + e.getMessage(), e); } LOGGER.debug("Successfully opened transport: " + transport + " to " + serverAddress); - long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE, + long maxMessageSize = conf.getLong( + ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE, ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT); TMultiplexedProtocol protocol = new TMultiplexedProtocol( new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true), @@ -941,11 +1027,15 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } public synchronized void close() { - if (transport != null) { + if (isConnected()) { transport.close(); } } + private boolean isConnected() { + return transport != null && transport.isOpen(); + } + /** * Import the sentry mapping data, convert the mapping data from map structure to * TSentryMappingData, and call the import API. http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java index 3a96d0b..5fed04a 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java @@ -18,6 +18,7 @@ package org.apache.sentry.provider.db.service.thrift; +import com.google.common.net.HostAndPort; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TSaslClientTransport; import org.apache.thrift.transport.TSaslServerTransport; @@ -109,4 +110,18 @@ public final class ThriftUtil { private ThriftUtil() { // Make constructor private to avoid instantiation } + + /** + * Utility function for parsing host and port strings. Expected form should be + * (host:port). The hostname could be in ipv6 style. If port is not specified, + * defaultPort will be used. + */ + public static HostAndPort[] parseHostPortStrings(String[] hostsAndPortsArr, int defaultPort) { + HostAndPort[] hostsAndPorts = new HostAndPort[hostsAndPortsArr.length]; + for (int i = 0; i < hostsAndPorts.length; i++) { + hostsAndPorts[i] = + HostAndPort.fromString(hostsAndPortsArr[i]).withDefaultPort(defaultPort); + } + return hostsAndPorts; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java index 730bfec..d5f4fcb 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java @@ -17,10 +17,11 @@ */ package org.apache.sentry.service.thrift; +import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.net.HostAndPort; import org.apache.commons.pool2.PooledObjectFactory; import org.apache.commons.pool2.impl.AbandonedConfig; import org.apache.commons.pool2.impl.GenericObjectPool; @@ -28,6 +29,7 @@ import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.hadoop.conf.Configuration; import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient; +import org.apache.sentry.provider.db.service.thrift.ThriftUtil; import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; @@ -138,74 +140,13 @@ public class PoolClientInvocationHandler extends SentryClientInvocationHandler { } int defaultPort = conf.getInt(ClientConfig.SERVER_RPC_PORT, ClientConfig.SERVER_RPC_PORT_DEFAULT); - String[] hostsAndPorts = hostsAndPortsStr.split(","); - String[] hosts = new String[hostsAndPorts.length]; - int[] ports = new int[hostsAndPorts.length]; - parseHostPortStrings(hostsAndPortsStr, hostsAndPorts, hosts, - ports, defaultPort); + String[] hostsAndPortsStrArr = hostsAndPortsStr.split(","); + HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, defaultPort); this.endpoints = new Endpoint[hostsAndPorts.length]; for (int i = 0; i < this.endpoints.length; i++) { - this.endpoints[i] = new Endpoint(hosts[i], ports[i]); - LOGGER.info("Initiate sentry sever endpoint: hostname " + hosts[i] + ", port " + ports[i]); - } - } - - @VisibleForTesting - /** - * Utility function for parsing host and port strings. Expected form should be - * (host:port). The hostname could be in ipv6 style. Port number can be empty - * and will be default to defaultPort. - */ - static protected void parseHostPortStrings(String hostsAndPortsStr, - String[] hostsAndPorts, String[] hosts, int[] ports, - int defaultPort) { - int i = -1; - for (String hostAndPort: hostsAndPorts) { - i++; - hostAndPort = hostAndPort.trim(); - if (hostAndPort.isEmpty()) { - throw new RuntimeException("Cannot handle empty server RPC address " + - "in component " + (i + 1) + " of " + hostsAndPortsStr); - } - int colonIdx = hostAndPort.lastIndexOf(":"); - if (colonIdx == -1) { - // There is no colon in the host+port string. - // That means the port is left unspecified, and should be set to - // the default. - hosts[i] = hostAndPort; - ports[i] = defaultPort; - continue; - } - int rightBracketIdx = hostAndPort.indexOf(']', colonIdx); - if (rightBracketIdx != -1) { - // If there is a right bracket that occurs after the colon, the - // colon we found is part of an ipv6 address like this: - // [::1]. That means we only have the host part, not the port part. - hosts[i] = hostAndPort.substring(0, rightBracketIdx); - ports[i] = defaultPort; - continue; - } - // We have a host:port string, where the part after colon should be - // the port. - hosts[i] = hostAndPort.substring(0, colonIdx); - String portStr = hostAndPort.substring(colonIdx+1); - try { - ports[i] = Integer.valueOf(portStr); - } catch (NumberFormatException e) { - throw new RuntimeException("Cannot parse port string " + portStr + - "in component " + (i + 1) + " of " + hostsAndPortsStr); - } - if ((ports[i] < 0) || (ports[i] > 65535)) { - throw new RuntimeException("Invalid port number given for " + portStr + - "in component " + (i + 1) + " of " + hostsAndPortsStr); - } - } - // Strip the brackets off of hostnames and ip addresses enclosed in square - // brackets. This is to support ipv6-style [address]:port addressing. - for (int j = 0; j < hosts.length; j++) { - if ((hosts[j].startsWith("[")) && (hosts[j].endsWith("]"))) { - hosts[j] = hosts[j].substring(1, hosts[j].length() - 1); - } + this.endpoints[i] = new Endpoint(hostsAndPorts[i].getHostText(),hostsAndPorts[i].getPort()); + LOGGER.info("Initiate sentry sever endpoint: hostname " + + hostsAndPorts[i].getHostText() + ", port " + hostsAndPorts[i].getPort()); } } @@ -298,6 +239,11 @@ public class PoolClientInvocationHandler extends SentryClientInvocationHandler { client = pool.borrowObject(); } catch (Exception e) { LOGGER.debug(POOL_EXCEPTION_MESSAGE, e); + // If the exception is caused by connection problem, throw the TTransportException + // for reconnect. + if (e instanceof IOException) { + throw new TTransportException(e); + } throw new SentryUserException(e.getMessage(), e); } try { @@ -305,13 +251,14 @@ public class PoolClientInvocationHandler extends SentryClientInvocationHandler { result = method.invoke(client, args); } catch (InvocationTargetException e) { // Get the target exception, check if SentryUserException or TTransportException is wrapped. - // TTransportException means there has connection problem with the pool. + // TTransportException or IOException means there has connection problem with the pool. Throwable targetException = e.getCause(); if (targetException instanceof SentryUserException) { Throwable sentryTargetException = targetException.getCause(); // If there has connection problem, eg, invalid connection if the service restarted, - // sentryTargetException instanceof TTransportException = true. - if (sentryTargetException instanceof TTransportException) { + // sentryTargetException instanceof TTransportException or IOException = true. + if (sentryTargetException instanceof TTransportException + || sentryTargetException instanceof IOException) { // If the exception is caused by connection problem, destroy the instance and // remove it from the commons-pool. Throw the TTransportException for reconnect. pool.invalidateObject(client); http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java new file mode 100644 index 0000000..c4964c3 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.service.thrift; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.core.common.exception.SentryUserException; +import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +/** + * The RetryClientInvocationHandler is a proxy class for handling thrift calls for non-pool + * model. Currently only one client connection is allowed, and it's using lazy connection. + * The client is not connected to the sentry server until there is any rpc call. + * <p> + * For every rpc call, if the client is not connected, it will first connect to a sentry + * server, and then do the thrift call to the connected sentry server, which will execute + * the requested method and return back the response. If it is failed with connection + * problem, it will close the current connection and retry (reconnect and resend the + * thrift call) no more than rpcRetryTotal times. If the client is already connected, it + * will reuse the existing connection, and do the thrift call. + * <p> + * During reconnection, it will first cycle through all the available sentry servers, and + * then retry the whole server list no more than connectionFullRetryTotal times. In this + * case, it won't introduce more latency when some server fails. Also to prevent all + * clients connecting to the same server, it will reorder the endpoints randomly after a + * full retry. + * <p> + * TODO: allow multiple client connections + */ +class RetryClientInvocationHandler extends SentryClientInvocationHandler{ + private static final Logger LOGGER = + LoggerFactory.getLogger(RetryClientInvocationHandler.class); + private final Configuration conf; + private SentryPolicyServiceClientDefaultImpl client = null; + private final int rpcRetryTotal; + + /** + * Initialize the sentry configurations, including rpc retry count and client connection + * configs for SentryPolicyServiceClientDefaultImpl + */ + RetryClientInvocationHandler(Configuration conf) throws IOException { + this.conf = conf; + Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); + this.rpcRetryTotal = conf.getInt(ServiceConstants.ClientConfig.SENTRY_RPC_RETRY_TOTAL, + ServiceConstants.ClientConfig.SENTRY_RPC_RETRY_TOTAL_DEFAULT); + client = new SentryPolicyServiceClientDefaultImpl(conf); + } + + /** + * For every rpc call, if the client is not connected, it will first connect to a sentry + * server, and then do the thrift call to the connected sentry server, which will + * execute the requested method and return back the response. If it is failed with + * connection problem, it will close the current connection, and retry (reconnect and + * resend the thrift call) no more than rpcRetryTotal times. Throw SentryUserException + * if failed retry after rpcRetryTotal times. + * Synchronized it for thread safety. + */ + @Override + synchronized Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception { + int retryCount = 0; + Exception lastExc = null; + + while (retryCount < rpcRetryTotal) { + // Connect to a sentry server if not connected yet. + try { + client.connectWithRetry(); + } catch (IOException e) { + // Increase the retry num + // Retry when the exception is caused by connection problem. + retryCount++; + lastExc = e; + close(); + continue; + } + + // do the thrift call + try { + return method.invoke(client, args); + } catch (InvocationTargetException e) { + // Get the target exception, check if SentryUserException or TTransportException is wrapped. + // TTransportException means there has connection problem with the pool. + Throwable targetException = e.getCause(); + if (targetException instanceof SentryUserException) { + Throwable sentryTargetException = targetException.getCause(); + // If there has connection problem, eg, invalid connection if the service restarted, + // sentryTargetException instanceof TTransportException = true. + if (sentryTargetException instanceof TTransportException) { + // Retry when the exception is caused by connection problem. + lastExc = new TTransportException(sentryTargetException); + LOGGER.debug("Got TTransportException when do the thrift call ", lastExc); + } else { + // The exception is thrown by thrift call, eg, SentryAccessDeniedException. + // Do not need to reconnect to the sentry server. + throw (SentryUserException) targetException; + } + } else { + throw e; + } + } + + // Increase the retry num + retryCount++; + + // For connection problem, it will close the current connection, and reconnect to + // an available sentry server and redo the thrift call. + close(); + } + // Throw the exception as reaching the max rpc retry num. + LOGGER.error(String.format("failed after %d retries ", rpcRetryTotal), lastExc); + throw new SentryUserException( + String.format("failed after %d retries ", rpcRetryTotal), lastExc); + } + + @Override + public void close() { + client.close(); + LOGGER.debug("Close the current client connection"); + } + +} http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java index a41be7f..b8c7f23 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java @@ -44,7 +44,7 @@ public abstract class SentryClientInvocationHandler implements InvocationHandler /** * Subclass should implement this method for special function */ - public abstract Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception; + abstract Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception; /** * An abstract method "close", an invocationHandler should close its contexts at here. http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java index b7d2be1..f822497 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java @@ -40,7 +40,10 @@ public final class SentryServiceClientFactory { SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), new PoolClientInvocationHandler(conf)); } else { - return new SentryPolicyServiceClientDefaultImpl(conf); + return (SentryPolicyServiceClient) Proxy + .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(), + SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), + new RetryClientInvocationHandler(conf)); } } } http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java index a249904..06a9571 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java @@ -26,6 +26,7 @@ import org.apache.sentry.provider.db.service.thrift.SentryMetrics; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableMap; +import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl; public class ServiceConstants { @@ -237,6 +238,20 @@ public class ServiceConstants { public static final String SENTRY_POOL_RETRY_TOTAL = "sentry.service.client.connection.pool.retry-total"; public static final int SENTRY_POOL_RETRY_TOTAL_DEFAULT = 3; + /** + * full retry num for getting the connection in non-pool model + * In a full retry, it will cycle through all available sentry servers + * {@link SentryPolicyServiceClientDefaultImpl#connectWithRetry()} + */ + public static final String SENTRY_FULL_RETRY_TOTAL = "sentry.service.client.connection.full.retry-total"; + public static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT = 2; + /** + * max retry num for client rpc + * {@link RetryClientInvocationHandler#invokeImpl(Object, Method, Object[])} + */ + public static final String SENTRY_RPC_RETRY_TOTAL = "sentry.service.client.rpc.retry-total"; + public static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3; + // max message size for thrift messages public static final String SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE = "sentry.policy.client.thrift.max.message.size"; public static final long SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = 100 * 1024 * 1024; http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java index 5b0e12b..7292387 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java @@ -18,6 +18,8 @@ package org.apache.sentry.service.thrift; +import com.google.common.net.HostAndPort; +import org.apache.sentry.provider.db.service.thrift.ThriftUtil; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -30,12 +32,10 @@ public class TestPoolClientInvocationHandler { private void expectParseHostPortStrings(String hostsAndPortsStr, String[] expectedHosts, int[] expectedPorts) throws Exception { boolean success = false; - String[] hostsAndPorts = hostsAndPortsStr.split(","); - String[] hosts = new String[hostsAndPorts.length]; - int[] ports = new int[hostsAndPorts.length]; + String[] hostsAndPortsStrArr = hostsAndPortsStr.split(","); + HostAndPort[] hostsAndPorts; try { - PoolClientInvocationHandler.parseHostPortStrings(hostsAndPortsStr, - hostsAndPorts, hosts, ports, 8038); + hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, 8038); success = true; } finally { if (!success) { @@ -43,12 +43,22 @@ public class TestPoolClientInvocationHandler { hostsAndPortsStr); } } + String[] hosts = new String[hostsAndPortsStrArr.length]; + int[] ports = new int[hostsAndPortsStrArr.length]; + parseHostsAndPorts(hostsAndPorts, hosts, ports); Assert.assertArrayEquals("Got unexpected hosts results while " + "parsing " + hostsAndPortsStr, expectedHosts, hosts); Assert.assertArrayEquals("Got unexpected ports results while " + "parsing " + hostsAndPortsStr, expectedPorts, ports); } + private void parseHostsAndPorts(HostAndPort[] hostsAndPorts, String[] hosts, int[] ports) { + for (int i = 0; i < hostsAndPorts.length; i++) { + hosts[i] = hostsAndPorts[i].getHostText(); + ports[i] = hostsAndPorts[i].getPort(); + } + } + @SuppressWarnings("PMD.AvoidUsingHardCodedIP") @Test public void testParseHostPortStrings() throws Exception { http://git-wip-us.apache.org/repos/asf/sentry/blob/d8425a8d/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestPolicyImportExport.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestPolicyImportExport.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestPolicyImportExport.java index 3f57a00..b048989 100644 --- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestPolicyImportExport.java +++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/TestPolicyImportExport.java @@ -22,6 +22,7 @@ import static org.junit.Assert.fail; import java.io.File; import java.io.FileOutputStream; +import java.lang.reflect.UndeclaredThrowableException; import java.util.Map; import java.util.Set; @@ -172,8 +173,9 @@ public class TestPolicyImportExport extends AbstractTestWithStaticConfiguration try { configTool.importPolicy(); fail("IllegalArgumentException should be thrown for: Invalid key value: server [server]"); - } catch (IllegalArgumentException ex) { + } catch (UndeclaredThrowableException ex) { // ignore + assertTrue(ex.getUndeclaredThrowable().getCause() instanceof IllegalArgumentException); } }
