SENTRY-1593: Implement client failover for Generic and NN clients CDH-53213 Backport of SENTRY-1593
Change-Id: I59e0deba9160ea26ca609107ec83748b4df7c291 Reviewed-on: http://gerrit.sjc.cloudera.com:8080/22186 Tested-by: Jenkins User Reviewed-by: Kalyan Kumar Kalvagadda <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/0dbe38aa Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/0dbe38aa Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/0dbe38aa Branch: refs/for/cdh5-1.5.1_ha Commit: 0dbe38aa8bd61c8ea44458614fcaa7d1bcbf0d56 Parents: f21172c Author: Alexander Kolbasov <[email protected]> Authored: Thu Apr 27 21:59:00 2017 -0700 Committer: Kalyan Kumar Kalvagadda <[email protected]> Committed: Fri Apr 28 13:12:55 2017 -0700 ---------------------------------------------------------------------- sentry-core/sentry-core-common/pom.xml | 4 + .../exception/SentryHdfsServiceException.java | 33 ++ .../transport/RetryClientInvocationHandler.java | 147 +++++ .../SentryClientInvocationHandler.java | 54 ++ .../SentryClientTransportConfigInterface.java | 7 + .../SentryClientTransportConstants.java | 1 + .../SentryHDFSClientTransportConfig.java | 5 + .../SentryPolicyClientTransportConfig.java | 5 + .../common/transport/SentryServiceClient.java | 43 ++ .../transport/SentryTransportFactory.java | 309 ++++++++++ .../core/common/utils/PolicyStoreConstants.java | 28 + .../sentry/core/common/utils/ThriftUtil.java | 123 ++++ .../apache/sentry/hdfs/ServiceConstants.java | 5 +- sentry-hdfs/sentry-hdfs-dist/pom.xml | 4 + .../sentry/hdfs/SentryHDFSServiceClient.java | 2 +- .../SentryHDFSServiceClientDefaultImpl.java | 182 ++---- .../hdfs/SentryHDFSServiceClientFactory.java | 26 +- .../hdfs/SentryHDFSServiceProcessorFactory.java | 2 +- .../sentry/hdfs/SentryHdfsServiceException.java | 33 -- .../thrift/SentryGenericPolicyProcessor.java | 2 +- .../SentryGenericPolicyProcessorWrapper.java | 2 +- .../SentryGenericServiceClientDefaultImpl.java | 321 +++++----- 
.../SentryGenericServiceClientFactory.java | 22 +- .../db/log/entity/JsonLogEntityFactory.java | 2 +- .../service/persistent/TransactionManager.java | 2 +- .../db/service/thrift/PolicyStoreConstants.java | 28 - .../SentryPolicyServiceClientDefaultImpl.java | 585 ++++++++----------- .../thrift/SentryPolicyStoreProcessor.java | 2 +- .../service/thrift/SentryProcessorWrapper.java | 1 + .../provider/db/service/thrift/ThriftUtil.java | 123 ---- .../thrift/PoolClientInvocationHandler.java | 3 +- .../thrift/RetryClientInvocationHandler.java | 146 ----- .../thrift/SentryClientInvocationHandler.java | 54 -- .../thrift/SentryServiceClientFactory.java | 28 +- .../sentry/service/thrift/ServiceConstants.java | 19 +- .../TestSentryGenericPolicyProcessor.java | 2 +- .../thrift/TestSentryGenericServiceClient.java | 61 ++ .../db/log/entity/TestJsonLogEntityFactory.java | 2 +- .../log/entity/TestJsonLogEntityFactoryGM.java | 2 +- .../thrift/TestSentryPolicyServiceClient.java | 64 ++ .../thrift/TestSentryPolicyStoreProcessor.java | 2 +- .../thrift/TestPoolClientInvocationHandler.java | 2 +- 42 files changed, 1376 insertions(+), 1112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/pom.xml b/sentry-core/sentry-core-common/pom.xml index d6e4ca3..20376b9 100644 --- a/sentry-core/sentry-core-common/pom.xml +++ b/sentry-core/sentry-core-common/pom.xml @@ -62,6 +62,10 @@ limitations under the License. 
<artifactId>hadoop-common</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java new file mode 100644 index 0000000..6b09dc2 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sentry.core.common.exception; + +public class SentryHdfsServiceException extends RuntimeException { + private static final long serialVersionUID = 1511645864949767378L; + + public SentryHdfsServiceException(String message, Throwable cause) { + super(message, cause); + } + + public SentryHdfsServiceException(String message) { + super(message); + } + + +} http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java new file mode 100644 index 0000000..34a594e --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.core.common.transport; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.core.common.exception.SentryUserException; +import org.apache.sentry.core.common.exception.SentryHdfsServiceException; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +/** + * The RetryClientInvocationHandler is a proxy class for handling thrift calls for non-pool + * model. Currently only one client connection is allowed. + * <p> + * For every rpc call, if the client is not connected, it will first connect to one of the + * sentry servers, and then do the thrift call to the connected sentry server, which will + * execute the requested method and return back the response. If it is failed with connection + * problem, it will close the current connection and retry (reconnect and resend the + * thrift call) no more than rpcRetryTotal times. If the client is already connected, it + * will reuse the existing connection, and do the thrift call. + * <p> + * During reconnection, invocation handler will first cycle through all the configured sentry servers, and + * then retry the whole server list no more than connectionFullRetryTotal times. In this + * case, it won't introduce more latency when some server fails. 
+ * <p> + */ + +public class RetryClientInvocationHandler extends SentryClientInvocationHandler { + private static final Logger LOGGER = + LoggerFactory.getLogger(RetryClientInvocationHandler.class); + private SentryServiceClient client = null; + private final int maxRetryCount; + + /** + * Initialize the sentry configurations, including rpc retry count and client connection + * configs for SentryPolicyServiceClientDefaultImpl + */ + public RetryClientInvocationHandler(Configuration conf, SentryServiceClient clientObject, + SentryClientTransportConfigInterface transportConfig) { + Preconditions.checkNotNull(conf, "Configuration object cannot be null"); + Preconditions.checkNotNull(clientObject, "Client Object cannot be null"); + client = clientObject; + maxRetryCount = transportConfig.getSentryRpcRetryTotal(conf); + } + + /** + * For every rpc call, if the client is not connected, it will first connect to a sentry + * server, and then do the thrift call to the connected sentry server, which will + * execute the requested method and return back the response. If it is failed with + * connection problem, it will close the current connection, and retry (reconnect and + * resend the thrift call) no more than rpcRetryTotal times. Throw SentryUserException + * if failed retry after rpcRetryTotal times. + * if it is failed with other exception, method would just re-throw the exception. + * Synchronized it for thread safety. + */ + @Override + public synchronized Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception { + int retryCount = 0; + Exception lastExc = null; + + while (retryCount < maxRetryCount) { + // Connect to a sentry server if not connected yet. + try { + client.connect(); + } catch (IOException e) { + // Increase the retry num + // Retry when the exception is caused by connection problem. 
+ retryCount++; + lastExc = e; + close(); + continue; + } + + // do the thrift call + try { + return method.invoke(client, args); + } catch (InvocationTargetException e) { + // Get the target exception, check if SentryUserException or TTransportException is wrapped. + // TTransportException means there is a connection problem. + Throwable targetException = e.getCause(); + if (targetException instanceof SentryUserException || + targetException instanceof SentryHdfsServiceException) { + Throwable sentryTargetException = targetException.getCause(); + // If there has connection problem, eg, invalid connection if the service restarted, + // sentryTargetException instanceof TTransportException will be true. + if (sentryTargetException instanceof TTransportException) { + // Retry when the exception is caused by connection problem. + lastExc = new TTransportException(sentryTargetException); + LOGGER.error("Thrift call failed with TTransportException", lastExc); + // Closing the thrift client on TTransportException. New client object is + // created using new socket when an attempt to reconnect is made. + close(); + } else { + // The exception is thrown by thrift call, eg, SentryAccessDeniedException. + // Do not need to reconnect to the sentry server. + if (targetException instanceof SentryUserException) { + throw (SentryUserException) targetException; + } else { + throw (SentryHdfsServiceException) targetException; + } + } + } else { + throw e; + } + } + + // Increase the retry num + retryCount++; + } + + // Throw the exception as reaching the max rpc retry num. 
+ String error = String.format("Request failed, %d retries attempted ", maxRetryCount); + LOGGER.error(error, lastExc); + throw new SentryUserException(error, lastExc); + } + + @Override + public synchronized void close() { + try { + LOGGER.debug("Releasing the current client connection"); + client.disconnect(); + } catch (Exception e) { + LOGGER.error("Encountered failure while closing the connection"); + } + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientInvocationHandler.java new file mode 100644 index 0000000..bf33fda --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientInvocationHandler.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.core.common.transport; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; + +/** + * SentryClientInvocationHandler is the base class for all the InvocationHandlers in SENTRY + */ +public abstract class SentryClientInvocationHandler implements InvocationHandler { + + /** + * Close the InvocationHandler: An InvocationHandler may create some contexts, + * these contexts should be closed when the method "close()" of the client is called. + */ + @Override + public final Object invoke(Object proxy, Method method, Object[] args) throws Exception { + // close() doesn't throw an exception; we suppress that in case of connection + // loss. Changing SentryPolicyServiceClient#close() to throw an + // exception would be a backward incompatible change for Sentry clients. + if ("close".equals(method.getName()) && null == args) { + close(); + return null; + } + return invokeImpl(proxy, method, args); + } + + /** + * Subclass should implement this method for special function + */ + abstract public Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception; + + /** + * An abstract method "close", an invocationHandler should close its contexts at here. 
+ */ + public abstract void close(); + +} http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java index 6cea596..24192fd 100644 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java @@ -40,6 +40,13 @@ interface SentryClientTransportConfigInterface { /** * @param conf configuration + * @return number of times should client re-create the transport and try to connect + * before finally giving up. + */ + int getSentryRpcRetryTotal(Configuration conf); + + /** + * @param conf configuration * @return True, if kerberos should be enabled. * False, Iff kerberos is enabled. 
* @throws MissingConfigurationException if property is mandatory and is missing in http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java index 83790d8..3520787 100644 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java @@ -28,6 +28,7 @@ package org.apache.sentry.core.common.transport; * <code>SentryClientTransportConfigInterface</code>. */ class SentryClientTransportConstants { + /** * max retry num for client rpc * {link RetryClientInvocationHandler#invokeImpl(Object, Method, Object[])} http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java index 64750e7..74f790b 100644 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java @@ -46,6 +46,11 @@ public final class 
SentryHDFSClientTransportConfig } @Override + public int getSentryRpcRetryTotal(Configuration conf) { + return conf.getInt(SENTRY_RPC_RETRY_TOTAL, SENTRY_RPC_RETRY_TOTAL_DEFAULT); + } + + @Override public boolean useUserGroupInformation(Configuration conf) throws MissingConfigurationException { return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true")); http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java index 85ddd31..37fd0b3 100644 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java @@ -46,6 +46,11 @@ public final class SentryPolicyClientTransportConfig } @Override + public int getSentryRpcRetryTotal(Configuration conf) { + return conf.getInt(SENTRY_RPC_RETRY_TOTAL, SENTRY_RPC_RETRY_TOTAL_DEFAULT); + } + + @Override public boolean useUserGroupInformation(Configuration conf) throws MissingConfigurationException { return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true")); http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java 
b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java new file mode 100644 index 0000000..9a10ca5 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.core.common.transport; + +/** + * Client interface for Proxy Invocation handlers + * <p> + * Defines interface that Sentry client's should expose to the Invocation handlers like + * <code>RetryClientInvocationHandler</code> used to proxy the method invocation on sentry + * client instances . + * <p> + * All the sentry clients that need retrying and failover capabilities should implement + * this interface. + */ +public interface SentryServiceClient { + /** + * Connect to Sentry server. + * Either creates a new connection or reuses an existing one. + * @throws Exception on failure to acquire a transport towards server. + */ + void connect() throws Exception; + + /** + * Disconnect from the server. May close connection or return it to a + * pool for reuse. 
+ */ + void disconnect(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java new file mode 100644 index 0000000..9ddb400 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sentry.core.common.transport; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.net.HostAndPort; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sentry.core.common.exception.MissingConfigurationException; +import org.apache.thrift.transport.TSaslClientTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.sentry.core.common.utils.ThriftUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslException; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collections; + +/** + * Create Thrift transports suitable for talking to Sentry + */ + +public class SentryTransportFactory { + protected final Configuration conf; + private String[] serverPrincipalParts; + protected TTransport thriftTransport; + private final int connectionTimeout; + private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportFactory.class); + // configs for connection retry + private final int connectionFullRetryTotal; + private final ArrayList<InetSocketAddress> endpoints; + private final SentryClientTransportConfigInterface transportConfig; + private static final ImmutableMap<String, String> SASL_PROPERTIES = + ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf"); + + /** + * This transport wraps the Sasl transports to set up the right UGI context for open(). 
+ */ + public static class UgiSaslClientTransport extends TSaslClientTransport { + UserGroupInformation ugi = null; + + public UgiSaslClientTransport(String mechanism, String protocol, + String serverName, TTransport transport, + boolean wrapUgi, Configuration conf) + throws IOException, SaslException { + super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null, + transport); + if (wrapUgi) { + // If we don't set the configuration, the UGI will be created based on + // what's on the classpath, which may lack the kerberos changes we require + UserGroupInformation.setConfiguration(conf); + ugi = UserGroupInformation.getLoginUser(); + } + } + + // open the SASL transport with using the current UserGroupInformation + // This is needed to get the current login context stored + @Override + public void open() throws TTransportException { + if (ugi == null) { + baseOpen(); + } else { + try { + if (ugi.isFromKeytab()) { + ugi.checkTGTAndReloginFromKeytab(); + } + ugi.doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws TTransportException { + baseOpen(); + return null; + } + }); + } catch (IOException e) { + throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e); + } catch (InterruptedException e) { + throw new TTransportException( + "Interrupted while opening underlying transport: " + e.getMessage(), e); + } + } + } + + private void baseOpen() throws TTransportException { + super.open(); + } + } + + /** + * Initialize the object based on the sentry configuration provided. + * List of configured servers are reordered randomly preventing all + * clients connecting to the same server. 
+ * + * @param conf Sentry configuration + * @param transportConfig transport configuration to use + */ + public SentryTransportFactory(Configuration conf, + SentryClientTransportConfigInterface transportConfig) throws IOException { + + this.conf = conf; + Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); + serverPrincipalParts = null; + this.transportConfig = transportConfig; + + try { + String hostsAndPortsStr; + this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf); + this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf); + + hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf); + + int serverPort = transportConfig.getServerRpcPort(conf); + + String[] hostsAndPortsStrArr = hostsAndPortsStr.split(","); + HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort); + + this.endpoints = new ArrayList(hostsAndPortsStrArr.length); + for (HostAndPort endpoint : hostsAndPorts) { + this.endpoints.add( + new InetSocketAddress(endpoint.getHostText(), endpoint.getPort())); + LOGGER.debug("Added server endpoint: " + endpoint.toString()); + } + + // Reorder endpoints randomly to prevent all clients connecting to the same endpoint + // at the same time after a node failure. + Collections.shuffle(endpoints); + } catch (MissingConfigurationException e) { + throw new RuntimeException("Sentry Thrift Client Creation Failed: " + e.getMessage(), e); + } + } + + /** + * Initialize object based on the parameters provided. 
+ * + * @param addr Host address which the client needs to connect + * @param port Host Port which the client needs to connect + * @param conf Sentry configuration + * @param transportConfig transport configuration to use + */ + public SentryTransportFactory(String addr, int port, Configuration conf, + SentryClientTransportConfigInterface transportConfig) throws IOException { + // copy the configuration because we may make modifications to it. + this.conf = new Configuration(conf); + serverPrincipalParts = null; + Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); + this.transportConfig = transportConfig; + + try { + this.endpoints = new ArrayList(1); + this.endpoints.add(NetUtils.createSocketAddr(addr, port)); + this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf); + this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf); + } catch (MissingConfigurationException e) { + throw new RuntimeException("Sentry Thrift Client Creation Failed: " + e.getMessage(), e); + } + } + + + /** + * On connection error, Iterates through all the configured servers and tries to connect. + * On successful connection, control returns + * On connection failure, continues iterating through all the configured sentry servers, + * and then retries the whole server list no more than connectionFullRetryTotal times. + * In this case, it won't introduce more latency when some server fails. + * <p> + * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries. 
+ */ + public TTransport getTransport() throws IOException { + IOException currentException = null; + for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) { + try { + return connectToAvailableServer(); + } catch (IOException e) { + currentException = e; + LOGGER.error( + String.format("Failed to connect to all the configured sentry servers, " + + "Retrying again")); + } + } + // Throws exception on reaching the connectionFullRetryTotal. + LOGGER.error( + String.format("Reach the max connection retry num %d ", connectionFullRetryTotal), + currentException); + throw currentException; + } + + /** + * Iterates through all the configured servers and tries to connect. + * On connection error, tries to connect to next server. + * Control returns on successful connection OR it's done trying to all the + * configured servers. + * + * @throws IOException + */ + private TTransport connectToAvailableServer() throws IOException { + IOException currentException = null; + for (InetSocketAddress addr : endpoints) { + try { + return connectToServer(addr); + } catch (IOException e) { + LOGGER.error(String.format("Failed connection to %s: %s", + addr.toString(), e.getMessage()), e); + currentException = e; + } + } + throw currentException; + } + + /** + * Connect to the specified socket address and throw IOException if failed. + * + * @param serverAddress Address client needs to connect + * @throws Exception if there is failure in establishing the connection. 
+ */ + private TTransport connectToServer(InetSocketAddress serverAddress) throws IOException { + try { + thriftTransport = createTransport(serverAddress); + thriftTransport.open(); + } catch (TTransportException e) { + throw new IOException("Failed to open transport: " + e.getMessage(), e); + } catch (MissingConfigurationException e) { + throw new RuntimeException(e.getMessage(), e); + } + + LOGGER.debug("Successfully opened transport: " + thriftTransport + " to " + serverAddress); + return thriftTransport; + } + + /** + * A new socket is created + * + * @param serverAddress + * @return + * @throws TTransportException + * @throws MissingConfigurationException + * @throws IOException + */ + private TTransport createTransport(InetSocketAddress serverAddress) + throws TTransportException, MissingConfigurationException, IOException { + TTransport socket = new TSocket(serverAddress.getHostName(), + serverAddress.getPort(), connectionTimeout); + + if (!transportConfig.isKerberosEnabled(conf)) { + return socket; + } else { + String serverPrincipal = transportConfig.getSentryPrincipal(conf); + serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); + LOGGER.debug("Using server kerberos principal: " + serverPrincipal); + if (serverPrincipalParts == null) { + serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal); + Preconditions.checkArgument(serverPrincipalParts.length == 3, + "Kerberos principal should have 3 parts: " + serverPrincipal); + } + + boolean wrapUgi = transportConfig.useUserGroupInformation(conf); + return new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(), + serverPrincipalParts[0], serverPrincipalParts[1], + socket, wrapUgi, conf); + } + } + + private boolean isConnected() { + return thriftTransport != null && thriftTransport.isOpen(); + } + + /** + * Method currently closes the transport + * TODO (Kalyan) Plan is to hold the transport and reuse it across multiple 
client's + * That way, new connection need not be created for each new client + */ + public void releaseTransport() { + close(); + } + + /** + * Method closes the transport + */ + public void close() { + if (isConnected()) { + thriftTransport.close(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PolicyStoreConstants.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PolicyStoreConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PolicyStoreConstants.java new file mode 100644 index 0000000..8f73d01 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PolicyStoreConstants.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.core.common.utils; + +public class PolicyStoreConstants { + public static final String SENTRY_GENERIC_POLICY_NOTIFICATION = "sentry.generic.policy.notification"; + public static final String SENTRY_GENERIC_POLICY_STORE = "sentry.generic.policy.store"; + public static final String SENTRY_GENERIC_POLICY_STORE_DEFAULT = + "org.apache.sentry.provider.db.generic.service.persistent.DelegateSentryStore"; + public static class PolicyStoreServerConfig { + public static final String NOTIFICATION_HANDLERS = "sentry.policy.store.notification.handlers"; + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java new file mode 100644 index 0000000..9e38a30 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.core.common.utils; + +import com.google.common.net.HostAndPort; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSaslClientTransport; +import org.apache.thrift.transport.TSaslServerTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class ThriftUtil { + + private static final Logger LOGGER = LoggerFactory.getLogger(ThriftUtil.class); + + public static void setImpersonator(final TProtocol in) { + try { + TTransport transport = in.getTransport(); + if (transport instanceof TSaslServerTransport) { + String impersonator = ((TSaslServerTransport) transport).getSaslServer() + .getAuthorizationID(); + setImpersonator(impersonator); + } + } catch (Exception e) { + // If an exception occurs while getting the impersonator info, log the error information. + LOGGER.warn("There is an error when get the impersonator:" + e.getMessage()); + } + } + + public static void setIpAddress(final TProtocol in) { + try { + TTransport transport = in.getTransport(); + TSocket tSocket = getUnderlyingSocketFromTransport(transport); + if (tSocket != null) { + setIpAddress(tSocket.getSocket().getInetAddress().toString()); + } else { + LOGGER.warn("Unknown Transport, cannot determine ipAddress"); + } + } catch (Exception e) { + // If an exception occurs while getting the client's IP address, log the error information. + LOGGER.warn("There is an error when get the client's ip address:" + e.getMessage()); + } + } + + /** + * Returns the underlying TSocket from the transport, or null if the transport type is unknown. 
+ */ + private static TSocket getUnderlyingSocketFromTransport(TTransport transport) { + Preconditions.checkNotNull(transport); + if (transport instanceof TSaslServerTransport) { + return (TSocket) ((TSaslServerTransport) transport).getUnderlyingTransport(); + } else if (transport instanceof TSaslClientTransport) { + return (TSocket) ((TSaslClientTransport) transport).getUnderlyingTransport(); + } else if (transport instanceof TSocket) { + return (TSocket) transport; + } + return null; + } + + private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>() { + @Override + protected synchronized String initialValue() { + return ""; + } + }; + + public static void setIpAddress(String ipAddress) { + threadLocalIpAddress.set(ipAddress); + } + + public static String getIpAddress() { + return threadLocalIpAddress.get(); + } + + private static ThreadLocal<String> threadLocalImpersonator = new ThreadLocal<String>() { + @Override + protected synchronized String initialValue() { + return ""; + } + }; + + public static void setImpersonator(String impersonator) { + threadLocalImpersonator.set(impersonator); + } + + public static String getImpersonator() { + return threadLocalImpersonator.get(); + } + + /** + * Utility function for parsing host and port strings. Expected form should be + * (host:port). The hostname could be in ipv6 style. If port is not specified, + * defaultPort will be used. 
+ */ + public static HostAndPort[] parseHostPortStrings(String[] hostsAndPortsArr, int defaultPort) { + HostAndPort[] hostsAndPorts = new HostAndPort[hostsAndPortsArr.length]; + for (int i = 0; i < hostsAndPorts.length; i++) { + hostsAndPorts[i] = + HostAndPort.fromString(hostsAndPortsArr[i]).withDefaultPort(defaultPort); + } + return hostsAndPorts; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java index 95285e4..0203ab3 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java @@ -77,12 +77,9 @@ public class ServiceConstants { public static final String PRINCIPAL = "sentry.hdfs.service.server.principal"; public static final String SERVER_RPC_PORT = "sentry.hdfs.service.client.server.rpc-port"; - public static final int SERVER_RPC_PORT_DEFAULT = 8038; - + public static final String SERVER_RPC_ADDRESS = "sentry.hdfs.service.client.server.rpc-address"; - public static final String SERVER_RPC_CONN_TIMEOUT = "sentry.hdfs.service.client.server.rpc-connection-timeout"; - public static final int SERVER_RPC_CONN_TIMEOUT_DEFAULT = 200000; public static final String USE_COMPACT_TRANSPORT = "sentry.hdfs.service.client.compact.transport"; public static final boolean USE_COMPACT_TRANSPORT_DEFAULT = false; http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-hdfs/sentry-hdfs-dist/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-dist/pom.xml 
b/sentry-hdfs/sentry-hdfs-dist/pom.xml index 8aa10f7..b614254 100644 --- a/sentry-hdfs/sentry-hdfs-dist/pom.xml +++ b/sentry-hdfs/sentry-hdfs-dist/pom.xml @@ -49,6 +49,10 @@ limitations under the License. <groupId>org.apache.sentry</groupId> <artifactId>sentry-core-common</artifactId> </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-core-common</artifactId> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java index 956b855..de9507b 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java @@ -17,7 +17,7 @@ */ package org.apache.sentry.hdfs; -import java.io.IOException; +import org.apache.sentry.core.common.exception.SentryHdfsServiceException; public interface SentryHDFSServiceClient { public static final String SENTRY_HDFS_SERVICE_NAME = "SentryHDFSService"; http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java index 28b1224..798bbef 100644 --- 
a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,162 +18,80 @@ package org.apache.sentry.hdfs; import java.io.IOException; -import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; import java.util.LinkedList; -import java.util.Map; -import javax.security.auth.callback.CallbackHandler; -import javax.security.sasl.Sasl; - -import com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SaslRpcServer; -import org.apache.hadoop.security.SaslRpcServer.AuthMethod; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.sentry.core.common.exception.MissingConfigurationException; +import org.apache.sentry.core.common.exception.SentryHdfsServiceException; +import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig; +import org.apache.sentry.core.common.transport.SentryServiceClient; +import org.apache.sentry.core.common.transport.SentryTransportFactory; import org.apache.sentry.hdfs.service.thrift.SentryHDFSService; import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Client; import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse; import 
org.apache.sentry.hdfs.service.thrift.TPathsUpdate; import org.apache.sentry.hdfs.service.thrift.TPermissionsUpdate; -import org.apache.sentry.hdfs.ServiceConstants.ClientConfig; -import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TMultiplexedProtocol; import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TSaslClientTransport; -import org.apache.thrift.transport.TSocket; + import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; +/** + * Sentry HDFS Service Client + * <p> + * The public implementation of SentryHDFSServiceClient. + * A Sentry Client in which all the operations are synchronized for thread safety + * Note: When using this client, if there is an exception in RPC, socket can get into an inconsistent state. + * So it is important to close and re-open the transport so that new socket is used. + */ -public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClient { +public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClient, SentryServiceClient { private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientDefaultImpl.class); - - /** - * This transport wraps the Sasl transports to set up the right UGI context for open(). 
- */ - public static class UgiSaslClientTransport extends TSaslClientTransport { - protected UserGroupInformation ugi = null; - - public UgiSaslClientTransport(String mechanism, String authorizationId, - String protocol, String serverName, Map<String, String> props, - CallbackHandler cbh, TTransport transport, boolean wrapUgi) - throws IOException { - super(mechanism, authorizationId, protocol, serverName, props, cbh, - transport); - if (wrapUgi) { - ugi = UserGroupInformation.getLoginUser(); - } - } - - // open the SASL transport with using the current UserGroupInformation - // This is needed to get the current login context stored - @Override - public void open() throws TTransportException { - if (ugi == null) { - baseOpen(); - } else { - try { - // ensure that the ticket is valid before connecting to service. Note that - // checkTGTAndReloginFromKeytab() renew the ticket only when more than 80% - // of ticket lifetime has passed. - if (ugi.isFromKeytab()) { - ugi.checkTGTAndReloginFromKeytab(); - } - - ugi.doAs(new PrivilegedExceptionAction<Void>() { - public Void run() throws TTransportException { - baseOpen(); - return null; - } - }); - } catch (IOException e) { - throw new TTransportException("Failed to open SASL transport", e); - } catch (InterruptedException e) { - throw new TTransportException( - "Interrupted while opening underlying transport", e); - } - } - } - - private void baseOpen() throws TTransportException { - super.open(); - } - } - - private final Configuration conf; - private final InetSocketAddress serverAddress; - private final int connectionTimeout; - private boolean kerberos; - private TTransport transport; - - private String[] serverPrincipalParts; private Client client; - private final SentryHDFSClientTransportConfig transportConfig = new SentryHDFSClientTransportConfig(); - private static final ImmutableMap<String, String> SASL_PROPERTIES = - ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf"); + private 
SentryTransportFactory transportFactory; + private TTransport transport; + private Configuration conf; - public SentryHDFSServiceClientDefaultImpl(Configuration conf) throws IOException { + public SentryHDFSServiceClientDefaultImpl(Configuration conf, SentryHDFSClientTransportConfig transportConfig) throws IOException { + transportFactory = new SentryTransportFactory(conf, transportConfig); this.conf = conf; - Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); - try { - this.serverAddress = NetUtils.createSocketAddr( - transportConfig.getSentryServerRpcAddress(conf), - transportConfig.getServerRpcPort(conf)); - this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf); - kerberos = transportConfig.isKerberosEnabled(conf); - transport = new TSocket(serverAddress.getHostName(), - serverAddress.getPort(), connectionTimeout); - if (kerberos) { - String serverPrincipal = transportConfig.getSentryPrincipal(conf); - // Resolve server host in the same way as we are doing on server side - serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); - LOGGER.info("Using server kerberos principal: " + serverPrincipal); - - serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal); - Preconditions.checkArgument(serverPrincipalParts.length == 3, - "Kerberos principal should have 3 parts: " + serverPrincipal); - boolean wrapUgi = transportConfig.useUserGroupInformation(conf); - transport = new UgiSaslClientTransport(AuthMethod.KERBEROS.getMechanismName(), - null, serverPrincipalParts[0], serverPrincipalParts[1], - SASL_PROPERTIES, null, transport, wrapUgi); - } else { - serverPrincipalParts = null; - } + } - transport.open(); - } catch (TTransportException e) { - throw new IOException("Transport exception while opening transport: " + e.getMessage(), e); - } catch (MissingConfigurationException e) { - throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e); + /** + * 
Connect to the sentry server + * + * @throws IOException + */ + @Override + public synchronized void connect() throws IOException { + if (transport != null && transport.isOpen()) { + return; } - LOGGER.info("Successfully opened transport: " + transport + " to " + serverAddress); + + transport = transportFactory.getTransport(); TProtocol tProtocol = null; long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE, - ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT); - if (conf.getBoolean(ClientConfig.USE_COMPACT_TRANSPORT, - ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) { + ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT); + if (conf.getBoolean(ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT, + ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) { tProtocol = new TCompactProtocol(transport, maxMessageSize, maxMessageSize); } else { tProtocol = new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true); } TMultiplexedProtocol protocol = new TMultiplexedProtocol( - tProtocol, SentryHDFSServiceClient.SENTRY_HDFS_SERVICE_NAME); + tProtocol, SentryHDFSServiceClient.SENTRY_HDFS_SERVICE_NAME); + client = new SentryHDFSService.Client(protocol); LOGGER.info("Successfully created client"); } + @Override public synchronized void notifyHMSUpdate(PathsUpdate update) - throws SentryHdfsServiceException { + throws SentryHdfsServiceException { try { client.handle_hms_notification(update.toThrift()); } catch (Exception e) { @@ -181,8 +99,9 @@ public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClie } } + @Override public synchronized long getLastSeenHMSPathSeqNum() - throws SentryHdfsServiceException { + throws SentryHdfsServiceException { try { return client.check_hms_seq_num(-1); } catch (Exception e) { @@ -190,8 +109,9 @@ public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClie } } + @Override public synchronized 
SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum) - throws SentryHdfsServiceException { + throws SentryHdfsServiceException { SentryAuthzUpdate retVal = new SentryAuthzUpdate(new LinkedList<PermissionsUpdate>(), new LinkedList<PathsUpdate>()); try { TAuthzUpdateResponse sentryUpdates = client.get_all_authz_updates_from(permSeqNum, pathSeqNum); @@ -211,9 +131,13 @@ public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClie return retVal; } - public void close() { - if (transport != null) { - transport.close(); - } + @Override + public synchronized void close() { + transportFactory.close(); + } + + @Override + public void disconnect() { + transportFactory.releaseTransport(); } } http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java index cdf6195..e350103 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -17,15 +17,29 @@ */ package org.apache.sentry.hdfs; +import java.lang.reflect.Proxy; + import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.core.common.transport.RetryClientInvocationHandler; +import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig; /** * Client factory to create normal client or proxy with HA invocation handler */ public class SentryHDFSServiceClientFactory { - public static SentryHDFSServiceClient create(Configuration conf) - throws Exception { - return new SentryHDFSServiceClientDefaultImpl(conf); + private final static SentryHDFSClientTransportConfig transportConfig = + new SentryHDFSClientTransportConfig(); + + private SentryHDFSServiceClientFactory() { + // Make constructor private to avoid instantiation } + public static SentryHDFSServiceClient create(Configuration conf) + throws Exception { + return (SentryHDFSServiceClient) Proxy + .newProxyInstance(SentryHDFSServiceClientDefaultImpl.class.getClassLoader(), + SentryHDFSServiceClientDefaultImpl.class.getInterfaces(), + new RetryClientInvocationHandler(conf, + new SentryHDFSServiceClientDefaultImpl(conf, transportConfig), transportConfig)); + } } http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java index 4dc99a2..1ad9a02 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java @@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration; import 
org.apache.sentry.hdfs.service.thrift.SentryHDFSService; import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Iface; import org.apache.sentry.provider.db.service.persistent.SentryStore; -import org.apache.sentry.provider.db.service.thrift.ThriftUtil; +import org.apache.sentry.core.common.utils.ThriftUtil; import org.apache.sentry.service.thrift.ProcessorFactory; import org.apache.thrift.TException; import org.apache.thrift.TMultiplexedProcessor; http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java deleted file mode 100644 index 307d8c3..0000000 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.sentry.hdfs; - -public class SentryHdfsServiceException extends RuntimeException { - private static final long serialVersionUID = 1511645864949767378L; - - public SentryHdfsServiceException(String message, Throwable cause) { - super(message, cause); - } - - public SentryHdfsServiceException(String message) { - super(message); - } - - -} http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java index 03faed7..97c0e1d 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java @@ -46,8 +46,8 @@ import org.apache.sentry.provider.db.log.entity.JsonLogEntityFactory; import org.apache.sentry.provider.db.log.util.Constants; import org.apache.sentry.provider.db.service.model.MSentryGMPrivilege; import org.apache.sentry.provider.db.service.model.MSentryRole; -import org.apache.sentry.provider.db.service.thrift.PolicyStoreConstants; import org.apache.sentry.provider.db.service.thrift.SentryConfigurationException; +import org.apache.sentry.core.common.utils.PolicyStoreConstants; import org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessor; import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; import org.apache.sentry.service.thrift.ServiceConstants.ThriftConstants; 
http://git-wip-us.apache.org/repos/asf/sentry/blob/0dbe38aa/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java index d320d0f..a0fc2cc 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java @@ -18,7 +18,7 @@ package org.apache.sentry.provider.db.generic.service.thrift; -import org.apache.sentry.provider.db.service.thrift.ThriftUtil; +import org.apache.sentry.core.common.utils.ThriftUtil; import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocol;
