Repository: sentry Updated Branches: refs/heads/akolb-SENTRY-1593 [created] d22df6947
http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java index 980d930..b855cdf 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java @@ -15,20 +15,35 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.sentry.provider.db.generic.service.thrift; import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.core.common.RetryClientInvocationHandler; +import org.apache.sentry.service.thrift.ServiceConstants; + +import java.lang.reflect.Proxy; -/** - * SentryGenericServiceClientFactory is a public class for the components which using Generic Model to create sentry client. - */ public final class SentryGenericServiceClientFactory { - private SentryGenericServiceClientFactory() { - } + private SentryGenericServiceClientFactory() { + } + + public static SentryGenericServiceClient create(Configuration conf) throws Exception { + boolean pooled = conf.getBoolean( + ServiceConstants.ClientConfig.SENTRY_POOL_ENABLED, ServiceConstants.ClientConfig.SENTRY_POOL_ENABLED_DEFAULT); + if (pooled) { + //SentryGenericServiceClient doesn't have pool implementation + // TODO Implement pool for SentryGenericServiceClient + return null; + } else { + RetryClientInvocationHandler clientHandler = new RetryClientInvocationHandler(conf, + new SentryGenericServiceClientDefaultImpl(conf)); + return (SentryGenericServiceClient) Proxy + .newProxyInstance(SentryGenericServiceClientDefaultImpl.class.getClassLoader(), + SentryGenericServiceClientDefaultImpl.class.getInterfaces(), + clientHandler); - public static SentryGenericServiceClient create(Configuration conf) throws Exception { - return new SentryGenericServiceClientDefaultImpl(conf); - } - + } + } } http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/log/entity/JsonLogEntityFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/log/entity/JsonLogEntityFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/log/entity/JsonLogEntityFactory.java index f6bb8a5..8298cfa 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/log/entity/JsonLogEntityFactory.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/log/entity/JsonLogEntityFactory.java @@ -46,7 +46,7 @@ import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest; import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleResponse; import org.apache.sentry.provider.db.service.thrift.TSentryGroup; import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege; -import org.apache.sentry.provider.db.service.thrift.ThriftUtil; +import org.apache.sentry.ThriftUtil; import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; import org.apache.sentry.service.thrift.Status; import org.apache.sentry.service.thrift.TSentryResponseStatus; http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java index 1e72b74..b43136b 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java @@ -22,11 +22,13 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.sentry.core.common.SentryServiceClient; import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.core.common.ActiveRoleSet; import org.apache.sentry.core.common.Authorizable; -public interface SentryPolicyServiceClient { + +public interface SentryPolicyServiceClient extends SentryServiceClient { void createRole(String requestorUserName, String roleName) throws SentryUserException; @@ -195,7 +197,7 @@ public interface SentryPolicyServiceClient { */ String getConfigValue(String propertyName, String defaultValue) throws SentryUserException; - void close(); + //void close(); // Import the sentry mapping data with map structure void importPolicy(Map<String, Map<String, Set<String>>> policyFileMappingData, http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java index 2dc8af8..cb37b57 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java @@ -20,22 +20,14 @@ package org.apache.sentry.provider.db.service.thrift; import java.io.IOException; import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; -import javax.security.auth.callback.CallbackHandler; - -import com.google.common.net.HostAndPort; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SaslRpcServer; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sentry.SentryServiceClientTransportDefaultImpl; +import org.apache.sentry.core.common.ServiceTransportConstants; import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.core.common.ActiveRoleSet; import org.apache.sentry.core.common.Authorizable; @@ -44,23 +36,16 @@ import org.apache.sentry.core.model.db.DBModelAuthorizable; import org.apache.sentry.core.common.utils.PolicyFileConstants; import org.apache.sentry.service.thrift.SentryServiceUtil; import org.apache.sentry.service.thrift.ServiceConstants; -import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope; -import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; import org.apache.sentry.service.thrift.ServiceConstants.ThriftConstants; import org.apache.sentry.service.thrift.Status; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TMultiplexedProtocol; -import org.apache.thrift.transport.TSaslClientTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -72,193 +57,33 @@ import com.google.common.collect.Sets; Note: When using this client, if there is an exception in RPC, socket can get into an inconsistent state. So it is important to recreate the client, which uses a new socket. */ -public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyServiceClient { +public class SentryPolicyServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryPolicyServiceClient { - private final Configuration conf; - private final boolean kerberos; - private String[] serverPrincipalParts; private SentryPolicyService.Client client; - private TTransport transport; - private int connectionTimeout; private static final Logger LOGGER = LoggerFactory .getLogger(SentryPolicyServiceClient.class); private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occurred "; - // configs for connection retry - private int connectionFullRetryTotal; - private List<InetSocketAddress> endpoints; - - /** - * This transport wraps the Sasl transports to set up the right UGI context for open(). - */ - public static class UgiSaslClientTransport extends TSaslClientTransport { - protected UserGroupInformation ugi = null; - - public UgiSaslClientTransport(String mechanism, String authorizationId, - String protocol, String serverName, Map<String, String> props, - CallbackHandler cbh, TTransport transport, boolean wrapUgi) - throws IOException { - super(mechanism, authorizationId, protocol, serverName, props, cbh, - transport); - if (wrapUgi) { - ugi = UserGroupInformation.getLoginUser(); - } - } - - // open the SASL transport with using the current UserGroupInformation - // This is needed to get the current login context stored - @Override - public synchronized void open() throws TTransportException { - if (ugi == null) { - baseOpen(); - } else { - try { - if (ugi.isFromKeytab()) { - ugi.checkTGTAndReloginFromKeytab(); - } - ugi.doAs(new PrivilegedExceptionAction<Void>() { - public Void run() throws TTransportException { - baseOpen(); - return null; - } - }); - } catch (IOException e) { - throw new TTransportException("Failed to open SASL transport", e); - } catch (InterruptedException e) { - throw new TTransportException( - "Interrupted while opening underlying transport", e); - } - } - } - private void baseOpen() throws TTransportException { - super.open(); - } - } /** * Initialize the sentry configurations. */ public SentryPolicyServiceClientDefaultImpl(Configuration conf) throws IOException { - this.conf = conf; - Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); - this.connectionTimeout = conf.getInt(ServiceConstants.ClientConfig.SERVER_RPC_CONN_TIMEOUT, - ServiceConstants.ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT); - this.connectionFullRetryTotal = conf.getInt(ServiceConstants.ClientConfig.SENTRY_FULL_RETRY_TOTAL, - ServiceConstants.ClientConfig.SENTRY_FULL_RETRY_TOTAL_DEFAULT); - this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT, - ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT); - this.kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase( - conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim()); - - String hostsAndPortsStr = conf.get(ServiceConstants.ClientConfig.SERVER_RPC_ADDRESS); - if (hostsAndPortsStr == null) { - throw new RuntimeException("Config key " + - ServiceConstants.ClientConfig.SERVER_RPC_ADDRESS + " is required"); - } - int defaultPort = conf.getInt(ServiceConstants.ClientConfig.SERVER_RPC_PORT, - ServiceConstants.ClientConfig.SERVER_RPC_PORT_DEFAULT); - String[] hostsAndPortsStrArr = hostsAndPortsStr.split(","); - HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, defaultPort); - this.endpoints = new ArrayList(hostsAndPortsStrArr.length); - for (int i = hostsAndPortsStrArr.length - 1; i >= 0 ; i--) { - this.endpoints.add( - new InetSocketAddress(hostsAndPorts[i].getHostText(),hostsAndPorts[i].getPort())); - LOGGER.debug("Added server endpoint: " + hostsAndPorts[i].toString()); - } + super(conf, ServiceTransportConstants.sentryService.DB_POLICY_SERVICE); } public SentryPolicyServiceClientDefaultImpl(String addr, int port, Configuration conf) throws IOException { - this.conf = conf; - Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); - InetSocketAddress serverAddress = NetUtils.createSocketAddr(Preconditions.checkNotNull( - addr, "Config key " + ClientConfig.SERVER_RPC_ADDRESS - + " is required"), port); - this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT, - ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT); - this.kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase( - conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim()); + super(addr, port, conf, ServiceTransportConstants.sentryService.DB_POLICY_SERVICE); connect(serverAddress); } /** - * This is a no-op when already connected. - * When there is a connection error, it will retry with another sentry server. It will - * first cycle through all the available sentry servers, and then retry the whole server - * list no more than connectionFullRetryTotal times. In this case, it won't introduce - * more latency when some server fails. Also to prevent all clients connecting to the - * same server, it will reorder the endpoints randomly after a full retry. - * <p> - * TODO: Have a small random sleep after a full retry to prevent all clients connecting to the same server. - * <p> - * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries. - */ - public synchronized void connectWithRetry() throws IOException { - if (isConnected()) { - return; - } - IOException currentException = null; - // Here for each full connectWithRetry it will cycle through all available sentry - // servers. Before each full connectWithRetry, it will shuffle the server list. - for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) { - // Reorder endpoints randomly to prevent all clients connecting to the same endpoint - // at the same time after a node failure. - Collections.shuffle(endpoints); - for (InetSocketAddress addr : endpoints) { - try { - connect(addr); - LOGGER.info(String.format("Connected to SentryServer: %s", addr.toString())); - return; - } catch (IOException e) { - LOGGER.debug(String.format("Failed connection to %s: %s", - addr.toString(), e.getMessage()), e); - currentException = e; - } - } - } - - // Throw exception as reaching the max full connectWithRetry number. - LOGGER.error( - String.format("Reach the max connection retry num %d ", connectionFullRetryTotal), - currentException); - throw currentException; - } - - /** * Connect to the specified socket address and throw IOException if failed. */ - private void connect(InetSocketAddress serverAddress) throws IOException { - transport = new TSocket(serverAddress.getHostName(), - serverAddress.getPort(), connectionTimeout); - if (kerberos) { - String serverPrincipal = Preconditions.checkNotNull( - conf.get(ServiceConstants.ServerConfig.PRINCIPAL), - ServiceConstants.ServerConfig.PRINCIPAL + " is required"); - - // Resolve server host in the same way as we are doing on server side - serverPrincipal = - SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); - LOGGER.debug("Using server kerberos principal: " + serverPrincipal); - - serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal); - Preconditions.checkArgument(serverPrincipalParts.length == 3, - "Kerberos principal should have 3 parts: " + serverPrincipal); - boolean wrapUgi = "true".equalsIgnoreCase(conf - .get(ServiceConstants.ServerConfig.SECURITY_USE_UGI_TRANSPORT, "true")); - transport = new SentryPolicyServiceClientDefaultImpl.UgiSaslClientTransport( - SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(), - null, serverPrincipalParts[0], serverPrincipalParts[1], - ServiceConstants.ClientConfig.SASL_PROPERTIES, null, transport, wrapUgi); - } else { - serverPrincipalParts = null; - } - try { - transport.open(); - } catch (TTransportException e) { - throw new IOException("Transport exception while opening transport: " + e.getMessage(), e); - } - LOGGER.debug("Successfully opened transport: " + transport + " to " + serverAddress); + protected void connect(InetSocketAddress serverAddress) throws IOException { + super.connect(serverAddress); long maxMessageSize = conf.getLong( ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE, ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT); @@ -1025,17 +850,13 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService throw new SentryUserException(THRIFT_EXCEPTION_MESSAGE, e); } } - +/* public synchronized void close() { if (isConnected()) { transport.close(); } } - - private boolean isConnected() { - return transport != null && transport.isOpen(); - } - +*/ /** * Import the sentry mapping data, convert the mapping data from map structure to * TSentryMappingData, and call the import API. http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryProcessorWrapper.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryProcessorWrapper.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryProcessorWrapper.java index a5f11a9..88581f7 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryProcessorWrapper.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryProcessorWrapper.java @@ -18,6 +18,7 @@ package org.apache.sentry.provider.db.service.thrift; +import org.apache.sentry.ThriftUtil; import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocol; http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java deleted file mode 100644 index 5fed04a..0000000 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.provider.db.service.thrift; - -import com.google.common.net.HostAndPort; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TSaslClientTransport; -import org.apache.thrift.transport.TSaslServerTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; - -public final class ThriftUtil { - - private static final Logger LOGGER = LoggerFactory.getLogger(ThriftUtil.class); - - public static void setImpersonator(final TProtocol in) { - try { - TTransport transport = in.getTransport(); - if (transport instanceof TSaslServerTransport) { - String impersonator = ((TSaslServerTransport) transport).getSaslServer() - .getAuthorizationID(); - setImpersonator(impersonator); - } - } catch (Exception e) { - // If there has exception when get impersonator info, log the error information. - LOGGER.warn("There is an error when get the impersonator:" + e.getMessage()); - } - } - - public static void setIpAddress(final TProtocol in) { - try { - TTransport transport = in.getTransport(); - TSocket tSocket = getUnderlyingSocketFromTransport(transport); - if (tSocket != null) { - setIpAddress(tSocket.getSocket().getInetAddress().toString()); - } else { - LOGGER.warn("Unknown Transport, cannot determine ipAddress"); - } - } catch (Exception e) { - // If there has exception when get impersonator info, log the error information. - LOGGER.warn("There is an error when get the client's ip address:" + e.getMessage()); - } - } - - /** - * Returns the underlying TSocket from the transport, or null of the transport type is unknown. - */ - private static TSocket getUnderlyingSocketFromTransport(TTransport transport) { - Preconditions.checkNotNull(transport); - if (transport instanceof TSaslServerTransport) { - return (TSocket) ((TSaslServerTransport) transport).getUnderlyingTransport(); - } else if (transport instanceof TSaslClientTransport) { - return (TSocket) ((TSaslClientTransport) transport).getUnderlyingTransport(); - } else if (transport instanceof TSocket) { - return (TSocket) transport; - } - return null; - } - - private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>() { - @Override - protected synchronized String initialValue() { - return ""; - } - }; - - public static void setIpAddress(String ipAddress) { - threadLocalIpAddress.set(ipAddress); - } - - public static String getIpAddress() { - return threadLocalIpAddress.get(); - } - - private static ThreadLocal<String> threadLocalImpersonator = new ThreadLocal<String>() { - @Override - protected synchronized String initialValue() { - return ""; - } - }; - - public static void setImpersonator(String impersonator) { - threadLocalImpersonator.set(impersonator); - } - - public static String getImpersonator() { - return threadLocalImpersonator.get(); - } - - private ThriftUtil() { - // Make constructor private to avoid instantiation - } - - /** - * Utility function for parsing host and port strings. Expected form should be - * (host:port). The hostname could be in ipv6 style. If port is not specified, - * defaultPort will be used. - */ - public static HostAndPort[] parseHostPortStrings(String[] hostsAndPortsArr, int defaultPort) { - HostAndPort[] hostsAndPorts = new HostAndPort[hostsAndPortsArr.length]; - for (int i = 0; i < hostsAndPorts.length; i++) { - hostsAndPorts[i] = - HostAndPort.fromString(hostsAndPortsArr[i]).withDefaultPort(defaultPort); - } - return hostsAndPorts; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java index d5f4fcb..ef978ff 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java @@ -29,8 +29,9 @@ import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.hadoop.conf.Configuration; import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient; -import org.apache.sentry.provider.db.service.thrift.ThriftUtil; +import org.apache.sentry.ThriftUtil; import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; +import org.apache.sentry.SentryClientInvocationHandler; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java deleted file mode 100644 index c4964c3..0000000 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.service.thrift; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.core.common.exception.SentryUserException; -import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; - -/** - * The RetryClientInvocationHandler is a proxy class for handling thrift calls for non-pool - * model. Currently only one client connection is allowed, and it's using lazy connection. - * The client is not connected to the sentry server until there is any rpc call. - * <p> - * For every rpc call, if the client is not connected, it will first connect to a sentry - * server, and then do the thrift call to the connected sentry server, which will execute - * the requested method and return back the response. If it is failed with connection - * problem, it will close the current connection and retry (reconnect and resend the - * thrift call) no more than rpcRetryTotal times. If the client is already connected, it - * will reuse the existing connection, and do the thrift call. - * <p> - * During reconnection, it will first cycle through all the available sentry servers, and - * then retry the whole server list no more than connectionFullRetryTotal times. In this - * case, it won't introduce more latency when some server fails. Also to prevent all - * clients connecting to the same server, it will reorder the endpoints randomly after a - * full retry. - * <p> - * TODO: allow multiple client connections - */ -class RetryClientInvocationHandler extends SentryClientInvocationHandler{ - private static final Logger LOGGER = - LoggerFactory.getLogger(RetryClientInvocationHandler.class); - private final Configuration conf; - private SentryPolicyServiceClientDefaultImpl client = null; - private final int rpcRetryTotal; - - /** - * Initialize the sentry configurations, including rpc retry count and client connection - * configs for SentryPolicyServiceClientDefaultImpl - */ - RetryClientInvocationHandler(Configuration conf) throws IOException { - this.conf = conf; - Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); - this.rpcRetryTotal = conf.getInt(ServiceConstants.ClientConfig.SENTRY_RPC_RETRY_TOTAL, - ServiceConstants.ClientConfig.SENTRY_RPC_RETRY_TOTAL_DEFAULT); - client = new SentryPolicyServiceClientDefaultImpl(conf); - } - - /** - * For every rpc call, if the client is not connected, it will first connect to a sentry - * server, and then do the thrift call to the connected sentry server, which will - * execute the requested method and return back the response. If it is failed with - * connection problem, it will close the current connection, and retry (reconnect and - * resend the thrift call) no more than rpcRetryTotal times. Throw SentryUserException - * if failed retry after rpcRetryTotal times. - * Synchronized it for thread safety. - */ - @Override - synchronized Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception { - int retryCount = 0; - Exception lastExc = null; - - while (retryCount < rpcRetryTotal) { - // Connect to a sentry server if not connected yet. - try { - client.connectWithRetry(); - } catch (IOException e) { - // Increase the retry num - // Retry when the exception is caused by connection problem. - retryCount++; - lastExc = e; - close(); - continue; - } - - // do the thrift call - try { - return method.invoke(client, args); - } catch (InvocationTargetException e) { - // Get the target exception, check if SentryUserException or TTransportException is wrapped. - // TTransportException means there has connection problem with the pool. - Throwable targetException = e.getCause(); - if (targetException instanceof SentryUserException) { - Throwable sentryTargetException = targetException.getCause(); - // If there has connection problem, eg, invalid connection if the service restarted, - // sentryTargetException instanceof TTransportException = true. - if (sentryTargetException instanceof TTransportException) { - // Retry when the exception is caused by connection problem. - lastExc = new TTransportException(sentryTargetException); - LOGGER.debug("Got TTransportException when do the thrift call ", lastExc); - } else { - // The exception is thrown by thrift call, eg, SentryAccessDeniedException. - // Do not need to reconnect to the sentry server. - throw (SentryUserException) targetException; - } - } else { - throw e; - } - } - - // Increase the retry num - retryCount++; - - // For connection problem, it will close the current connection, and reconnect to - // an available sentry server and redo the thrift call. - close(); - } - // Throw the exception as reaching the max rpc retry num. - LOGGER.error(String.format("failed after %d retries ", rpcRetryTotal), lastExc); - throw new SentryUserException( - String.format("failed after %d retries ", rpcRetryTotal), lastExc); - } - - @Override - public void close() { - client.close(); - LOGGER.debug("Close the current client connection"); - } - -} http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java deleted file mode 100644 index b8c7f23..0000000 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.service.thrift; - -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; - -/** - * SentryClientInvocationHandler is the base interface for all the InvocationHandler in SENTRY - */ -public abstract class SentryClientInvocationHandler implements InvocationHandler { - - /** - * Close the InvocationHandler: An InvocationHandler may create some contexts, - * these contexts should be close when the method "close()" of client be called. - */ - @Override - public final Object invoke(Object proxy, Method method, Object[] args) throws Exception { - // close() doesn't throw exception we supress that in case of connection - // loss. Changing SentryPolicyServiceClient#close() to throw an - // exception would be a backward incompatible change for Sentry clients. - if ("close".equals(method.getName()) && null == args) { - close(); - return null; - } - return invokeImpl(proxy, method, args); - } - - /** - * Subclass should implement this method for special function - */ - abstract Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception; - - /** - * An abstract method "close", an invocationHandler should close its contexts at here. - */ - public abstract void close(); - -} http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java index f822497..3b56d52 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient; import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl; import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; +import org.apache.sentry.core.common.RetryClientInvocationHandler; public final class SentryServiceClientFactory { @@ -33,17 +34,19 @@ public final class SentryServiceClientFactory { public static SentryPolicyServiceClient create(Configuration conf) throws Exception { boolean pooled = conf.getBoolean( - ClientConfig.SENTRY_POOL_ENABLED, ClientConfig.SENTRY_POOL_ENABLED_DEFAULT); + ClientConfig.SENTRY_POOL_ENABLED, ClientConfig.SENTRY_POOL_ENABLED_DEFAULT); if (pooled) { return (SentryPolicyServiceClient) Proxy - .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(), - SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), - new PoolClientInvocationHandler(conf)); + .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(), + SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), + new PoolClientInvocationHandler(conf)); } else { + RetryClientInvocationHandler clientHandler = new RetryClientInvocationHandler(conf, + new SentryPolicyServiceClientDefaultImpl(conf)); return (SentryPolicyServiceClient) Proxy - .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(), - SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), - new RetryClientInvocationHandler(conf)); + .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(), + SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), + clientHandler); } } } http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java index 806d03e..4bae4af 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java @@ -25,7 +25,6 @@ import javax.security.sasl.Sasl; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableMap; import org.apache.sentry.provider.db.service.thrift.SentryMetrics; -import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl; public class ServiceConstants { @@ -253,20 +252,6 @@ public class ServiceConstants { public static final String SENTRY_POOL_RETRY_TOTAL = "sentry.service.client.connection.pool.retry-total"; public static final int SENTRY_POOL_RETRY_TOTAL_DEFAULT = 3; - /** - * full retry num for getting the connection in non-pool model - * In a full retry, it will cycle through all available sentry servers - * {@link SentryPolicyServiceClientDefaultImpl#connectWithRetry()} - */ - public static final String SENTRY_FULL_RETRY_TOTAL = "sentry.service.client.connection.full.retry-total"; - public static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT = 2; - /** - * max retry num for client rpc - * {@link RetryClientInvocationHandler#invokeImpl(Object, Method, Object[])} - */ - public static final String SENTRY_RPC_RETRY_TOTAL = "sentry.service.client.rpc.retry-total"; - public static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3; - // max message size for thrift messages public static final String SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE = "sentry.policy.client.thrift.max.message.size"; public static final long SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = 100 * 1024 * 1024; http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactory.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactory.java index 1ec8840..afb095d 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactory.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactory.java @@ -40,7 +40,7 @@ import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest; import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleResponse; import org.apache.sentry.provider.db.service.thrift.TSentryGroup; import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege; -import org.apache.sentry.provider.db.service.thrift.ThriftUtil; +import org.apache.sentry.ThriftUtil; import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope; import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; import org.apache.sentry.service.thrift.Status; http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactoryGM.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactoryGM.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactoryGM.java index dfae5ab..4d0fe7e 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactoryGM.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactoryGM.java @@ -43,7 +43,7 @@ import org.apache.sentry.provider.db.generic.service.thrift.TDropSentryRoleReque import org.apache.sentry.provider.db.generic.service.thrift.TDropSentryRoleResponse; import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege; import org.apache.sentry.provider.db.log.util.Constants; -import org.apache.sentry.provider.db.service.thrift.ThriftUtil; +import org.apache.sentry.ThriftUtil; import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; import org.apache.sentry.service.thrift.Status; import org.junit.BeforeClass; http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java index 7292387..e9c7b7e 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java @@ -19,7 +19,7 @@ package org.apache.sentry.service.thrift; import com.google.common.net.HostAndPort; -import org.apache.sentry.provider.db.service.thrift.ThriftUtil; +import org.apache.sentry.ThriftUtil; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger;
