Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign fd16d74ab -> 268ee50ef
SENTRY-1639: Refactor thrift clients configuration constants (Kalyan Kumar Kalvagadda, Reviewed by: Alex Kolbasov and Vadim Spector) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/268ee50e Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/268ee50e Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/268ee50e Branch: refs/heads/sentry-ha-redesign Commit: 268ee50ef05f3863184f139f0b6ccb867823b604 Parents: fd16d74 Author: Alexander Kolbasov <[email protected]> Authored: Thu Mar 23 11:59:58 2017 -0700 Committer: Alexander Kolbasov <[email protected]> Committed: Thu Mar 23 11:59:58 2017 -0700 ---------------------------------------------------------------------- .../MissingConfigurationException.java | 28 ++++ .../SentryClientTransportConfigInterface.java | 96 +++++++++++ .../SentryClientTransportConstants.java | 163 +++++++++++++++++++ .../SentryHDFSClientTransportConfig.java | 83 ++++++++++ .../SentryPolicyClientTransportConfig.java | 83 ++++++++++ .../core/common/utils/SentryConstants.java | 3 + sentry-hdfs/sentry-hdfs-dist/pom.xml | 4 + .../org/apache/sentry/hdfs/MetastorePlugin.java | 2 +- .../SentryHDFSServiceClientDefaultImpl.java | 60 +++---- .../SentryGenericServiceClientDefaultImpl.java | 77 +++++---- .../SentryPolicyServiceClientDefaultImpl.java | 116 ++++++------- .../thrift/RetryClientInvocationHandler.java | 6 +- 12 files changed, 601 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/268ee50e/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/MissingConfigurationException.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/MissingConfigurationException.java 
b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/MissingConfigurationException.java new file mode 100644 index 0000000..3e5b069 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/MissingConfigurationException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.core.common.exception; + +/** + * Signals that a mandatory property is missing from the configuration + */ +public class MissingConfigurationException extends RuntimeException { + + public MissingConfigurationException(String configParam) { + super("Property '" + configParam + "' is missing in configuration"); + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/268ee50e/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java new file mode 100644 index 0000000..6cea596 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.core.common.transport; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.core.common.exception.MissingConfigurationException; + +/** + * Configuration interface for Sentry Thrift Clients + * <p> + * The purpose of the interface is to abstract the knowledge of specific configuration keys + * and provide an API to extract various thrift-related configuration from a Config object + * This Configuration interface should be implemented for all the sentry clients to get + * the transport configuration. + */ +interface SentryClientTransportConfigInterface { + /** + * @param conf configuration + * @return number of times client retry logic should iterate through all + * the servers before giving up. + * @throws MissingConfigurationException if property is mandatory and is missing in + * configuration. + */ + int getSentryFullRetryTotal(Configuration conf) throws MissingConfigurationException; + + /** + * @param conf configuration + * @return True, if kerberos should be enabled. + * False, if kerberos is not enabled. + * @throws MissingConfigurationException if property is mandatory and is missing in + * configuration. + */ + boolean isKerberosEnabled(Configuration conf) throws MissingConfigurationException; + + /** + * @param conf configuration + * @return True, if Ugi transport has to be used + * False, If not. + * @throws MissingConfigurationException if property is mandatory and is missing in + * configuration. + */ + boolean useUserGroupInformation(Configuration conf) throws MissingConfigurationException; + + /** + * @param conf configuration + * @return principal for the particular sentry service + * @throws MissingConfigurationException if property is mandatory and is missing in + * configuration. 
+ */ + String getSentryPrincipal(Configuration conf) throws MissingConfigurationException; + + /** + * Port in RPC Addresses configured is optional + * @param conf configuration + * @return comma-separated list of available sentry server addresses. + * @throws MissingConfigurationException if property is mandatory and is missing in + * configuration. + */ + String getSentryServerRpcAddress(Configuration conf) throws MissingConfigurationException; + + /** + * Port in RPC Addresses configured is optional. If a port is not provided for a server + * listed in RPC configuration, this configuration is used as a default port. + * @param conf configuration + * @return port where sentry server is listening. + * @throws MissingConfigurationException if property is mandatory and is missing in + * configuration. + */ + int getServerRpcPort(Configuration conf) throws MissingConfigurationException; + + /** + * @param conf configuration + * @return time interval in milli-secs that the client should wait for + * establishment of connection to the server. If the connection + * is not established with-in this interval client should try connecting + * to next configured server + * @throws MissingConfigurationException if property is mandatory and is missing in + * configuration. 
+ */ + int getServerRpcConnTimeoutInMs(Configuration conf) throws MissingConfigurationException; +} http://git-wip-us.apache.org/repos/asf/sentry/blob/268ee50e/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java new file mode 100644 index 0000000..636de40 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.core.common.transport; + + +/** + * Defines configuration strings needed for sentry thrift clients to handle the transport level + * operations. + * <p> + * This class is abstracted by <code>SentryClientTransportConfigInterface</code>. 
+ * Clients that need these configuration strings use the implementations of interface + * <code>SentryClientTransportConfigInterface</code>. + */ +class SentryClientTransportConstants { + /** + * max retry num for client rpc + * {link RetryClientInvocationHandler#invokeImpl(Object, Method, Object[])} + */ + static final String SENTRY_RPC_RETRY_TOTAL = "sentry.service.client.rpc.retry-total"; + static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3; + + /** + * full retry num for getting the connection in non-pool model + * In a full retry, it will cycle through all available sentry servers + */ + static final String SENTRY_FULL_RETRY_TOTAL = + "sentry.service.client.connection.full.retry-total"; + static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT = 2; + + static final int RPC_PORT_DEFAULT = 8038; + + /** + * Defines configuration strings needed for sentry thrift policy clients to handle + * the transport level operations. + */ + static class PolicyClientConstants { + //configuration for server port + static final String SERVER_RPC_PORT = "sentry.service.client.server.rpc-port"; + + //configuration for server address. It can be a comma-separated list of server addresses. + static final String SERVER_RPC_ADDRESS = "sentry.service.client.server.rpc-address"; + + + /** + * This configuration parameter is only meant to be used for testing purposes. 
+ */ + static final String SECURITY_MODE = "sentry.service.security.mode"; + + /** + * full retry num for getting the connection in non-pool model + * In a full retry, it will cycle through all available sentry servers + */ + static final String SENTRY_FULL_RETRY_TOTAL = + SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL; + static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT = + SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL_DEFAULT; + + static final String SECURITY_USE_UGI_TRANSPORT = "sentry.service.security.use.ugi"; + static final String PRINCIPAL = "sentry.service.server.principal"; + + //configration for the client connection timeout. + static final String SERVER_RPC_CONN_TIMEOUT = + "sentry.service.client.server.rpc-connection-timeout"; + + static final int SERVER_RPC_CONN_TIMEOUT_DEFAULT = 200000; + + /** + * max retry num for client rpc + * {link RetryClientInvocationHandler#invokeImpl(Object, Method, Object[])} + */ + static final String SENTRY_RPC_RETRY_TOTAL = "sentry.service.client.rpc.retry-total"; + static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3; + + // connection pool configuration + static final String SENTRY_POOL_ENABLED = "sentry.service.client.connection.pool.enabled"; + static final boolean SENTRY_POOL_ENABLED_DEFAULT = false; + + // commons-pool configuration for pool size + static final String SENTRY_POOL_MAX_TOTAL = "sentry.service.client.connection.pool.max-total"; + static final int SENTRY_POOL_MAX_TOTAL_DEFAULT = 8; + static final String SENTRY_POOL_MAX_IDLE = "sentry.service.client.connection.pool.max-idle"; + static final int SENTRY_POOL_MAX_IDLE_DEFAULT = 8; + static final String SENTRY_POOL_MIN_IDLE = "sentry.service.client.connection.pool.min-idle"; + static final int SENTRY_POOL_MIN_IDLE_DEFAULT = 0; + + // retry num for getting the connection from connection pool + static final String SENTRY_POOL_RETRY_TOTAL = + SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL; + static final int 
SENTRY_POOL_RETRY_TOTAL_DEFAULT = + SentryClientTransportConstants.SENTRY_RPC_RETRY_TOTAL_DEFAULT; + + } + + /** + * Defines configuration strings needed for sentry HDFS clients to handle the transport level + * operations. + */ + static class HDFSClientConstants { + + //Default server port + static final int SERVER_RPC_PORT_DEFAULT = SentryClientTransportConstants.RPC_PORT_DEFAULT; + + //configuration for server port + static final String SERVER_RPC_PORT = "sentry.hdfs.service.client.server.rpc-port"; + + //configuration for server address. It can be a comma-separated list of server addresses. + static final String SERVER_RPC_ADDRESS = "sentry.hdfs.service.client.server.rpc-address"; + + /** + * This configuration parameter is only meant to be used for testing purposes. + */ + static final String SECURITY_MODE = "sentry.hdfs.service.security.mode"; + + /** + * full retry num for getting the connection in non-pool model + * In a full retry, it will cycle through all available sentry servers + */ + static final String SENTRY_FULL_RETRY_TOTAL = + SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL; + + static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT = + SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL_DEFAULT; + + static final String SECURITY_USE_UGI_TRANSPORT = "sentry.hdfs.service.security.use.ugi"; + + static final String PRINCIPAL = "sentry.hdfs.service.server.principal"; + + static final String RPC_ADDRESS = "sentry.hdfs.service.client.server.rpc-address"; + + static final String RPC_ADDRESS_DEFAULT = "0.0.0.0"; //NOPMD + + //configuration for the client connection timeout. 
+ static final String SERVER_RPC_CONN_TIMEOUT = + "sentry.hdfs.service.client.server.rpc-connection-timeout"; + + static final int SERVER_RPC_CONN_TIMEOUT_DEFAULT = 200000; + + /** + * max retry num for client rpc + * {link RetryClientInvocationHandler#invokeImpl(Object, Method, Object[])} + */ + static final String SENTRY_RPC_RETRY_TOTAL = + SentryClientTransportConstants.SENTRY_RPC_RETRY_TOTAL; + + static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3; + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/268ee50e/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java new file mode 100644 index 0000000..12175f7 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sentry.core.common.transport; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.core.common.exception.MissingConfigurationException; +import org.apache.sentry.core.common.utils.SentryConstants; + +import static org.apache.sentry.core.common.transport.SentryClientTransportConstants.HDFSClientConstants.*; + +/** + * Provides configuration values and the configuration string for the HDFS sentry + * client + * <p> + * Curently used by <code>SentryHDFSServiceClient</code>. + */ +public final class SentryHDFSClientTransportConfig + implements SentryClientTransportConfigInterface { + public SentryHDFSClientTransportConfig() { } + + @Override + public boolean isKerberosEnabled(Configuration conf) throws MissingConfigurationException { + return (conf.get(SECURITY_MODE, SentryConstants.KERBEROS_MODE).trim() + .equalsIgnoreCase((SentryConstants.KERBEROS_MODE))); + } + + @Override + public int getSentryFullRetryTotal(Configuration conf) throws MissingConfigurationException { + return conf.getInt(SENTRY_FULL_RETRY_TOTAL, SENTRY_FULL_RETRY_TOTAL_DEFAULT); + } + + @Override + public boolean useUserGroupInformation(Configuration conf) + throws MissingConfigurationException { + return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true")); + } + + @Override + public String getSentryPrincipal(Configuration conf) throws MissingConfigurationException { + String principle = conf.get(PRINCIPAL); + if (principle != null && !principle.isEmpty()) { + return principle; + } + throw new MissingConfigurationException(PRINCIPAL); + } + + @Override + public String getSentryServerRpcAddress(Configuration conf) + throws MissingConfigurationException { + String serverAddress = conf.get(SERVER_RPC_ADDRESS); + if (serverAddress != null && !serverAddress.isEmpty()) { + return serverAddress; + } + throw new MissingConfigurationException(SERVER_RPC_ADDRESS); + } + + @Override + public int getServerRpcPort(Configuration conf) throws 
MissingConfigurationException { + return conf.getInt(SERVER_RPC_PORT, SentryClientTransportConstants.RPC_PORT_DEFAULT); + } + + @Override + public int getServerRpcConnTimeoutInMs(Configuration conf) + throws MissingConfigurationException { + return conf.getInt(SERVER_RPC_CONN_TIMEOUT, SERVER_RPC_CONN_TIMEOUT_DEFAULT); + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/268ee50e/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java new file mode 100644 index 0000000..038bca7 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sentry.core.common.transport; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.core.common.exception.MissingConfigurationException; +import org.apache.sentry.core.common.utils.SentryConstants; +import static org.apache.sentry.core.common.transport.SentryClientTransportConstants.PolicyClientConstants.*; + +/** + * Provides configuration values and the configuration string for the policy based + * clients. + * <p> + * Curently used by policy based clients <code>SentryPolicyServiceClient</code> and + * <code>SentryGenericServiceClient</code> + */ +public final class SentryPolicyClientTransportConfig + implements SentryClientTransportConfigInterface { + public SentryPolicyClientTransportConfig() { } + + @Override + public boolean isKerberosEnabled(Configuration conf) throws MissingConfigurationException { + return (conf.get(SECURITY_MODE, SentryConstants.KERBEROS_MODE).trim() + .equalsIgnoreCase((SentryConstants.KERBEROS_MODE))); + } + + @Override + public int getSentryFullRetryTotal(Configuration conf) throws MissingConfigurationException { + return conf.getInt(SENTRY_FULL_RETRY_TOTAL, SENTRY_FULL_RETRY_TOTAL_DEFAULT); + } + + @Override + public boolean useUserGroupInformation(Configuration conf) + throws MissingConfigurationException { + return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true")); + } + + @Override + public String getSentryPrincipal(Configuration conf) throws MissingConfigurationException { + String principle = conf.get(PRINCIPAL); + if (principle != null && !principle.isEmpty()) { + return principle; + } + throw new MissingConfigurationException(PRINCIPAL); + } + + @Override + public String getSentryServerRpcAddress(Configuration conf) + throws MissingConfigurationException { + String serverAddress = conf.get(SERVER_RPC_ADDRESS); + if (serverAddress != null && !serverAddress.isEmpty()) { + return serverAddress; + } + throw new MissingConfigurationException(SERVER_RPC_ADDRESS); + } + + 
@Override + public int getServerRpcPort(Configuration conf) throws MissingConfigurationException { + return conf.getInt(SERVER_RPC_PORT, SentryClientTransportConstants.RPC_PORT_DEFAULT); + } + + @Override + public int getServerRpcConnTimeoutInMs(Configuration conf) + throws MissingConfigurationException { + return conf.getInt(SERVER_RPC_CONN_TIMEOUT, SERVER_RPC_CONN_TIMEOUT_DEFAULT); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/268ee50e/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java index 4ed1361..a4e69b9 100644 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java @@ -42,4 +42,7 @@ public class SentryConstants { public static final String ACCESS_ALLOW_URI_PER_DB_POLICYFILE = "sentry.allow.uri.db.policyfile"; public static final String SENTRY_ZK_JAAS_NAME = "Sentry"; + + public static final String KERBEROS_MODE = "kerberos"; + } http://git-wip-us.apache.org/repos/asf/sentry/blob/268ee50e/sentry-hdfs/sentry-hdfs-dist/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-dist/pom.xml b/sentry-hdfs/sentry-hdfs-dist/pom.xml index beda202..e828d5e 100644 --- a/sentry-hdfs/sentry-hdfs-dist/pom.xml +++ b/sentry-hdfs/sentry-hdfs-dist/pom.xml @@ -45,6 +45,10 @@ limitations under the License. 
<groupId>org.apache.sentry</groupId> <artifactId>sentry-hdfs-namenode-plugin</artifactId> </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-core-common</artifactId> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/sentry/blob/268ee50e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java index 085971b..16ffa1b 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java @@ -308,7 +308,7 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin { sentryClient = SentryHDFSServiceClientFactory.create(sentryConf); } catch (Exception e) { sentryClient = null; - LOGGER.error("#### Could not connect to Sentry HDFS Service !!", e); + LOGGER.error("Could not connect to Sentry HDFS Service !!", e); } } return sentryClient; http://git-wip-us.apache.org/repos/asf/sentry/blob/268ee50e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java index 03bf39e..28b1224 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java @@ -24,19 +24,23 @@ 
import java.util.LinkedList; import java.util.Map; import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.Sasl; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sentry.core.common.exception.MissingConfigurationException; import org.apache.sentry.hdfs.service.thrift.SentryHDFSService; import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Client; import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse; import org.apache.sentry.hdfs.service.thrift.TPathsUpdate; import org.apache.sentry.hdfs.service.thrift.TPermissionsUpdate; import org.apache.sentry.hdfs.ServiceConstants.ClientConfig; +import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TMultiplexedProtocol; @@ -114,43 +118,43 @@ public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClie private String[] serverPrincipalParts; private Client client; + private final SentryHDFSClientTransportConfig transportConfig = new SentryHDFSClientTransportConfig(); + private static final ImmutableMap<String, String> SASL_PROPERTIES = + ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf"); public SentryHDFSServiceClientDefaultImpl(Configuration conf) throws IOException { this.conf = conf; Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); - this.serverAddress = NetUtils.createSocketAddr(Preconditions.checkNotNull( - conf.get(ClientConfig.SERVER_RPC_ADDRESS), "Config key " - + ClientConfig.SERVER_RPC_ADDRESS + " is required"), conf.getInt( - 
ClientConfig.SERVER_RPC_PORT, ClientConfig.SERVER_RPC_PORT_DEFAULT)); - this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT, - ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT); - kerberos = ClientConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase( - conf.get(ClientConfig.SECURITY_MODE, ClientConfig.SECURITY_MODE_KERBEROS).trim()); - transport = new TSocket(serverAddress.getHostName(), + try { + this.serverAddress = NetUtils.createSocketAddr( + transportConfig.getSentryServerRpcAddress(conf), + transportConfig.getServerRpcPort(conf)); + this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf); + kerberos = transportConfig.isKerberosEnabled(conf); + transport = new TSocket(serverAddress.getHostName(), serverAddress.getPort(), connectionTimeout); - if (kerberos) { - String serverPrincipal = Preconditions.checkNotNull( - conf.get(ClientConfig.PRINCIPAL), ClientConfig.PRINCIPAL + " is required"); - - // Resolve server host in the same way as we are doing on server side - serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); - LOGGER.info("Using server kerberos principal: " + serverPrincipal); - - serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal); - Preconditions.checkArgument(serverPrincipalParts.length == 3, - "Kerberos principal should have 3 parts: " + serverPrincipal); - boolean wrapUgi = "true".equalsIgnoreCase(conf - .get(ClientConfig.SECURITY_USE_UGI_TRANSPORT, "true")); - transport = new UgiSaslClientTransport(AuthMethod.KERBEROS.getMechanismName(), + if (kerberos) { + String serverPrincipal = transportConfig.getSentryPrincipal(conf); + // Resolve server host in the same way as we are doing on server side + serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); + LOGGER.info("Using server kerberos principal: " + serverPrincipal); + + serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal); + 
Preconditions.checkArgument(serverPrincipalParts.length == 3, + "Kerberos principal should have 3 parts: " + serverPrincipal); + boolean wrapUgi = transportConfig.useUserGroupInformation(conf); + transport = new UgiSaslClientTransport(AuthMethod.KERBEROS.getMechanismName(), null, serverPrincipalParts[0], serverPrincipalParts[1], - ClientConfig.SASL_PROPERTIES, null, transport, wrapUgi); - } else { - serverPrincipalParts = null; - } - try { + SASL_PROPERTIES, null, transport, wrapUgi); + } else { + serverPrincipalParts = null; + } + transport.open(); } catch (TTransportException e) { throw new IOException("Transport exception while opening transport: " + e.getMessage(), e); + } catch (MissingConfigurationException e) { + throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e); } LOGGER.info("Successfully opened transport: " + transport + " to " + serverAddress); TProtocol tProtocol = null; http://git-wip-us.apache.org/repos/asf/sentry/blob/268ee50e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java index ee6cdf7..075983e 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java @@ -23,7 +23,9 @@ import java.security.PrivilegedExceptionAction; import java.util.*; import javax.security.auth.callback.CallbackHandler; +import 
javax.security.sasl.Sasl; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; import org.apache.hadoop.net.NetUtils; @@ -31,13 +33,14 @@ import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sentry.core.common.exception.MissingConfigurationException; import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.core.common.ActiveRoleSet; import org.apache.sentry.core.common.Authorizable; +import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig; +import org.apache.sentry.core.common.utils.SentryConstants; import org.apache.sentry.core.model.db.AccessConstants; import org.apache.sentry.service.thrift.ServiceConstants; -import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; -import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; import org.apache.sentry.service.thrift.Status; import org.apache.sentry.service.thrift.sentry_common_serviceConstants; import org.apache.thrift.TException; @@ -64,6 +67,10 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi private static final Logger LOGGER = LoggerFactory .getLogger(SentryGenericServiceClientDefaultImpl.class); private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occured "; + private final SentryPolicyClientTransportConfig transportConfig = new SentryPolicyClientTransportConfig(); + + private static final ImmutableMap<String, String> SASL_PROPERTIES = + ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf"); /** * This transport wraps the Sasl transports to set up the right UGI context for open(). 
@@ -116,46 +123,48 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi } } - public SentryGenericServiceClientDefaultImpl(Configuration conf) throws IOException { + public SentryGenericServiceClientDefaultImpl(Configuration conf) throws Exception { // copy the configuration because we may make modifications to it. this.conf = new Configuration(conf); - Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); - this.serverAddress = NetUtils.createSocketAddr(Preconditions.checkNotNull( - conf.get(ClientConfig.SERVER_RPC_ADDRESS), "Config key " - + ClientConfig.SERVER_RPC_ADDRESS + " is required"), conf.getInt( - ClientConfig.SERVER_RPC_PORT, ClientConfig.SERVER_RPC_PORT_DEFAULT)); - this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT, - ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT); - kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase( - conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim()); - transport = new TSocket(serverAddress.getHostName(), + + Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); + try { + this.serverAddress = NetUtils.createSocketAddr( + transportConfig.getSentryServerRpcAddress(conf), + transportConfig.getServerRpcPort(conf)); + + + this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf); + kerberos = transportConfig.isKerberosEnabled(conf); + transport = new TSocket(serverAddress.getHostName(), serverAddress.getPort(), connectionTimeout); - if (kerberos) { - String serverPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL), ServerConfig.PRINCIPAL + " is required"); - // since the client uses hadoop-auth, we need to set kerberos in - // hadoop-auth if we plan to use kerberos - conf.set(HADOOP_SECURITY_AUTHENTICATION, ServerConfig.SECURITY_MODE_KERBEROS); - - // Resolve server host in the same way as we are doing on server side - serverPrincipal = 
SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); - LOGGER.debug("Using server kerberos principal: " + serverPrincipal); - - serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal); - Preconditions.checkArgument(serverPrincipalParts.length == 3, - "Kerberos principal should have 3 parts: " + serverPrincipal); - boolean wrapUgi = "true".equalsIgnoreCase(conf - .get(ServerConfig.SECURITY_USE_UGI_TRANSPORT, "true")); - transport = new UgiSaslClientTransport(AuthMethod.KERBEROS.getMechanismName(), + if (kerberos) { + String serverPrincipal = transportConfig.getSentryPrincipal(conf); + // since the client uses hadoop-auth, we need to set kerberos in + // hadoop-auth if we plan to use kerberos + conf.set(HADOOP_SECURITY_AUTHENTICATION, SentryConstants.KERBEROS_MODE); + + // Resolve server host in the same way as we are doing on server side + serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); + LOGGER.debug("Using server kerberos principal: " + serverPrincipal); + + serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal); + Preconditions.checkArgument(serverPrincipalParts.length == 3, + "Kerberos principal should have 3 parts: " + serverPrincipal); + boolean wrapUgi = transportConfig.useUserGroupInformation(conf); + transport = new UgiSaslClientTransport(AuthMethod.KERBEROS.getMechanismName(), null, serverPrincipalParts[0], serverPrincipalParts[1], - ClientConfig.SASL_PROPERTIES, null, transport, wrapUgi, conf); - } else { - serverPrincipalParts = null; - } - try { + SASL_PROPERTIES, null, transport, wrapUgi, conf); + } else { + serverPrincipalParts = null; + } transport.open(); } catch (TTransportException e) { throw new IOException("Transport exception while opening transport: " + e.getMessage(), e); + } catch (MissingConfigurationException e) { + throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e); } + LOGGER.debug("Successfully opened 
transport: " + transport + " to " + serverAddress); long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE, ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT); http://git-wip-us.apache.org/repos/asf/sentry/blob/268ee50e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java index 2cf748e..4284b53 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java @@ -28,7 +28,9 @@ import java.util.Map; import java.util.Set; import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.Sasl; +import com.google.common.collect.ImmutableMap; import com.google.common.net.HostAndPort; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -36,6 +38,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sentry.core.common.exception.MissingConfigurationException; import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.core.common.ActiveRoleSet; import org.apache.sentry.core.common.Authorizable; @@ -44,9 +47,7 @@ import org.apache.sentry.core.model.db.DBModelAuthorizable; import 
org.apache.sentry.core.common.utils.PolicyFileConstants; import org.apache.sentry.service.thrift.SentryServiceUtil; import org.apache.sentry.service.thrift.ServiceConstants; -import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope; -import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; import org.apache.sentry.service.thrift.ServiceConstants.ThriftConstants; import org.apache.sentry.service.thrift.Status; import org.apache.thrift.TException; @@ -56,6 +57,7 @@ import org.apache.thrift.transport.TSaslClientTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,6 +88,9 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService // configs for connection retry private int connectionFullRetryTotal; private List<InetSocketAddress> endpoints; + final SentryPolicyClientTransportConfig transportConfig = new SentryPolicyClientTransportConfig(); + private static final ImmutableMap<String, String> SASL_PROPERTIES = + ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf"); /** * This transport wraps the Sasl transports to set up the right UGI context for open(). @@ -139,47 +144,45 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService * Initialize the sentry configurations. 
*/ public SentryPolicyServiceClientDefaultImpl(Configuration conf) - throws IOException { + throws IOException { this.conf = conf; Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); - this.connectionTimeout = conf.getInt(ServiceConstants.ClientConfig.SERVER_RPC_CONN_TIMEOUT, - ServiceConstants.ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT); - this.connectionFullRetryTotal = conf.getInt(ServiceConstants.ClientConfig.SENTRY_FULL_RETRY_TOTAL, - ServiceConstants.ClientConfig.SENTRY_FULL_RETRY_TOTAL_DEFAULT); - this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT, - ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT); - this.kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase( - conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim()); - - String hostsAndPortsStr = conf.get(ServiceConstants.ClientConfig.SERVER_RPC_ADDRESS); - if (hostsAndPortsStr == null) { - throw new RuntimeException("Config key " + - ServiceConstants.ClientConfig.SERVER_RPC_ADDRESS + " is required"); - } - int defaultPort = conf.getInt(ServiceConstants.ClientConfig.SERVER_RPC_PORT, - ServiceConstants.ClientConfig.SERVER_RPC_PORT_DEFAULT); - String[] hostsAndPortsStrArr = hostsAndPortsStr.split(","); - HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, defaultPort); - this.endpoints = new ArrayList(hostsAndPortsStrArr.length); - for (int i = hostsAndPortsStrArr.length - 1; i >= 0 ; i--) { - this.endpoints.add( - new InetSocketAddress(hostsAndPorts[i].getHostText(),hostsAndPorts[i].getPort())); - LOGGER.debug("Added server endpoint: " + hostsAndPorts[i].toString()); + try { + String hostsAndPortsStr; + this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf); + this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf); + + this.kerberos = transportConfig.isKerberosEnabled(conf); + hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf); + + int 
serverPort = transportConfig.getServerRpcPort(conf); + + String[] hostsAndPortsStrArr = hostsAndPortsStr.split(","); + HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort); + this.endpoints = new ArrayList(hostsAndPortsStrArr.length); + for (int i = hostsAndPortsStrArr.length - 1; i >= 0; i--) { + this.endpoints.add( + new InetSocketAddress(hostsAndPorts[i].getHostText(), hostsAndPorts[i].getPort())); + LOGGER.debug("Added server endpoint: " + hostsAndPorts[i].toString()); + } + } catch (MissingConfigurationException e) { + throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e); } } public SentryPolicyServiceClientDefaultImpl(String addr, int port, - Configuration conf) throws IOException { + Configuration conf) throws IOException { this.conf = conf; Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); - InetSocketAddress serverAddress = NetUtils.createSocketAddr(Preconditions.checkNotNull( - addr, "Config key " + ClientConfig.SERVER_RPC_ADDRESS - + " is required"), port); - this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT, - ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT); - this.kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase( - conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim()); - connect(serverAddress); + try { + InetSocketAddress serverAddress = NetUtils.createSocketAddr(addr, port); + this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf); + this.kerberos = transportConfig.isKerberosEnabled(conf); + connect(serverAddress); + } catch (MissingConfigurationException e) { + throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e); + } + } /** @@ -193,12 +196,14 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService * TODO: Have a small random sleep after a full retry to prevent all clients connecting to the same server. 
* <p> * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries. + * @throws Exception if client fails to connect to all servers for a configured + * number of times */ - public synchronized void connectWithRetry() throws IOException { + public synchronized void connectWithRetry() throws Exception { if (isConnected()) { return; } - IOException currentException = null; + Exception currentException = null; // Here for each full connectWithRetry it will cycle through all available sentry // servers. Before each full connectWithRetry, it will shuffle the server list. for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) { @@ -210,7 +215,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService connect(addr); LOGGER.info(String.format("Connected to SentryServer: %s", addr.toString())); return; - } catch (IOException e) { + } catch (Exception e) { LOGGER.debug(String.format("Failed connection to %s: %s", addr.toString(), e.getMessage()), e); currentException = e; @@ -226,38 +231,37 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } /** - * Connect to the specified socket address and throw IOException if failed. + * Connect to the specified socket address and throw Exception if failed. 
*/ private void connect(InetSocketAddress serverAddress) throws IOException { transport = new TSocket(serverAddress.getHostName(), serverAddress.getPort(), connectionTimeout); - if (kerberos) { - String serverPrincipal = Preconditions.checkNotNull( - conf.get(ServiceConstants.ServerConfig.PRINCIPAL), - ServiceConstants.ServerConfig.PRINCIPAL + " is required"); + try { + if (kerberos) { + String serverPrincipal = transportConfig.getSentryPrincipal(conf); - // Resolve server host in the same way as we are doing on server side - serverPrincipal = + // Resolve server host in the same way as we are doing on server side + serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); - LOGGER.debug("Using server kerberos principal: " + serverPrincipal); + LOGGER.debug("Using server kerberos principal: " + serverPrincipal); - serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal); - Preconditions.checkArgument(serverPrincipalParts.length == 3, + serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal); + Preconditions.checkArgument(serverPrincipalParts.length == 3, "Kerberos principal should have 3 parts: " + serverPrincipal); - boolean wrapUgi = "true".equalsIgnoreCase(conf - .get(ServiceConstants.ServerConfig.SECURITY_USE_UGI_TRANSPORT, "true")); - transport = new SentryPolicyServiceClientDefaultImpl.UgiSaslClientTransport( + boolean wrapUgi = transportConfig.useUserGroupInformation(conf); + transport = new SentryPolicyServiceClientDefaultImpl.UgiSaslClientTransport( SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(), null, serverPrincipalParts[0], serverPrincipalParts[1], - ServiceConstants.ClientConfig.SASL_PROPERTIES, null, transport, wrapUgi); - } else { - serverPrincipalParts = null; - } - try { + SASL_PROPERTIES, null, transport, wrapUgi); + } else { + serverPrincipalParts = null; + } + transport.open(); } catch (TTransportException e) { throw new IOException("Transport exception while opening 
transport: " + e.getMessage(), e); } + LOGGER.debug("Successfully opened transport: " + transport + " to " + serverAddress); long maxMessageSize = conf.getLong( ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE, http://git-wip-us.apache.org/repos/asf/sentry/blob/268ee50e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java index c4964c3..2f38198 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java @@ -60,7 +60,7 @@ class RetryClientInvocationHandler extends SentryClientInvocationHandler{ * Initialize the sentry configurations, including rpc retry count and client connection * configs for SentryPolicyServiceClientDefaultImpl */ - RetryClientInvocationHandler(Configuration conf) throws IOException { + RetryClientInvocationHandler(Configuration conf) throws Exception { this.conf = conf; Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); this.rpcRetryTotal = conf.getInt(ServiceConstants.ClientConfig.SENTRY_RPC_RETRY_TOTAL, @@ -75,6 +75,7 @@ class RetryClientInvocationHandler extends SentryClientInvocationHandler{ * connection problem, it will close the current connection, and retry (reconnect and * resend the thrift call) no more than rpcRetryTotal times. Throw SentryUserException * if failed retry after rpcRetryTotal times. + * if it is failed with other exception, method would just re-throw the exception. * Synchronized it for thread safety. 
*/ @Override @@ -93,6 +94,9 @@ class RetryClientInvocationHandler extends SentryClientInvocationHandler{ lastExc = e; close(); continue; + } catch (Exception e) { + close(); + throw e; } // do the thrift call
