SENTRY-1593
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/53003443 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/53003443 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/53003443 Branch: refs/heads/akolb-SENTRY-1593 Commit: 5300344368dcddfd462fc947f179a78369126f53 Parents: 3f99bbd Author: Alexander Kolbasov <[email protected]> Authored: Mon Feb 20 00:14:27 2017 -0600 Committer: Alexander Kolbasov <[email protected]> Committed: Mon Feb 20 00:14:27 2017 -0600 ---------------------------------------------------------------------- sentry-core/sentry-core-common/pom.xml | 8 + .../sentry/SentryClientInvocationHandler.java | 54 ++++ .../main/java/org/apache/sentry/SentryMain.java | 2 +- ...SentryServiceClientTransportDefaultImpl.java | 297 +++++++++++++++++++ .../main/java/org/apache/sentry/ThriftUtil.java | 127 ++++++++ .../common/HdfsServiceTransportConstants.java | 106 +++++++ .../common/PolicyServiceTransportConstants.java | 106 +++++++ .../common/RetryClientInvocationHandler.java | 151 ++++++++++ .../sentry/core/common/SentryServiceClient.java | 42 +++ .../core/common/ServiceTransportConstants.java | 99 +++++++ .../exception/SentryHdfsServiceException.java | 33 +++ .../apache/sentry/hdfs/ServiceConstants.java | 12 +- .../sentry/hdfs/SentryHDFSServiceClient.java | 6 +- .../SentryHDFSServiceClientDefaultImpl.java | 239 +++++---------- .../hdfs/SentryHDFSServiceClientFactory.java | 22 +- .../hdfs/SentryHDFSServiceProcessorFactory.java | 2 +- .../sentry/hdfs/SentryHdfsServiceException.java | 33 --- .../hdfs/SentryHdfsServiceIntegrationBase.java | 14 +- .../SentryGenericPolicyProcessorWrapper.java | 2 +- .../thrift/SentryGenericServiceClient.java | 4 +- .../SentryGenericServiceClientDefaultImpl.java | 139 ++------- .../SentryGenericServiceClientFactory.java | 33 ++- .../db/log/entity/JsonLogEntityFactory.java | 2 +- .../thrift/SentryPolicyServiceClient.java | 6 +- 
.../SentryPolicyServiceClientDefaultImpl.java | 197 +----------- .../service/thrift/SentryProcessorWrapper.java | 1 + .../provider/db/service/thrift/ThriftUtil.java | 127 -------- .../thrift/PoolClientInvocationHandler.java | 3 +- .../thrift/RetryClientInvocationHandler.java | 142 --------- .../thrift/SentryClientInvocationHandler.java | 54 ---- .../thrift/SentryServiceClientFactory.java | 17 +- .../sentry/service/thrift/ServiceConstants.java | 15 - .../db/log/entity/TestJsonLogEntityFactory.java | 2 +- .../log/entity/TestJsonLogEntityFactoryGM.java | 2 +- .../thrift/TestPoolClientInvocationHandler.java | 2 +- 35 files changed, 1198 insertions(+), 903 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-core/sentry-core-common/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/pom.xml b/sentry-core/sentry-core-common/pom.xml index 9d18063..538613e 100644 --- a/sentry-core/sentry-core-common/pom.xml +++ b/sentry-core/sentry-core-common/pom.xml @@ -62,6 +62,14 @@ limitations under the License. 
<artifactId>hadoop-common</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libfb303</artifactId> + </dependency> + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/SentryClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/SentryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/SentryClientInvocationHandler.java new file mode 100644 index 0000000..6ef5c35 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/SentryClientInvocationHandler.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Common base class for the reflection-proxy invocation handlers used by Sentry clients.
 * <p>
 * A no-argument {@code close()} call on the proxy is routed to {@link #close()}; every
 * other call is handed to {@link #invokeImpl(Object, Method, Object[])}.
 */
public abstract class SentryClientInvocationHandler implements InvocationHandler {

  /**
   * Dispatches a proxied call.
   *
   * @param proxy  the proxy instance the method was invoked on
   * @param method the method being invoked
   * @param args   the call arguments; {@code null} when the method takes none
   * @return {@code null} for an intercepted {@code close()}, otherwise whatever
   *         {@link #invokeImpl(Object, Method, Object[])} returns
   * @throws Exception whatever {@link #invokeImpl(Object, Method, Object[])} throws
   */
  @Override
  public final Object invoke(Object proxy, Method method, Object[] args) throws Exception {
    boolean closeRequested = "close".equals(method.getName()) && args == null;
    if (!closeRequested) {
      return invokeImpl(proxy, method, args);
    }
    // close() is handled here rather than forwarded: it is declared without checked
    // exceptions, and making the client's close() throw would be a backward
    // incompatible change for Sentry clients.
    close();
    return null;
  }

  /**
   * Handles every proxied call other than the no-argument {@code close()}.
   * Subclasses put their specific behaviour here.
   */
  public abstract Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception;

  /**
   * Releases whatever contexts this handler created. Called when the no-argument
   * {@code close()} is invoked on the proxied client.
   */
  public abstract void close();

}
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry; + +import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sentry.core.common.HdfsServiceTransportConstants; +import org.apache.sentry.core.common.PolicyServiceTransportConstants; +import org.apache.sentry.core.common.ServiceTransportConstants; +import org.apache.thrift.transport.TSaslClientTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.SaslException; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; + +/** + * This class has the transport implementation for sentry clients. + * All the sentry clients should extend this class for transport implementation. 
+ */ + +public abstract class SentryServiceClientTransportDefaultImpl { + protected final Configuration conf; + private final boolean kerberos; + private String[] serverPrincipalParts; + + protected TTransport transport; + private final int connectionTimeout; + private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientTransportDefaultImpl.class); + // configs for connection retry + private final int connectionFullRetryTotal; + private final int rpcRetryTotal; + private final List<InetSocketAddress> endpoints; + protected InetSocketAddress serverAddress; + private final ServiceTransportConstants serviceConstants; + + /** + * This transport wraps the Sasl transports to set up the right UGI context for open(). + */ + public static class UgiSaslClientTransport extends TSaslClientTransport { + protected UserGroupInformation ugi = null; + + public UgiSaslClientTransport(String mechanism, String authorizationId, + String protocol, String serverName, Map<String, String> props, + CallbackHandler cbh, TTransport transport, boolean wrapUgi, Configuration conf) + throws IOException, SaslException { + super(mechanism, authorizationId, protocol, serverName, props, cbh, + transport); + if (wrapUgi) { + // If we don't set the configuration, the UGI will be created based on + // what's on the classpath, which may lack the kerberos changes we require + UserGroupInformation.setConfiguration(conf); + ugi = UserGroupInformation.getLoginUser(); + } + } + + // open the SASL transport with using the current UserGroupInformation + // This is needed to get the current login context stored + @Override + public void open() throws TTransportException { + if (ugi == null) { + baseOpen(); + } else { + try { + if (ugi.isFromKeytab()) { + ugi.checkTGTAndReloginFromKeytab(); + } + ugi.doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws TTransportException { + baseOpen(); + return null; + } + }); + } catch (IOException e) { + throw new 
TTransportException("Failed to open SASL transport: " + e.getMessage(), e); + } catch (InterruptedException e) { + throw new TTransportException( + "Interrupted while opening underlying transport: " + e.getMessage(), e); + } + } + } + + private void baseOpen() throws TTransportException { + super.open(); + } + } + + /** + * Initialize the object based on the sentry configuration provided. + * @param conf Sentry configuration + * @param type Type indicates the service type + */ + public SentryServiceClientTransportDefaultImpl(Configuration conf, ServiceTransportConstants.sentryService type) + throws IOException { + String hostsAndPortsStr; + String[] hostsAndPortsStrArr; + HostAndPort[] hostsAndPorts; + int defaultPort; + this.conf = conf; + Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); + + if (type == ServiceTransportConstants.sentryService.HDFS_SERVICE) { + serviceConstants = new HdfsServiceTransportConstants(); + } else { + serviceConstants = new PolicyServiceTransportConstants(); + } + + this.connectionTimeout = conf.getInt(serviceConstants.getServerRpcConnTimeout(), + serviceConstants.getServerRpcConnTimeoutDefault()); + this.rpcRetryTotal = conf.getInt(serviceConstants.getSentryRpcRetryTotal(), + serviceConstants.getSentryRpcRetryTotalDefault()); + this.connectionFullRetryTotal = conf.getInt(serviceConstants.getSentryFullRetryTotal(), + serviceConstants.getSentryFullRetryTotalDefault()); + this.kerberos = serviceConstants.getSecurityModeKerberos().equalsIgnoreCase( + conf.get(serviceConstants.getSecurityMode(), serviceConstants.getSecurityModeKerberos()).trim()); + + hostsAndPortsStr = conf.get(serviceConstants.getServerRpcAddress()); + if (hostsAndPortsStr == null) { + throw new RuntimeException("Config key " + + serviceConstants.getServerRpcAddress() + " is required"); + } + defaultPort = conf.getInt(serviceConstants.getServerRpcPort(), serviceConstants.getServerRpcPortDefault()); + + hostsAndPortsStrArr = 
hostsAndPortsStr.split(","); + hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, defaultPort); + + this.endpoints = new ArrayList(hostsAndPortsStrArr.length); + for( HostAndPort endpoint : hostsAndPorts) { + this.endpoints.add( + new InetSocketAddress(endpoint.getHostText(), endpoint.getPort())); + LOGGER.debug("Added server endpoint: " + endpoint.toString()); + } + serverAddress = null; + } + /** + * Initialize the object based on the parameters provided provided. + * @param addr Host address which the client needs to connect + * @param port Host Port which the client needs to connect + * @param conf Sentry configuration + * @param type Type indicates the service type + */ + public SentryServiceClientTransportDefaultImpl(String addr, int port, Configuration conf, ServiceTransportConstants.sentryService type) + throws IOException { + // copy the configuration because we may make modifications to it. + this.conf = new Configuration(conf); + + Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); + if (type == ServiceTransportConstants.sentryService.HDFS_SERVICE) { + serviceConstants = new HdfsServiceTransportConstants(); + } else { + serviceConstants = new PolicyServiceTransportConstants(); + } + + this.serverAddress = NetUtils.createSocketAddr(Preconditions.checkNotNull( + addr, "Config key " + serviceConstants.getServerRpcAddress() + " is required"), port); + this.connectionTimeout = conf.getInt(serviceConstants.getServerRpcConnTimeout(), + serviceConstants.getServerRpcConnTimeoutDefault()); + this.rpcRetryTotal = conf.getInt(serviceConstants.getSentryRpcRetryTotal(), + serviceConstants.getSentryRpcRetryTotalDefault()); + this.connectionFullRetryTotal = conf.getInt(serviceConstants.getSentryFullRetryTotal(), + serviceConstants.getSentryFullRetryTotalDefault()); + this.kerberos = serviceConstants.getSecurityModeKerberos().equalsIgnoreCase( + conf.get(serviceConstants.getSecurityMode(), 
serviceConstants.getSecurityModeKerberos()).trim()); + endpoints = null; + } + + + /** + * This is a no-op when already connected. + * When there is a connection error, it will retry with another sentry server. It will + * first cycle through all the available sentry servers, and then retry the whole server + * list no more than connectionFullRetryTotal times. In this case, it won't introduce + * more latency when some server fails. Also to prevent all clients connecting to the + * same server, it will reorder the endpoints randomly after a full retry. + * <p> + * TODO: Have a small random sleep after a full retry to prevent all clients connecting to the same server. + * <p> + * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries. + */ + public synchronized void connectWithRetry(boolean tryAlternateServer) throws IOException { + if (isConnected() && (!tryAlternateServer)) { + return; + } + IOException currentException = null; + // Here for each full connectWithRetry it will cycle through all available sentry + // servers. Before each full connectWithRetry, it will shuffle the server list. + for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) { + // Reorder endpoints randomly to prevent all clients connecting to the same endpoint + // at the same time after a node failure. + Collections.shuffle(endpoints); + for (InetSocketAddress addr : endpoints) { + try { + if (serverAddress != null && serverAddress.equals(addr)) { + continue; + } + serverAddress = addr; + connect(addr); + LOGGER.info(String.format("Connected to SentryServer: %s", addr.toString())); + return; + } catch (IOException e) { + LOGGER.debug(String.format("Failed connection to %s: %s", + addr.toString(), e.getMessage()), e); + currentException = e; + } + } + } + + // Throw exception as reaching the max full connectWithRetry number. 
+ LOGGER.error( + String.format("Reach the max connection retry num %d ", connectionFullRetryTotal), + currentException); + throw currentException; + } + + /** + * Connect to the specified socket address and throw IOException if failed. + */ + protected void connect(InetSocketAddress serverAddress) throws IOException { + if (kerberos) { + String serverPrincipal = Preconditions.checkNotNull(conf.get(serviceConstants.getPrincipal()), serviceConstants.getPrincipal() + " is required"); + // since the client uses hadoop-auth, we need to set kerberos in + // hadoop-auth if we plan to use kerberos + conf.set(HADOOP_SECURITY_AUTHENTICATION, serviceConstants.getSecurityModeKerberos()); + + // Resolve server host in the same way as we are doing on server side + serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); + LOGGER.debug("Using server kerberos principal: " + serverPrincipal); + if (serverPrincipalParts == null) { + serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal); + Preconditions.checkArgument(serverPrincipalParts.length == 3, + "Kerberos principal should have 3 parts: " + serverPrincipal); + } + boolean wrapUgi = "true".equalsIgnoreCase(conf + .get(serviceConstants.getSecurityUseUgiTransport(), "true")); + transport = new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(), + null, serverPrincipalParts[0], serverPrincipalParts[1], + serviceConstants.getSaslProperties(), null, transport, wrapUgi, conf); + + } else { + serverPrincipalParts = null; + transport = new TSocket(serverAddress.getHostName(), + serverAddress.getPort(), connectionTimeout); + } + try { + transport.open(); + } catch (TTransportException e) { + throw new IOException("Transport exception while opening transport: " + e.getMessage(), e); + } + LOGGER.debug("Successfully opened transport: " + transport + " to " + serverAddress); + } + + protected boolean isConnected() { + return transport != null && 
transport.isOpen(); + } + + public synchronized void close() { + if (isConnected()) { + transport.close(); + } + } + public int getRetryCount() { return rpcRetryTotal; } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/ThriftUtil.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/ThriftUtil.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/ThriftUtil.java new file mode 100644 index 0000000..5616360 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/ThriftUtil.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sentry; + +import com.google.common.net.HostAndPort; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSaslClientTransport; +import org.apache.thrift.transport.TSaslServerTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public final class ThriftUtil { + + private static final Logger LOGGER = LoggerFactory.getLogger(ThriftUtil.class); + + public static void setImpersonator(final TProtocol in) { + try { + TTransport transport = in.getTransport(); + if (transport instanceof TSaslServerTransport) { + String impersonator = ((TSaslServerTransport) transport).getSaslServer() + .getAuthorizationID(); + setImpersonator(impersonator); + } + } catch (Exception e) { + // If there has exception when get impersonator info, log the error information. + LOGGER.warn("There is an error when get the impersonator:" + e.getMessage()); + } + } + + public static void setIpAddress(final TProtocol in) { + try { + TTransport transport = in.getTransport(); + TSocket tSocket = getUnderlyingSocketFromTransport(transport); + if (tSocket != null) { + setIpAddress(tSocket.getSocket().getInetAddress().toString()); + } else { + LOGGER.warn("Unknown Transport, cannot determine ipAddress"); + } + } catch (Exception e) { + // If there has exception when get impersonator info, log the error information. + LOGGER.warn("There is an error when get the client's ip address:" + e.getMessage()); + } + } + + /** + * Returns the underlying TSocket from the transport, or null of the transport type is unknown. 
+ */ + private static TSocket getUnderlyingSocketFromTransport(TTransport transport) { + Preconditions.checkNotNull(transport); + if (transport instanceof TSaslServerTransport) { + return (TSocket) ((TSaslServerTransport) transport).getUnderlyingTransport(); + } else if (transport instanceof TSaslClientTransport) { + return (TSocket) ((TSaslClientTransport) transport).getUnderlyingTransport(); + } else if (transport instanceof TSocket) { + return (TSocket) transport; + } + return null; + } + + private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>() { + @Override + protected synchronized String initialValue() { + return ""; + } + }; + + public static void setIpAddress(String ipAddress) { + threadLocalIpAddress.set(ipAddress); + } + + public static String getIpAddress() { + return threadLocalIpAddress.get(); + } + + private static ThreadLocal<String> threadLocalImpersonator = new ThreadLocal<String>() { + @Override + protected synchronized String initialValue() { + return ""; + } + }; + + public static void setImpersonator(String impersonator) { + threadLocalImpersonator.set(impersonator); + } + + public static String getImpersonator() { + return threadLocalImpersonator.get(); + } + + private ThriftUtil() { + // Make constructor private to avoid instantiation + } + + /** + * Utility function for parsing host and port strings. Expected form should be + * (host:port). The hostname could be in ipv6 style. If port is not specified, + * defaultPort will be used. 
+ */ + public static HostAndPort[] parseHostPortStrings(String[] hostsAndPortsArr, int defaultPort) { + HostAndPort[] hostsAndPorts = new HostAndPort[hostsAndPortsArr.length]; + for (int i = 0; i < hostsAndPorts.length; i++) { + hostsAndPorts[i] = + HostAndPort.fromString(hostsAndPortsArr[i]).withDefaultPort(defaultPort); + } + return hostsAndPorts; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/HdfsServiceTransportConstants.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/HdfsServiceTransportConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/HdfsServiceTransportConstants.java new file mode 100644 index 0000000..159f934 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/HdfsServiceTransportConstants.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sentry.core.common; + +import com.google.common.collect.ImmutableMap; + +import javax.security.sasl.Sasl; + +import java.util.HashMap; +import java.util.Map; + +/** + * This class holds all the transport constants needed for HDFS service client + */ + +public final class HdfsServiceTransportConstants extends ServiceTransportConstants { + + private static final ImmutableMap<String, String> SASL_PROPERTIES; + + static { + Map<String, String> saslProps = new HashMap<String, String>(); + saslProps.put(Sasl.SERVER_AUTH, "true"); + saslProps.put(Sasl.QOP, "auth-conf"); + SASL_PROPERTIES = ImmutableMap.copyOf(saslProps); + } + + public HdfsServiceTransportConstants() { + super(); + } + + /** + * This configuration parameter is only meant to be used for testing purposes. + */ + private final String SECURITY_MODE = "sentry.hdfs.service.security.mode"; + + private final String SECURITY_USE_UGI_TRANSPORT = "sentry.hdfs.service.security.use.ugi"; + private final String PRINCIPAL = "sentry.hdfs.service.server.principal"; + private final String RPC_ADDRESS = "sentry.hdfs.service.client.server.rpc-address"; + private final String RPC_ADDRESS_DEFAULT = "0.0.0.0"; //NOPMD + + private final String SERVER_RPC_PORT = "sentry.hdfs.service.client.server.rpc-port"; + private final int SERVER_RPC_PORT_DEFAULT = ServiceTransportConstants.RPC_PORT_DEFAULT; + private final String SERVER_RPC_ADDRESS = "sentry.hdfs.service.client.server.rpc-address"; + private final String SERVER_RPC_CONN_TIMEOUT = "sentry.hdfs.service.client.server.rpc-connection-timeout"; + private final int SERVER_RPC_CONN_TIMEOUT_DEFAULT = 200000; + + public String getSecurityMode() { + return SECURITY_MODE; + } + + public String getSecurityUseUgiTransport() { + return SECURITY_USE_UGI_TRANSPORT; + } + + public String getPrincipal() { + return PRINCIPAL; + } + + public String getRpcAddress() { + return RPC_ADDRESS; + } + + public String getRpcAddressDefault() { + return RPC_ADDRESS_DEFAULT; + } + 
+ public String getServerRpcPort() { + return SERVER_RPC_PORT; + } + + public String getServerRpcAddress() { + return SERVER_RPC_ADDRESS; + } + + public String getServerRpcConnTimeout() { + return SERVER_RPC_CONN_TIMEOUT; + } + + public int getServerRpcPortDefault() { + return SERVER_RPC_PORT_DEFAULT; + } + + public int getServerRpcConnTimeoutDefault() { + return SERVER_RPC_CONN_TIMEOUT_DEFAULT; + } + + public ImmutableMap<String, String> getSaslProperties() { + return SASL_PROPERTIES; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/PolicyServiceTransportConstants.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/PolicyServiceTransportConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/PolicyServiceTransportConstants.java new file mode 100644 index 0000000..38ecc80 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/PolicyServiceTransportConstants.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common;
+
+import com.google.common.collect.ImmutableMap;
+
+import javax.security.sasl.Sasl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Transport constants used by clients of the Sentry Policy service.
+ * <p>
+ * Every getter returns a fixed value: either the name of a configuration
+ * property or the default value paired with one.
+ */
+public final class PolicyServiceTransportConstants extends ServiceTransportConstants {
+
+  /** SASL settings handed to the transport: server authentication "true", QOP "auth-conf". */
+  private static final ImmutableMap<String, String> SASL_PROPERTIES = buildSaslProperties();
+
+  /**
+   * This configuration parameter is only meant to be used for testing purposes.
+   */
+  private static final String SECURITY_MODE = "sentry.service.security.mode";
+
+  private static final String SECURITY_USE_UGI_TRANSPORT = "sentry.service.security.use.ugi";
+  private static final String PRINCIPAL = "sentry.service.server.principal";
+  private static final String RPC_ADDRESS = "sentry.service.server.rpc-address";
+  private static final String RPC_ADDRESS_DEFAULT = "0.0.0.0"; //NOPMD
+
+  private static final String SERVER_RPC_PORT = "sentry.service.client.server.rpc-port";
+  private static final int SERVER_RPC_PORT_DEFAULT = ServiceTransportConstants.RPC_PORT_DEFAULT;
+  private static final String SERVER_RPC_ADDRESS = "sentry.service.client.server.rpc-address";
+  private static final String SERVER_RPC_CONN_TIMEOUT =
+      "sentry.service.client.server.rpc-connection-timeout";
+  private static final int SERVER_RPC_CONN_TIMEOUT_DEFAULT = 200000;
+
+  public PolicyServiceTransportConstants() {
+    // Nothing to initialise: everything this class exposes is a constant.
+  }
+
+  /** Assembles the SASL settings once, then freezes them into an immutable map. */
+  private static ImmutableMap<String, String> buildSaslProperties() {
+    Map<String, String> settings = new HashMap<String, String>();
+    settings.put(Sasl.SERVER_AUTH, "true");
+    settings.put(Sasl.QOP, "auth-conf");
+    return ImmutableMap.copyOf(settings);
+  }
+
+  @Override
+  public String getSecurityMode() {
+    return SECURITY_MODE;
+  }
+
+  @Override
+  public String getSecurityUseUgiTransport() {
+    return SECURITY_USE_UGI_TRANSPORT;
+  }
+
+  @Override
+  public String getPrincipal() {
+    return PRINCIPAL;
+  }
+
+  @Override
+  public String getRpcAddress() {
+    return RPC_ADDRESS;
+  }
+
+  @Override
+  public String getRpcAddressDefault() {
+    return RPC_ADDRESS_DEFAULT;
+  }
+
+  @Override
+  public String getServerRpcPort() {
+    return SERVER_RPC_PORT;
+  }
+
+  @Override
+  public String getServerRpcAddress() {
+    return SERVER_RPC_ADDRESS;
+  }
+
+  @Override
+  public String getServerRpcConnTimeout() {
+    return SERVER_RPC_CONN_TIMEOUT;
+  }
+
+  @Override
+  public int getServerRpcPortDefault() {
+    return SERVER_RPC_PORT_DEFAULT;
+  }
+
+  @Override
+  public int getServerRpcConnTimeoutDefault() {
+    return SERVER_RPC_CONN_TIMEOUT_DEFAULT;
+  }
+
+  @Override
+  public ImmutableMap<String, String> getSaslProperties() {
+    return SASL_PROPERTIES;
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/RetryClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/RetryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/RetryClientInvocationHandler.java new file mode 100644 index 0000000..278dde0 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/RetryClientInvocationHandler.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.SentryClientInvocationHandler;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * The RetryClientInvocationHandler is a proxy class for handling thrift calls in the
+ * non-pool model. A single client connection is used, and it is opened lazily: the
+ * client is not connected to a sentry server until the first rpc call is made.
+ * <p>
+ * For every rpc call the handler first asks the client to connect (a no-op when it is
+ * already connected) and then performs the thrift call on it. If the call fails because
+ * of a connection problem, the current connection is closed and the call is retried
+ * (reconnect and resend), up to {@link SentryServiceClient#getRetryCount()} attempts in
+ * total.
+ * <p>
+ * How a reconnect chooses among the available sentry servers is decided by the client's
+ * {@link SentryServiceClient#connectWithRetry(boolean)} implementation; after a transport
+ * failure this handler asks it to try an alternate server.
+ * <p>
+ * TODO: allow multiple client connections
+ */
+public class RetryClientInvocationHandler extends SentryClientInvocationHandler {
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(RetryClientInvocationHandler.class);
+  private final Configuration conf;
+  private final SentryServiceClient client;
+
+  /**
+   * Creates a handler that retries calls made on the given client.
+   *
+   * @param conf          configuration object, must not be null
+   * @param client_object client that connects to the server and receives the calls,
+   *                      must not be null
+   * @throws NullPointerException if either argument is null
+   * @throws IOException declared for callers; not thrown by this constructor itself
+   */
+  public RetryClientInvocationHandler(Configuration conf, SentryServiceClient client_object) throws IOException {
+    this.conf = Preconditions.checkNotNull(conf, "Configuration object cannot be null");
+    // Fail here rather than with an anonymous NullPointerException on the first rpc call.
+    this.client = Preconditions.checkNotNull(client_object, "Client object cannot be null");
+  }
+
+  /**
+   * Connects the client if necessary and performs the thrift call, retrying on
+   * connection problems.
+   * <p>
+   * A failure to connect ({@link IOException}) or a call that failed with a
+   * {@link TTransportException} as the cause of a {@link SentryUserException} or
+   * {@link SentryHdfsServiceException} counts as one attempt: the connection is closed
+   * and the call is tried again. Any other {@link SentryUserException} or
+   * {@link SentryHdfsServiceException} is rethrown unchanged, and any other failure is
+   * rethrown as the original {@link InvocationTargetException}.
+   * Synchronized for thread safety.
+   *
+   * @throws SentryUserException if every attempt failed with a connection problem
+   */
+  @Override
+  public synchronized Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception {
+    int retryCount = 0;
+    Exception lastExc = null;
+    boolean tryAlternateServer = false;
+
+    while (retryCount < client.getRetryCount()) {
+      // Connect to a sentry server if not connected yet.
+      try {
+        client.connectWithRetry(tryAlternateServer);
+      } catch (IOException e) {
+        // Could not connect: count the attempt, drop the connection and try again.
+        retryCount++;
+        lastExc = e;
+        close();
+        continue;
+      }
+
+      // Do the thrift call.
+      try {
+        return method.invoke(client, args);
+      } catch (InvocationTargetException e) {
+        Throwable targetException = e.getCause();
+        boolean isSentryException = targetException instanceof SentryUserException
+            || targetException instanceof SentryHdfsServiceException;
+        if (!isSentryException) {
+          throw e;
+        }
+
+        Throwable sentryTargetException = targetException.getCause();
+        if (!(sentryTargetException instanceof TTransportException)) {
+          // The exception is thrown by the thrift call itself, eg, SentryAccessDeniedException;
+          // reconnecting would not help. Rethrow it with its own type: it may be a
+          // SentryHdfsServiceException, so casting it to SentryUserException would fail
+          // with a ClassCastException and hide the real error.
+          throw (Exception) targetException;
+        }
+
+        // Connection problem, eg, an invalid connection after the service restarted.
+        lastExc = new TTransportException(sentryTargetException);
+        LOGGER.debug("Got TTransportException when do the thrift call ", lastExc);
+        tryAlternateServer = true;
+      }
+
+      // Count the failed attempt, then close the current connection so that the next
+      // iteration reconnects to an available sentry server and redoes the thrift call.
+      retryCount++;
+      close();
+    }
+
+    // Every attempt failed with a connection problem.
+    String message = String.format("failed after %d retries ", client.getRetryCount());
+    LOGGER.error(message, lastExc);
+    throw new SentryUserException(message, lastExc);
+  }
+
+  /**
+   * Closes the current client connection.
+   */
+  @Override
+  public void close() {
+    client.close();
+    LOGGER.debug("Close the current client connection");
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/SentryServiceClient.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/SentryServiceClient.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/SentryServiceClient.java new file mode 100644 index 0000000..51fc832 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/SentryServiceClient.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.core.common; + +import java.io.IOException; +/** + * This interface is exposed to RetryClientInvocationHandler class to invoke retry on failure.
+ */
+public interface SentryServiceClient {
+  /**
+   * Connects to a sentry server. This is a no-op when already connected.
+   * When there is a connection error, it will retry with another sentry server. It will
+   * first cycle through all the available sentry servers, and then retry the whole server
+   * list no more than connectionFullRetryTotal times. In this case, it won't introduce
+   * more latency when some server fails. Also to prevent all clients connecting to the
+   * same server, it will reorder the endpoints randomly after a full retry.
+   * <p>
+   * TODO: Have a small random sleep after a full retry to prevent all clients connecting to the same server.
+   * <p>
+   * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
+   *
+   * @param tryAlternateServer RetryClientInvocationHandler passes true after a call
+   *        failed with a transport error; presumably this asks the implementation to
+   *        pick a different server than the last one (confirm against the implementation)
+   * @throws IOException if no connection could be established
+   */
+  void connectWithRetry(boolean tryAlternateServer) throws IOException;
+
+  /**
+   * Returns the number of attempts RetryClientInvocationHandler makes for a single rpc
+   * call before it gives up.
+   */
+  int getRetryCount();
+
+  /**
+   * Closes the client connection. RetryClientInvocationHandler calls this after every
+   * failed attempt, before it reconnects.
+   */
+  void close();
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/ServiceTransportConstants.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/ServiceTransportConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/ServiceTransportConstants.java new file mode 100644 index 0000000..4aa8373 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/ServiceTransportConstants.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.sentry.SentryServiceClientTransportDefaultImpl;
+
+/**
+ * This class acts as base class for transport constants needed for sentry clients/servers.
+ * <p>
+ * Values shared by all services are declared here as static constants. The property
+ * names and defaults that differ per service are supplied by subclasses through the
+ * abstract getters.
+ */
+public abstract class ServiceTransportConstants {
+
+  /**
+   * The sentry services a client can be created for.
+   * (The lower-case type name is part of the public API and is kept as is.)
+   */
+  public enum sentryService {
+    DB_POLICY_SERVICE,
+    GENERIC_POLICY_SERVICE,
+    HDFS_SERVICE
+  }
+
+  public static final int RPC_PORT_DEFAULT = 8038;
+
+  // connection pool configuration
+  public static final String SENTRY_POOL_ENABLED = "sentry.service.client.connection.pool.enabled";
+  public static final boolean SENTRY_POOL_ENABLED_DEFAULT = false;
+
+  // commons-pool configuration for pool size
+  // These and the constants below are static like their siblings above, rather than
+  // per-instance fields; access through an instance keeps compiling.
+  public static final String SENTRY_POOL_MAX_TOTAL = "sentry.service.client.connection.pool.max-total";
+  public static final int SENTRY_POOL_MAX_TOTAL_DEFAULT = 8;
+  public static final String SENTRY_POOL_MAX_IDLE = "sentry.service.client.connection.pool.max-idle";
+  public static final int SENTRY_POOL_MAX_IDLE_DEFAULT = 8;
+  public static final String SENTRY_POOL_MIN_IDLE = "sentry.service.client.connection.pool.min-idle";
+  public static final int SENTRY_POOL_MIN_IDLE_DEFAULT = 0;
+
+  // retry num for getting the connection from connection pool
+  public static final String SENTRY_POOL_RETRY_TOTAL = "sentry.service.client.connection.pool.retry-total";
+  public static final int SENTRY_POOL_RETRY_TOTAL_DEFAULT = 3;
+
+  /**
+   * full retry num for getting the connection in non-pool model
+   * In a full retry, it will cycle through all available sentry servers
+   * {@link SentryServiceClientTransportDefaultImpl#connectWithRetry(boolean)}
+   */
+  public static final String SENTRY_FULL_RETRY_TOTAL = "sentry.service.client.connection.full.retry-total";
+  public static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT = 2;
+
+  /**
+   * max retry num for client rpc
+   * {@link RetryClientInvocationHandler#invokeImpl(Object, java.lang.reflect.Method, Object[])}
+   */
+  public static final String SENTRY_RPC_RETRY_TOTAL = "sentry.service.client.rpc.retry-total";
+  public static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3;
+
+  public static final String SECURITY_MODE_KERBEROS = "kerberos";
+  public static final String SECURITY_MODE_NONE = "none";
+
+  /** Returns the security-mode value "kerberos". */
+  public String getSecurityModeKerberos() {
+    return SECURITY_MODE_KERBEROS;
+  }
+
+  /** Returns the security-mode value "none". */
+  public String getSecurityModeNone() {
+    return SECURITY_MODE_NONE;
+  }
+
+  /** Returns the name of the property holding the full-retry count. */
+  public String getSentryFullRetryTotal() {
+    return SENTRY_FULL_RETRY_TOTAL;
+  }
+
+  /** Returns the default full-retry count. */
+  public int getSentryFullRetryTotalDefault() {
+    return SENTRY_FULL_RETRY_TOTAL_DEFAULT;
+  }
+
+  /** Returns the name of the property holding the rpc retry count. */
+  public String getSentryRpcRetryTotal() {
+    return SENTRY_RPC_RETRY_TOTAL;
+  }
+
+  /** Returns the default rpc retry count. */
+  public int getSentryRpcRetryTotalDefault() {
+    return SENTRY_RPC_RETRY_TOTAL_DEFAULT;
+  }
+
+  // Service-specific values, supplied by the subclass of each service.
+  public abstract String getSecurityMode();
+  public abstract String getSecurityUseUgiTransport();
+  public abstract String getPrincipal();
+  public abstract String getRpcAddress();
+  public abstract String getRpcAddressDefault();
+  public abstract String getServerRpcPort();
+  public abstract String getServerRpcAddress();
+  public abstract String getServerRpcConnTimeout();
+  public abstract int getServerRpcPortDefault();
+  public abstract int getServerRpcConnTimeoutDefault();
+
+  public abstract ImmutableMap<String, String> getSaslProperties();
+
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java ---------------------------------------------------------------------- diff --git
a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java new file mode 100644 index 0000000..6b09dc2 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.sentry.core.common.exception;
+
+/**
+ * Unchecked exception reported by the Sentry HDFS service client.
+ */
+public class SentryHdfsServiceException extends RuntimeException {
+  private static final long serialVersionUID = 1511645864949767378L;
+
+  /**
+   * @param message description of the failure
+   */
+  public SentryHdfsServiceException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message description of the failure
+   * @param cause   underlying exception, available through {@link #getCause()}
+   */
+  public SentryHdfsServiceException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java index 23552c2..d536891 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java @@ -72,25 +72,15 @@ public class ServiceConstants { public static class ClientConfig { public static final ImmutableMap<String, String> SASL_PROPERTIES = ServiceConstants.SASL_PROPERTIES; - public static final String SECURITY_MODE = "sentry.hdfs.service.security.mode"; public static final String SECURITY_MODE_KERBEROS = "kerberos"; - public static final String SECURITY_MODE_NONE = "none"; - public static final String SECURITY_USE_UGI_TRANSPORT = "sentry.hdfs.service.security.use.ugi"; - public static final String PRINCIPAL = "sentry.hdfs.service.server.principal"; - public static final String SERVER_RPC_PORT = "sentry.hdfs.service.client.server.rpc-port"; - public static final int SERVER_RPC_PORT_DEFAULT = 8038; - - public static final String SERVER_RPC_ADDRESS = "sentry.hdfs.service.client.server.rpc-address"; - - public static final String SERVER_RPC_CONN_TIMEOUT = "sentry.hdfs.service.client.server.rpc-connection-timeout"; - public static final int
SERVER_RPC_CONN_TIMEOUT_DEFAULT = 200000; public static final String USE_COMPACT_TRANSPORT = "sentry.hdfs.service.client.compact.transport"; public static final boolean USE_COMPACT_TRANSPORT_DEFAULT = false; // max message size for thrift messages public static final String SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE = "sentry.hdfs.thrift.max.message.size"; public static final long SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = 100 * 1024 * 1024; + } } http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java index ab12bf4..6f57dd8 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java @@ -17,7 +17,10 @@ */ package org.apache.sentry.hdfs; -public interface SentryHDFSServiceClient { +import org.apache.sentry.core.common.SentryServiceClient; +import org.apache.sentry.core.common.exception.SentryHdfsServiceException; + +public interface SentryHDFSServiceClient extends SentryServiceClient { String SENTRY_HDFS_SERVICE_NAME = "SentryHDFSService"; void notifyHMSUpdate(PathsUpdate update) @@ -28,6 +31,5 @@ public interface SentryHDFSServiceClient { SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum) throws SentryHdfsServiceException; - void close(); } http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java ---------------------------------------------------------------------- diff --git 
a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
index 03bf39e..7f92da6 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
@@ -15,201 +15,100 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.sentry.hdfs;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.LinkedList;
-import java.util.Map;
-
-import javax.security.auth.callback.CallbackHandler;
+package org.apache.sentry.hdfs;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sentry.SentryServiceClientTransportDefaultImpl;
+import org.apache.sentry.core.common.ServiceTransportConstants;
+import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
 import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
-import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Client;
 import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse;
 import org.apache.sentry.hdfs.service.thrift.TPathsUpdate;
 import org.apache.sentry.hdfs.service.thrift.TPermissionsUpdate;
-import org.apache.sentry.hdfs.ServiceConstants.ClientConfig;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TMultiplexedProtocol;
 import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSaslClientTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.*;
-import com.google.common.base.Preconditions;
-
-public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClient {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientDefaultImpl.class);
-
-  /**
-   * This transport wraps the Sasl transports to set up the right UGI context for open().
-   */
-  public static class UgiSaslClientTransport extends TSaslClientTransport {
-    protected UserGroupInformation ugi = null;
-
-    public UgiSaslClientTransport(String mechanism, String authorizationId,
-        String protocol, String serverName, Map<String, String> props,
-        CallbackHandler cbh, TTransport transport, boolean wrapUgi)
-        throws IOException {
-      super(mechanism, authorizationId, protocol, serverName, props, cbh,
-          transport);
-      if (wrapUgi) {
-        ugi = UserGroupInformation.getLoginUser();
-      }
-    }
-
-    // open the SASL transport with using the current UserGroupInformation
-    // This is needed to get the current login context stored
-    @Override
-    public void open() throws TTransportException {
-      if (ugi == null) {
-        baseOpen();
-      } else {
-        try {
-          // ensure that the ticket is valid before connecting to service. Note that
-          // checkTGTAndReloginFromKeytab() renew the ticket only when more than 80%
-          // of ticket lifetime has passed.
-          if (ugi.isFromKeytab()) {
-            ugi.checkTGTAndReloginFromKeytab();
-          }
-
-          ugi.doAs(new PrivilegedExceptionAction<Void>() {
-            public Void run() throws TTransportException {
-              baseOpen();
-              return null;
-            }
-          });
-        } catch (IOException e) {
-          throw new TTransportException("Failed to open SASL transport", e);
-        } catch (InterruptedException e) {
-          throw new TTransportException(
-              "Interrupted while opening underlying transport", e);
-        }
-      }
-    }
-
-    private void baseOpen() throws TTransportException {
-      super.open();
-    }
-  }
+/**
+ * Client of the Sentry HDFS service. Opening the transport is left to
+ * SentryServiceClientTransportDefaultImpl; this class adds the Thrift
+ * SentryHDFSService.Client on top of it and the service calls declared by
+ * SentryHDFSServiceClient.
+ */
+public class SentryHDFSServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryHDFSServiceClient {
-  private final Configuration conf;
-  private final InetSocketAddress serverAddress;
-  private final int connectionTimeout;
-  private boolean kerberos;
-  private TTransport transport;
+private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientDefaultImpl.class);
-  private String[] serverPrincipalParts;
-  private Client client;
+  // Thrift stub; assigned by connect(InetSocketAddress), so it is unset until a connection is made.
+  private SentryHDFSService.Client client;
+  /**
+   * Creates a client for the HDFS service from the configuration. This constructor does
+   * not call connect(...) itself.
+   * NOTE(review): presumably the connection is made later through connectWithRetry
+   * on the first call - confirm against the superclass.
+   */
 public SentryHDFSServiceClientDefaultImpl(Configuration conf) throws IOException {
-    this.conf = conf;
-    Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
-    this.serverAddress = NetUtils.createSocketAddr(Preconditions.checkNotNull(
-        conf.get(ClientConfig.SERVER_RPC_ADDRESS), "Config key "
-            + ClientConfig.SERVER_RPC_ADDRESS + " is required"), conf.getInt(
-        ClientConfig.SERVER_RPC_PORT, ClientConfig.SERVER_RPC_PORT_DEFAULT));
-    this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT,
-        ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT);
-    kerberos = ClientConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
-        conf.get(ClientConfig.SECURITY_MODE, ClientConfig.SECURITY_MODE_KERBEROS).trim());
-    transport = new TSocket(serverAddress.getHostName(),
-        serverAddress.getPort(), connectionTimeout);
-    if (kerberos) {
-      String serverPrincipal = Preconditions.checkNotNull(
-          conf.get(ClientConfig.PRINCIPAL), ClientConfig.PRINCIPAL + " is required");
-
-      // Resolve server host in the same way as we are doing on server side
-      serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
-      LOGGER.info("Using server kerberos principal: " + serverPrincipal);
-
-      serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
-      Preconditions.checkArgument(serverPrincipalParts.length == 3,
-          "Kerberos principal should have 3 parts: " + serverPrincipal);
-      boolean wrapUgi = "true".equalsIgnoreCase(conf
-          .get(ClientConfig.SECURITY_USE_UGI_TRANSPORT, "true"));
-      transport = new UgiSaslClientTransport(AuthMethod.KERBEROS.getMechanismName(),
-          null, serverPrincipalParts[0], serverPrincipalParts[1],
-          ClientConfig.SASL_PROPERTIES, null, transport, wrapUgi);
-    } else {
-      serverPrincipalParts = null;
-    }
-    try {
-      transport.open();
-    } catch (TTransportException e) {
-      throw new IOException("Transport exception while opening transport: " + e.getMessage(), e);
-    }
-    LOGGER.info("Successfully opened transport: " + transport + " to " + serverAddress);
-    TProtocol tProtocol = null;
-    long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE,
-        ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
-    if (conf.getBoolean(ClientConfig.USE_COMPACT_TRANSPORT,
-        ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) {
-      tProtocol = new TCompactProtocol(transport, maxMessageSize, maxMessageSize);
-    } else {
-      tProtocol = new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true);
-    }
-    TMultiplexedProtocol protocol = new TMultiplexedProtocol(
-        tProtocol, SentryHDFSServiceClient.SENTRY_HDFS_SERVICE_NAME);
-    client = new SentryHDFSService.Client(protocol);
-    LOGGER.info("Successfully created client");
+    super(conf,ServiceTransportConstants.sentryService.HDFS_SERVICE);
 }
-
-  public synchronized void notifyHMSUpdate(PathsUpdate update)
-      throws SentryHdfsServiceException {
-    try {
-      client.handle_hms_notification(update.toThrift());
-    } catch (Exception e) {
-      throw new SentryHdfsServiceException("Thrift Exception occurred !!", e);
+  /**
+   * Creates a client for the given address and port and connects it immediately,
+   * using the serverAddress value provided by the superclass.
+   */
+  public SentryHDFSServiceClientDefaultImpl(String addr, int port,
+      Configuration conf) throws IOException {
+    super(addr, port, conf,ServiceTransportConstants.sentryService.HDFS_SERVICE);
+    connect(serverAddress);
 }
-  }
-  public synchronized long getLastSeenHMSPathSeqNum()
-      throws SentryHdfsServiceException {
-    try {
-      return client.check_hms_seq_num(-1);
-    } catch (Exception e) {
-      throw new SentryHdfsServiceException("Thrift Exception occurred !!", e);
+  /**
+   * Connect to the specified socket address and throw IOException if failed.
+   * After the superclass has connected, the Thrift protocol stack is built on the
+   * transport and a new SentryHDFSService.Client is created.
+   */
+  @Override
+  protected void connect(InetSocketAddress serverAddress) throws IOException {
+    super.connect(serverAddress);
+
+    TProtocol tProtocol = null;
+    // The configured maximum message size is passed as both limits of the protocol.
+    long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE,
+        ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+    // Compact or binary protocol, depending on configuration.
+    if (conf.getBoolean(ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT,
+        ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) {
+      tProtocol = new TCompactProtocol(transport, maxMessageSize, maxMessageSize);
+    } else {
+      tProtocol = new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true);
+    }
+    // Address the HDFS service by name on the multiplexed connection.
+    TMultiplexedProtocol protocol = new TMultiplexedProtocol(
+        tProtocol, SentryHDFSServiceClient.SENTRY_HDFS_SERVICE_NAME);
+    client = new SentryHDFSService.Client(protocol);
+    LOGGER.info("Successfully created client");
 }
-  }
-  public synchronized SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
-      throws SentryHdfsServiceException {
-    SentryAuthzUpdate retVal = new SentryAuthzUpdate(new LinkedList<PermissionsUpdate>(), new LinkedList<PathsUpdate>());
-    try {
-      TAuthzUpdateResponse sentryUpdates = client.get_all_authz_updates_from(permSeqNum, pathSeqNum);
-      if (sentryUpdates.getAuthzPathUpdate() != null) {
-        for (TPathsUpdate pathsUpdate : sentryUpdates.getAuthzPathUpdate()) {
-          retVal.getPathUpdates().add(new PathsUpdate(pathsUpdate));
+  /**
+   * Sends a paths update to the server. Any exception raised by the call is wrapped in
+   * a SentryHdfsServiceException.
+   */
+  public synchronized void notifyHMSUpdate(PathsUpdate update)
+      throws SentryHdfsServiceException {
+    try {
+      client.handle_hms_notification(update.toThrift());
+    } catch (Exception e) {
+      throw new SentryHdfsServiceException("Thrift Exception occurred !!", e);
 }
-      }
-      if (sentryUpdates.getAuthzPermUpdate() != null) {
-        for (TPermissionsUpdate permsUpdate : sentryUpdates.getAuthzPermUpdate()) {
-          retVal.getPermUpdates().add(new PermissionsUpdate(permsUpdate));
+  }
+
+  /**
+   * Returns the result of check_hms_seq_num(-1). Any exception raised by the call is
+   * wrapped in a SentryHdfsServiceException.
+   * NOTE(review): the meaning of the -1 argument is not visible here - confirm
+   * against the service definition.
+   */
+  public synchronized long getLastSeenHMSPathSeqNum()
+      throws SentryHdfsServiceException {
+    try {
+      return client.check_hms_seq_num(-1);
+    } catch (Exception e) {
+      throw new SentryHdfsServiceException("Thrift Exception occurred !!", e);
 }
-      }
-    } catch (Exception e) {
-      throw new SentryHdfsServiceException("Thrift Exception occurred !!", e);
 }
-    return retVal;
-  }
-  public void close() {
-    if (transport != null) {
-      transport.close();
+  /**
+   * Fetches the updates for the given sequence numbers and converts the Thrift path and
+   * permission updates into PathsUpdate and PermissionsUpdate objects. A missing list
+   * in the response leaves the corresponding result list empty. Any exception is
+   * wrapped in a SentryHdfsServiceException.
+   */
+  public synchronized SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
+      throws SentryHdfsServiceException {
+    SentryAuthzUpdate retVal = new SentryAuthzUpdate(new LinkedList<PermissionsUpdate>(), new LinkedList<PathsUpdate>());
+    try {
+      TAuthzUpdateResponse sentryUpdates = client.get_all_authz_updates_from(permSeqNum, pathSeqNum);
+      if (sentryUpdates.getAuthzPathUpdate() != null) {
+        for (TPathsUpdate pathsUpdate : sentryUpdates.getAuthzPathUpdate()) {
+          retVal.getPathUpdates().add(new PathsUpdate(pathsUpdate));
+        }
+      }
+      if (sentryUpdates.getAuthzPermUpdate() != null) {
+        for (TPermissionsUpdate permsUpdate : sentryUpdates.getAuthzPermUpdate()) {
+          retVal.getPermUpdates().add(new PermissionsUpdate(permsUpdate));
+        }
+      }
+    } catch (Exception e) {
+      throw new SentryHdfsServiceException("Thrift Exception occurred !!", e);
+    }
+    return retVal;
 }
-  }
-}
+}
\ No newline at
end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
index 2a18b15..60056b8 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
@@ -18,7 +18,10 @@
 package org.apache.sentry.hdfs;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.RetryClientInvocationHandler;
+import java.lang.reflect.Proxy;
 /**
  * Client factory to create normal client or proxy with HA invocation handler
  */
@@ -27,10 +30,21 @@ public class SentryHDFSServiceClientFactory {
   private SentryHDFSServiceClientFactory() {
     // Make constructor private to avoid instantiation
   }
-  
+
+  /**
+   * Creates a SentryHDFSServiceClient whose calls go through a
+   * RetryClientInvocationHandler, so that a call failing with a connection problem
+   * is retried on a fresh connection.
+   * <p>
+   * The connection-pool setting is not consulted: SentryHDFSServiceClient doesn't have
+   * a pool implementation, so the non-pooled client is always returned, as it was
+   * before the retry proxy was introduced. Returning null for a pooled configuration
+   * would only defer the failure to a NullPointerException in the caller.
+   * TODO Implement pool for SentryHDFSServiceClient
+   *
+   * @param conf configuration used to build the client and the retry handler
+   * @return a proxy implementing SentryHDFSServiceClient, never null
+   * @throws Exception if the underlying client or the handler cannot be created
+   */
   public static SentryHDFSServiceClient create(Configuration conf)
-      throws Exception {
-    return new SentryHDFSServiceClientDefaultImpl(conf);
+      throws Exception {
+    RetryClientInvocationHandler clientHandler =
+        new RetryClientInvocationHandler(conf, new SentryHDFSServiceClientDefaultImpl(conf));
+    return (SentryHDFSServiceClient) Proxy.newProxyInstance(
+        SentryHDFSServiceClientDefaultImpl.class.getClassLoader(),
+        SentryHDFSServiceClientDefaultImpl.class.getInterfaces(),
+        clientHandler);
   }
-
 }
http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java index db55b5a..4c169d6 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java @@ -21,7 +21,7 @@ package org.apache.sentry.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.sentry.hdfs.service.thrift.SentryHDFSService; import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Iface; -import org.apache.sentry.provider.db.service.thrift.ThriftUtil; +import org.apache.sentry.ThriftUtil; import org.apache.sentry.service.thrift.ProcessorFactory; import org.apache.thrift.TException; import org.apache.thrift.TMultiplexedProcessor; http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java deleted file mode 100644 index 307d8c3..0000000 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or
more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.hdfs; - -public class SentryHdfsServiceException extends RuntimeException { - private static final long serialVersionUID = 1511645864949767378L; - - public SentryHdfsServiceException(String message, Throwable cause) { - super(message, cause); - } - - public SentryHdfsServiceException(String message) { - super(message); - } - - -} http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java index eccf83b..4c817db 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java +++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java @@ -23,6 +23,7 @@ import java.security.PrivilegedExceptionAction; import org.apache.hadoop.security.UserGroupInformation; import 
org.apache.sentry.hdfs.ServiceConstants.ClientConfig; import org.apache.sentry.service.thrift.SentryServiceIntegrationBase; +import org.apache.sentry.core.common.HdfsServiceTransportConstants; import org.junit.After; import org.junit.Before; @@ -54,17 +55,16 @@ public class SentryHdfsServiceIntegrationBase extends } // SentryHdfs client configuration setup - conf.set(ClientConfig.SERVER_RPC_ADDRESS, server.getAddress() + HdfsServiceTransportConstants serviceConstants = new HdfsServiceTransportConstants(); + conf.set(serviceConstants.getRpcAddress(), server.getAddress() .getHostName()); - conf.set(ClientConfig.SERVER_RPC_ADDRESS, server.getAddress() - .getHostName()); - conf.set(ClientConfig.SERVER_RPC_PORT, + conf.set(serviceConstants.getServerRpcPort(), String.valueOf(server.getAddress().getPort())); if (kerberos) { - conf.set(ClientConfig.SECURITY_MODE, ClientConfig.SECURITY_MODE_KERBEROS); - conf.set(ClientConfig.SECURITY_USE_UGI_TRANSPORT, "true"); - conf.set(ClientConfig.PRINCIPAL, getServerKerberosName()); + conf.set(serviceConstants.getSecurityMode(), ClientConfig.SECURITY_MODE_KERBEROS); + conf.set(serviceConstants.getSecurityUseUgiTransport(), "true"); + conf.set(serviceConstants.getPrincipal(), getServerKerberosName()); hdfsClient = UserGroupInformation.getLoginUser().doAs( new PrivilegedExceptionAction<SentryHDFSServiceClient>() { @Override http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java index d320d0f..1cad25f 100644 --- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java @@ -18,7 +18,7 @@ package org.apache.sentry.provider.db.generic.service.thrift; -import org.apache.sentry.provider.db.service.thrift.ThriftUtil; +import org.apache.sentry.ThriftUtil; import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocol; http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java index 11cdee7..716bf59 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java @@ -21,11 +21,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.sentry.core.common.SentryServiceClient; import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.core.common.ActiveRoleSet; import org.apache.sentry.core.common.Authorizable; -public interface SentryGenericServiceClient { +public interface SentryGenericServiceClient extends SentryServiceClient{ /** * Create a sentry role @@ -192,5 +193,4 @@ public interface SentryGenericServiceClient { String serviceName, String requestorUserName, 
Set<String> authorizablesSet, Set<String> groups, ActiveRoleSet roleSet) throws SentryUserException; - void close(); } http://git-wip-us.apache.org/repos/asf/sentry/blob/53003443/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java index ee6cdf7..dacc25a 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java @@ -19,144 +19,49 @@ package org.apache.sentry.provider.db.generic.service.thrift; import java.io.IOException; import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; import java.util.*; -import javax.security.auth.callback.CallbackHandler; - import org.apache.hadoop.conf.Configuration; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SaslRpcServer; -import org.apache.hadoop.security.SaslRpcServer.AuthMethod; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sentry.SentryServiceClientTransportDefaultImpl; +import org.apache.sentry.core.common.ServiceTransportConstants; import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.core.common.ActiveRoleSet; import 
org.apache.sentry.core.common.Authorizable; import org.apache.sentry.core.model.db.AccessConstants; import org.apache.sentry.service.thrift.ServiceConstants; -import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; -import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; import org.apache.sentry.service.thrift.Status; import org.apache.sentry.service.thrift.sentry_common_serviceConstants; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TMultiplexedProtocol; -import org.apache.thrift.transport.TSaslClientTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -public class SentryGenericServiceClientDefaultImpl implements SentryGenericServiceClient { - private final Configuration conf; - private final InetSocketAddress serverAddress; - private final boolean kerberos; - private final String[] serverPrincipalParts; +public class SentryGenericServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryGenericServiceClient { private SentryGenericPolicyService.Client client; - private TTransport transport; - private int connectionTimeout; private static final Logger LOGGER = LoggerFactory .getLogger(SentryGenericServiceClientDefaultImpl.class); private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occured "; - /** - * This transport wraps the Sasl transports to set up the right UGI context for open(). + * Initialize the sentry configurations. 
*/ - public static class UgiSaslClientTransport extends TSaslClientTransport { - protected UserGroupInformation ugi = null; - - public UgiSaslClientTransport(String mechanism, String authorizationId, - String protocol, String serverName, Map<String, String> props, - CallbackHandler cbh, TTransport transport, boolean wrapUgi, Configuration conf) - throws IOException { - super(mechanism, authorizationId, protocol, serverName, props, cbh, - transport); - if (wrapUgi) { - // If we don't set the configuration, the UGI will be created based on - // what's on the classpath, which may lack the kerberos changes we require - UserGroupInformation.setConfiguration(conf); - ugi = UserGroupInformation.getLoginUser(); - } - } - - // open the SASL transport with using the current UserGroupInformation - // This is needed to get the current login context stored - @Override - public void open() throws TTransportException { - if (ugi == null) { - baseOpen(); - } else { - try { - if (ugi.isFromKeytab()) { - ugi.checkTGTAndReloginFromKeytab(); - } - ugi.doAs(new PrivilegedExceptionAction<Void>() { - public Void run() throws TTransportException { - baseOpen(); - return null; - } - }); - } catch (IOException e) { - throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e); - } catch (InterruptedException e) { - throw new TTransportException( - "Interrupted while opening underlying transport: " + e.getMessage(), e); - } - } - } + public SentryGenericServiceClientDefaultImpl(Configuration conf) + throws IOException { + super(conf,ServiceTransportConstants.sentryService.GENERIC_POLICY_SERVICE); + } - private void baseOpen() throws TTransportException { - super.open(); - } + public SentryGenericServiceClientDefaultImpl(String addr, int port, + Configuration conf) throws IOException { + super(addr,port,conf, ServiceTransportConstants.sentryService.GENERIC_POLICY_SERVICE); + connect(serverAddress); } - public SentryGenericServiceClientDefaultImpl(Configuration conf) 
throws IOException { - // copy the configuration because we may make modifications to it. - this.conf = new Configuration(conf); - Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); - this.serverAddress = NetUtils.createSocketAddr(Preconditions.checkNotNull( - conf.get(ClientConfig.SERVER_RPC_ADDRESS), "Config key " - + ClientConfig.SERVER_RPC_ADDRESS + " is required"), conf.getInt( - ClientConfig.SERVER_RPC_PORT, ClientConfig.SERVER_RPC_PORT_DEFAULT)); - this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT, - ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT); - kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase( - conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim()); - transport = new TSocket(serverAddress.getHostName(), - serverAddress.getPort(), connectionTimeout); - if (kerberos) { - String serverPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL), ServerConfig.PRINCIPAL + " is required"); - // since the client uses hadoop-auth, we need to set kerberos in - // hadoop-auth if we plan to use kerberos - conf.set(HADOOP_SECURITY_AUTHENTICATION, ServerConfig.SECURITY_MODE_KERBEROS); - - // Resolve server host in the same way as we are doing on server side - serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); - LOGGER.debug("Using server kerberos principal: " + serverPrincipal); - - serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal); - Preconditions.checkArgument(serverPrincipalParts.length == 3, - "Kerberos principal should have 3 parts: " + serverPrincipal); - boolean wrapUgi = "true".equalsIgnoreCase(conf - .get(ServerConfig.SECURITY_USE_UGI_TRANSPORT, "true")); - transport = new UgiSaslClientTransport(AuthMethod.KERBEROS.getMechanismName(), - null, serverPrincipalParts[0], serverPrincipalParts[1], - ClientConfig.SASL_PROPERTIES, null, transport, wrapUgi, conf); - } else { - serverPrincipalParts 
= null; - } - try { - transport.open(); - } catch (TTransportException e) { - throw new IOException("Transport exception while opening transport: " + e.getMessage(), e); - } - LOGGER.debug("Successfully opened transport: " + transport + " to " + serverAddress); + /** + * Connect to the specified socket address and throw IOException if failed. + */ + protected void connect(InetSocketAddress serverAddress) throws IOException { + super.connect(serverAddress); long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE, ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT); TMultiplexedProtocol protocol = new TMultiplexedProtocol( @@ -166,8 +71,6 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi LOGGER.debug("Successfully created client"); } - - /** * Create a sentry role * @param requestorUserName: user on whose behalf the request is issued @@ -580,12 +483,4 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi throw new SentryUserException(THRIFT_EXCEPTION_MESSAGE, e); } } - - @Override - public void close() { - if (transport != null) { - transport.close(); - } - } - }
