Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign 2f70aac33 -> 5dc6b2855
http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java index d1a4d99..b5b8f82 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java @@ -18,78 +18,62 @@ package org.apache.sentry.provider.db.service.thrift; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Collections; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; - -import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.core.common.ActiveRoleSet; import org.apache.sentry.core.common.Authorizable; -import org.apache.sentry.core.common.transport.SentryTransportFactory; +import org.apache.sentry.core.common.exception.SentryUserException; +import org.apache.sentry.core.common.transport.SentryConnection; +import org.apache.sentry.core.common.transport.SentryTransportPool; +import org.apache.sentry.core.common.transport.TTransportWrapper; +import 
org.apache.sentry.core.common.utils.PolicyFileConstants; import org.apache.sentry.core.model.db.AccessConstants; import org.apache.sentry.core.model.db.DBModelAuthorizable; -import org.apache.sentry.core.common.utils.PolicyFileConstants; +import org.apache.sentry.provider.db.service.thrift.SentryPolicyService.Client; import org.apache.sentry.service.thrift.SentryServiceUtil; -import org.apache.sentry.service.thrift.ServiceConstants; +import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope; import org.apache.sentry.service.thrift.ServiceConstants.ThriftConstants; import org.apache.sentry.service.thrift.Status; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TMultiplexedProtocol; -import org.apache.sentry.core.common.transport.SentryServiceClient; -import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig; -import org.apache.thrift.transport.TTransport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; /** - * Sentry Policy Service Client + * Client implementation for Policy (HMS) clients. * <p> - * The public implementation of SentryPolicyServiceClient. - * Note: When using this client, if there is an exception in RPC, socket can get into an inconsistent state - * So it is important to close and re-open the transportFactory so that new socket is used. 
- * When an class is instantiated, there will be transportFactory created connecting with first available - * server this is configured. + * The class is not thread-safe - it is up to the callers to ensure thread safety */ -public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyServiceClient, SentryServiceClient { +public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyServiceClient, SentryConnection { - private SentryPolicyService.Client client; - private SentryTransportFactory transportFactory; - private TTransport transport; - private Configuration conf; + private Client client; + private final SentryTransportPool transportPool; + private TTransportWrapper transport; + private final long maxMessageSize; - private static final Logger LOGGER = LoggerFactory - .getLogger(SentryPolicyServiceClient.class); private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occurred "; /** * Initialize the sentry configurations. */ - public SentryPolicyServiceClientDefaultImpl(Configuration conf, SentryPolicyClientTransportConfig transportConfig) + public SentryPolicyServiceClientDefaultImpl(Configuration conf, + SentryTransportPool transportPool) throws IOException { - transportFactory = new SentryTransportFactory(conf, transportConfig); - this.conf = conf; - } - - public SentryPolicyServiceClientDefaultImpl(String addr, int port, - Configuration conf) throws IOException { - transportFactory = new SentryTransportFactory(addr, port, conf, - new SentryPolicyClientTransportConfig()); - this.conf = conf; - connect(); + maxMessageSize = conf.getLong(ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE, + ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT); + this.transportPool = transportPool; } /** @@ -98,24 +82,21 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService * @throws IOException */ @Override - public synchronized void connect() throws IOException { - if (transport != 
null && transport.isOpen()) { + public void connect() throws Exception { + if ((transport != null) && transport.isOpen()) { return; } - transport = transportFactory.getTransport(); - long maxMessageSize = conf.getLong( - ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE, - ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT); + transport = transportPool.getTransport(); TMultiplexedProtocol protocol = new TMultiplexedProtocol( - new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true), - SentryPolicyStoreProcessor.SENTRY_POLICY_SERVICE_NAME); - client = new SentryPolicyService.Client(protocol); - LOGGER.debug("Successfully created client"); + new TBinaryProtocol(transport.getTTransport(), maxMessageSize, maxMessageSize, + true, true), + SentryPolicyStoreProcessor.SENTRY_POLICY_SERVICE_NAME); + client = new Client(protocol); } @Override - public synchronized void createRole(String requestorUserName, String roleName) + public void createRole(String requestorUserName, String roleName) throws SentryUserException { TCreateSentryRoleRequest request = new TCreateSentryRoleRequest(); request.setProtocol_version(ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT); @@ -130,20 +111,20 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void dropRole(String requestorUserName, + public void dropRole(String requestorUserName, String roleName) throws SentryUserException { dropRole(requestorUserName, roleName, false); } @Override - public synchronized void dropRoleIfExists(String requestorUserName, + public void dropRoleIfExists(String requestorUserName, String roleName) throws SentryUserException { dropRole(requestorUserName, roleName, true); } - private synchronized void dropRole(String requestorUserName, + private void dropRole(String requestorUserName, String roleName, boolean ifExists) throws SentryUserException { TDropSentryRoleRequest 
request = new TDropSentryRoleRequest(); @@ -171,7 +152,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService * @throws SentryUserException */ @Override - public synchronized Set<TSentryRole> listRolesByGroupName( + public Set<TSentryRole> listRolesByGroupName( String requestorUserName, String groupName) throws SentryUserException { @@ -219,7 +200,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized Set<TSentryPrivilege> listAllPrivilegesByRoleName(String requestorUserName, + public Set<TSentryPrivilege> listAllPrivilegesByRoleName(String requestorUserName, String roleName) throws SentryUserException { return listPrivilegesByRoleName(requestorUserName, roleName, null); @@ -235,7 +216,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService * @throws SentryUserException */ @Override - public synchronized Set<TSentryPrivilege> listPrivilegesByRoleName(String requestorUserName, + public Set<TSentryPrivilege> listPrivilegesByRoleName(String requestorUserName, String roleName, List<? 
extends Authorizable> authorizable) throws SentryUserException { TListSentryPrivilegesRequest request = new TListSentryPrivilegesRequest(); @@ -257,13 +238,13 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized Set<TSentryRole> listRoles(String requestorUserName) + public Set<TSentryRole> listRoles(String requestorUserName) throws SentryUserException { return listRolesByGroupName(requestorUserName, null); } @Override - public synchronized Set<TSentryRole> listUserRoles(String requestorUserName) + public Set<TSentryRole> listUserRoles(String requestorUserName) throws SentryUserException { Set<TSentryRole> tSentryRoles = Sets.newHashSet(); tSentryRoles.addAll(listRolesByGroupName(requestorUserName, AccessConstants.ALL)); @@ -272,7 +253,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized TSentryPrivilege grantURIPrivilege(String requestorUserName, + public TSentryPrivilege grantURIPrivilege(String requestorUserName, String roleName, String server, String uri) throws SentryUserException { return grantPrivilege(requestorUserName, roleName, @@ -280,7 +261,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized TSentryPrivilege grantURIPrivilege(String requestorUserName, + public TSentryPrivilege grantURIPrivilege(String requestorUserName, String roleName, String server, String uri, Boolean grantOption) throws SentryUserException { return grantPrivilege(requestorUserName, roleName, @@ -288,7 +269,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void grantServerPrivilege(String requestorUserName, + public void grantServerPrivilege(String requestorUserName, String roleName, String server, String action) throws SentryUserException { @@ -307,14 +288,14 @@ public class SentryPolicyServiceClientDefaultImpl 
implements SentryPolicyService * Should use grantServerPrivilege(String requestorUserName, * String roleName, String server, String action, Boolean grantOption) */ - public synchronized TSentryPrivilege grantServerPrivilege(String requestorUserName, + public TSentryPrivilege grantServerPrivilege(String requestorUserName, String roleName, String server, Boolean grantOption) throws SentryUserException { return grantServerPrivilege(requestorUserName, roleName, server, AccessConstants.ALL, grantOption); } @Override - public synchronized TSentryPrivilege grantServerPrivilege(String requestorUserName, + public TSentryPrivilege grantServerPrivilege(String requestorUserName, String roleName, String server, String action, Boolean grantOption) throws SentryUserException { @@ -329,7 +310,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized TSentryPrivilege grantDatabasePrivilege(String requestorUserName, + public TSentryPrivilege grantDatabasePrivilege(String requestorUserName, String roleName, String server, String db, String action) throws SentryUserException { return grantPrivilege(requestorUserName, roleName, @@ -337,7 +318,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized TSentryPrivilege grantDatabasePrivilege(String requestorUserName, + public TSentryPrivilege grantDatabasePrivilege(String requestorUserName, String roleName, String server, String db, String action, Boolean grantOption) throws SentryUserException { return grantPrivilege(requestorUserName, roleName, @@ -345,7 +326,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized TSentryPrivilege grantTablePrivilege(String requestorUserName, + public TSentryPrivilege grantTablePrivilege(String requestorUserName, String roleName, String server, String db, String table, String action) throws SentryUserException { return 
grantPrivilege(requestorUserName, roleName, PrivilegeScope.TABLE, server, @@ -354,7 +335,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized TSentryPrivilege grantTablePrivilege(String requestorUserName, + public TSentryPrivilege grantTablePrivilege(String requestorUserName, String roleName, String server, String db, String table, String action, Boolean grantOption) throws SentryUserException { return grantPrivilege(requestorUserName, roleName, PrivilegeScope.TABLE, server, @@ -362,7 +343,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized TSentryPrivilege grantColumnPrivilege(String requestorUserName, + public TSentryPrivilege grantColumnPrivilege(String requestorUserName, String roleName, String server, String db, String table, String columnName, String action) throws SentryUserException { return grantPrivilege(requestorUserName, roleName, PrivilegeScope.COLUMN, server, @@ -371,7 +352,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized TSentryPrivilege grantColumnPrivilege(String requestorUserName, + public TSentryPrivilege grantColumnPrivilege(String requestorUserName, String roleName, String server, String db, String table, String columnName, String action, Boolean grantOption) throws SentryUserException { return grantPrivilege(requestorUserName, roleName, PrivilegeScope.COLUMN, server, @@ -379,7 +360,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized Set<TSentryPrivilege> grantColumnsPrivileges(String requestorUserName, + public Set<TSentryPrivilege> grantColumnsPrivileges(String requestorUserName, String roleName, String server, String db, String table, List<String> columnNames, String action) throws SentryUserException { return grantPrivileges(requestorUserName, roleName, 
PrivilegeScope.COLUMN, server, @@ -388,7 +369,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized Set<TSentryPrivilege> grantColumnsPrivileges(String requestorUserName, + public Set<TSentryPrivilege> grantColumnsPrivileges(String requestorUserName, String roleName, String server, String db, String table, List<String> columnNames, String action, Boolean grantOption) throws SentryUserException { return grantPrivileges(requestorUserName, roleName, PrivilegeScope.COLUMN, @@ -397,14 +378,14 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized Set<TSentryPrivilege> grantPrivileges( + public Set<TSentryPrivilege> grantPrivileges( String requestorUserName, String roleName, Set<TSentryPrivilege> privileges) throws SentryUserException { return grantPrivilegesCore(requestorUserName, roleName, privileges); } @Override - public synchronized TSentryPrivilege grantPrivilege(String requestorUserName, String roleName, + public TSentryPrivilege grantPrivilege(String requestorUserName, String roleName, TSentryPrivilege privilege) throws SentryUserException { return grantPrivilegeCore(requestorUserName, roleName, privilege); } @@ -500,12 +481,12 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void revokePrivileges(String requestorUserName, String roleName, Set<TSentryPrivilege> privileges) throws SentryUserException { + public void revokePrivileges(String requestorUserName, String roleName, Set<TSentryPrivilege> privileges) throws SentryUserException { this.revokePrivilegesCore(requestorUserName, roleName, privileges); } @Override - public synchronized void revokePrivilege(String requestorUserName, String roleName, TSentryPrivilege privilege) throws SentryUserException { + public void revokePrivilege(String requestorUserName, String roleName, TSentryPrivilege privilege) throws 
SentryUserException { this.revokePrivilegeCore(requestorUserName, roleName, privilege); } @@ -530,7 +511,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void revokeURIPrivilege(String requestorUserName, + public void revokeURIPrivilege(String requestorUserName, String roleName, String server, String uri) throws SentryUserException { revokePrivilege(requestorUserName, roleName, @@ -538,7 +519,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void revokeURIPrivilege(String requestorUserName, + public void revokeURIPrivilege(String requestorUserName, String roleName, String server, String uri, Boolean grantOption) throws SentryUserException { revokePrivilege(requestorUserName, roleName, @@ -546,7 +527,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void revokeServerPrivilege(String requestorUserName, + public void revokeServerPrivilege(String requestorUserName, String roleName, String server, String action) throws SentryUserException { @@ -560,7 +541,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService PrivilegeScope.SERVER, server, null, null, null, null, action); } - public synchronized void revokeServerPrivilege(String requestorUserName, + public void revokeServerPrivilege(String requestorUserName, String roleName, String server, String action, Boolean grantOption) throws SentryUserException { @@ -580,7 +561,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService * String roleName, String server, String action, Boolean grantOption) */ @Override - public synchronized void revokeServerPrivilege(String requestorUserName, + public void revokeServerPrivilege(String requestorUserName, String roleName, String server, boolean grantOption) throws SentryUserException { revokePrivilege(requestorUserName, 
roleName, @@ -588,7 +569,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void revokeDatabasePrivilege(String requestorUserName, + public void revokeDatabasePrivilege(String requestorUserName, String roleName, String server, String db, String action) throws SentryUserException { revokePrivilege(requestorUserName, roleName, @@ -596,7 +577,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void revokeDatabasePrivilege(String requestorUserName, + public void revokeDatabasePrivilege(String requestorUserName, String roleName, String server, String db, String action, Boolean grantOption) throws SentryUserException { revokePrivilege(requestorUserName, roleName, @@ -604,7 +585,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void revokeTablePrivilege(String requestorUserName, + public void revokeTablePrivilege(String requestorUserName, String roleName, String server, String db, String table, String action) throws SentryUserException { revokePrivilege(requestorUserName, roleName, @@ -613,7 +594,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void revokeTablePrivilege(String requestorUserName, + public void revokeTablePrivilege(String requestorUserName, String roleName, String server, String db, String table, String action, Boolean grantOption) throws SentryUserException { revokePrivilege(requestorUserName, roleName, @@ -622,7 +603,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void revokeColumnPrivilege(String requestorUserName, String roleName, + public void revokeColumnPrivilege(String requestorUserName, String roleName, String server, String db, String table, String columnName, String action) throws SentryUserException { 
ImmutableList.Builder<String> listBuilder = ImmutableList.builder(); @@ -633,7 +614,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void revokeColumnPrivilege(String requestorUserName, String roleName, + public void revokeColumnPrivilege(String requestorUserName, String roleName, String server, String db, String table, String columnName, String action, Boolean grantOption) throws SentryUserException { ImmutableList.Builder<String> listBuilder = ImmutableList.builder(); @@ -644,7 +625,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void revokeColumnsPrivilege(String requestorUserName, String roleName, + public void revokeColumnsPrivilege(String requestorUserName, String roleName, String server, String db, String table, List<String> columns, String action) throws SentryUserException { revokePrivilege(requestorUserName, roleName, @@ -653,7 +634,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void revokeColumnsPrivilege(String requestorUserName, String roleName, + public void revokeColumnsPrivilege(String requestorUserName, String roleName, String server, String db, String table, List<String> columns, String action, Boolean grantOption) throws SentryUserException { revokePrivilege(requestorUserName, roleName, @@ -741,7 +722,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized Set<String> listPrivilegesForProvider + public Set<String> listPrivilegesForProvider (Set<String> groups, Set<String> users, ActiveRoleSet roleSet, Authorizable... 
authorizable) throws SentryUserException { TSentryActiveRoleSet thriftRoleSet = new TSentryActiveRoleSet(roleSet.isAll(), roleSet.getRoles()); @@ -766,21 +747,21 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void grantRoleToGroup(String requestorUserName, + public void grantRoleToGroup(String requestorUserName, String groupName, String roleName) throws SentryUserException { grantRoleToGroups(requestorUserName, roleName, Sets.newHashSet(groupName)); } @Override - public synchronized void revokeRoleFromGroup(String requestorUserName, + public void revokeRoleFromGroup(String requestorUserName, String groupName, String roleName) throws SentryUserException { revokeRoleFromGroups(requestorUserName, roleName, Sets.newHashSet(groupName)); } @Override - public synchronized void grantRoleToGroups(String requestorUserName, + public void grantRoleToGroups(String requestorUserName, String roleName, Set<String> groups) throws SentryUserException { TAlterSentryRoleAddGroupsRequest request = new TAlterSentryRoleAddGroupsRequest( @@ -795,7 +776,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void revokeRoleFromGroups(String requestorUserName, + public void revokeRoleFromGroups(String requestorUserName, String roleName, Set<String> groups) throws SentryUserException { TAlterSentryRoleDeleteGroupsRequest request = new TAlterSentryRoleDeleteGroupsRequest( @@ -810,19 +791,19 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void grantRoleToUser(String requestorUserName, String userName, + public void grantRoleToUser(String requestorUserName, String userName, String roleName) throws SentryUserException { grantRoleToUsers(requestorUserName, roleName, Sets.newHashSet(userName)); } @Override - public synchronized void revokeRoleFromUser(String requestorUserName, String userName, + 
public void revokeRoleFromUser(String requestorUserName, String userName, String roleName) throws SentryUserException { revokeRoleFromUsers(requestorUserName, roleName, Sets.newHashSet(userName)); } @Override - public synchronized void grantRoleToUsers(String requestorUserName, String roleName, + public void grantRoleToUsers(String requestorUserName, String roleName, Set<String> users) throws SentryUserException { TAlterSentryRoleAddUsersRequest request = new TAlterSentryRoleAddUsersRequest( ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName, roleName, users); @@ -835,7 +816,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void revokeRoleFromUsers(String requestorUserName, String roleName, + public void revokeRoleFromUsers(String requestorUserName, String roleName, Set<String> users) throws SentryUserException { TAlterSentryRoleDeleteUsersRequest request = new TAlterSentryRoleDeleteUsersRequest( ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName, roleName, users); @@ -858,7 +839,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void dropPrivileges(String requestorUserName, + public void dropPrivileges(String requestorUserName, List<? extends Authorizable> authorizableObjects) throws SentryUserException { TSentryAuthorizable tSentryAuthorizable = setupSentryAuthorizable(authorizableObjects); @@ -875,7 +856,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void renamePrivileges(String requestorUserName, + public void renamePrivileges(String requestorUserName, List<? extends Authorizable> oldAuthorizables, List<? 
extends Authorizable> newAuthorizables) throws SentryUserException { TSentryAuthorizable tOldSentryAuthorizable = setupSentryAuthorizable(oldAuthorizables); @@ -894,7 +875,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized Map<TSentryAuthorizable, TSentryPrivilegeMap> listPrivilegsbyAuthorizable + public Map<TSentryAuthorizable, TSentryPrivilegeMap> listPrivilegsbyAuthorizable ( String requestorUserName, Set<List<? extends Authorizable>> authorizables, Set<String> groups, @@ -937,7 +918,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService */ @Override - public synchronized String getConfigValue(String propertyName, String defaultValue) + public String getConfigValue(String propertyName, String defaultValue) throws SentryUserException { TSentryConfigValueRequest request = new TSentryConfigValueRequest( ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, propertyName); @@ -977,7 +958,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService * @param requestorUserName The name of the request user */ @Override - public synchronized void importPolicy + public void importPolicy (Map<String, Map<String, Set<String>>> policyFileMappingData, String requestorUserName, boolean isOverwriteRole) throws SentryUserException { @@ -1022,7 +1003,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService // export the sentry mapping data with map structure @Override - public synchronized Map<String, Map<String, Set<String>>> exportPolicy(String + public Map<String, Map<String, Set<String>>> exportPolicy(String requestorUserName, String objectPath) throws SentryUserException { TSentryExportMappingDataRequest request = new TSentryExportMappingDataRequest( @@ -1065,12 +1046,23 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService } @Override - public synchronized void close() { - transportFactory.close(); 
+ public void close() { + done(); + } + + @Override + public void done() { + if (transport != null) { + transportPool.returnTransport(transport); + transport = null; + } } @Override - public void disconnect() { - transportFactory.releaseTransport(); + public void invalidate() { + if (transport != null) { + transportPool.invalidateTransport(transport); + transport = null; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java index 1d09846..09f17ed 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java @@ -40,36 +40,39 @@ public class SentryShellHive extends SentryShellCommon { public void run() throws Exception { Command command = null; - SentryPolicyServiceClient client = SentryServiceClientFactory.create(getSentryConf()); - UserGroupInformation ugi = UserGroupInformation.getLoginUser(); - String requestorName = ugi.getShortUserName(); - if (isCreateRole) { - command = new CreateRoleCmd(roleName); - } else if (isDropRole) { - command = new DropRoleCmd(roleName); - } else if (isAddRoleGroup) { - command = new GrantRoleToGroupsCmd(roleName, groupName); - } else if (isDeleteRoleGroup) { - command = new RevokeRoleFromGroupsCmd(roleName, groupName); - } else if (isGrantPrivilegeRole) { - command = new GrantPrivilegeToRoleCmd(roleName, privilegeStr); - } else if (isRevokePrivilegeRole) { - command = new RevokePrivilegeFromRoleCmd(roleName, privilegeStr); - } 
else if (isListRole) { - command = new ListRolesCmd(groupName); - } else if (isListPrivilege) { - command = new ListPrivilegesCmd(roleName); - } + try(SentryPolicyServiceClient client = + SentryServiceClientFactory.create(getSentryConf())) { + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + String requestorName = ugi.getShortUserName(); - // check the requestor name - if (StringUtils.isEmpty(requestorName)) { - // The exception message will be recoreded in log file. - throw new Exception("The requestor name is empty."); - } + if (isCreateRole) { + command = new CreateRoleCmd(roleName); + } else if (isDropRole) { + command = new DropRoleCmd(roleName); + } else if (isAddRoleGroup) { + command = new GrantRoleToGroupsCmd(roleName, groupName); + } else if (isDeleteRoleGroup) { + command = new RevokeRoleFromGroupsCmd(roleName, groupName); + } else if (isGrantPrivilegeRole) { + command = new GrantPrivilegeToRoleCmd(roleName, privilegeStr); + } else if (isRevokePrivilegeRole) { + command = new RevokePrivilegeFromRoleCmd(roleName, privilegeStr); + } else if (isListRole) { + command = new ListRolesCmd(groupName); + } else if (isListPrivilege) { + command = new ListPrivilegesCmd(roleName); + } - if (command != null) { - command.execute(client, requestorName); + // check the requestor name + if (StringUtils.isEmpty(requestorName)) { + // The exception message will be recoreded in log file. 
+ throw new Exception("The requestor name is empty."); + } + + if (command != null) { + command.execute(client, requestorName); + } } } http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java deleted file mode 100644 index acf9b05..0000000 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java +++ /dev/null @@ -1,294 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sentry.service.thrift; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; - -import com.google.common.net.HostAndPort; -import org.apache.commons.pool2.PooledObjectFactory; -import org.apache.commons.pool2.impl.AbandonedConfig; -import org.apache.commons.pool2.impl.GenericObjectPool; -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; -import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.core.common.exception.SentryUserException; -import org.apache.sentry.core.common.transport.SentryClientInvocationHandler; -import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient; -import org.apache.sentry.core.common.utils.ThriftUtil; -import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The PoolClientInvocationHandler is a proxy class for handling thrift - * call. For every thrift call, get the instance of - * SentryPolicyServiceBaseClient from the commons-pool, and return the instance - * to the commons-pool after complete the call. For any exception with the call, - * discard the instance and create a new one added to the commons-pool. Then, - * get the instance and do the call again. For the thread safe, the commons-pool - * will manage the connection pool, and every thread can get the connection by - * borrowObject() and return the connection to the pool by returnObject(). - * - * TODO: Current pool model does not manage the opening connections very well, - * e.g. opening connections with failed servers should be closed promptly. 
- */ - -public class PoolClientInvocationHandler extends SentryClientInvocationHandler { - private static final Logger LOGGER = - LoggerFactory.getLogger(PoolClientInvocationHandler.class); - - private static final String POOL_EXCEPTION_MESSAGE = "Pool exception occurred "; - - private final Configuration conf; - - /** - * The configuration to use for our object pools. - * Null if we are not using object pools. - */ - private final GenericObjectPoolConfig poolConfig; - - /** - * The total number of connection retries to attempt per endpoint. - */ - private final int connectionRetryTotal; - - /** - * The configured sentry servers. - */ - private final Endpoint[] endpoints; - - /** - * The endpoint which we are currently using. This can be read without any locks. - * It must be written while holding the endpoints lock. - */ - private volatile int freshestEndpointIdx = 0; - - private class Endpoint { - /** - * The server address or hostname. - */ - private final String addr; - - /** - * The server port. - */ - private final int port; - - /** - * The server's poolFactory used to create new clients. - */ - private final PooledObjectFactory<SentryPolicyServiceClient> poolFactory; - - /** - * The server's pool of cached clients. 
- */ - private final GenericObjectPool<SentryPolicyServiceClient> pool; - - Endpoint(String addr, int port) { - this.addr = addr; - this.port = port; - this.poolFactory = new SentryServiceClientPoolFactory(addr, port, conf); - this.pool = new GenericObjectPool<SentryPolicyServiceClient>( - this.poolFactory, poolConfig, new AbandonedConfig()); - } - - GenericObjectPool<SentryPolicyServiceClient> getPool() { - return pool; - } - - String getEndPointStr() { - return new String("endpoint at [address " + addr + ", port " + port + "]"); - } - } - - public PoolClientInvocationHandler(Configuration conf) throws Exception { - this.conf = conf; - - this.poolConfig = new GenericObjectPoolConfig(); - // config the pool size for commons-pool - this.poolConfig.setMaxTotal(conf.getInt(ClientConfig.SENTRY_POOL_MAX_TOTAL, - ClientConfig.SENTRY_POOL_MAX_TOTAL_DEFAULT)); - this.poolConfig.setMinIdle(conf.getInt(ClientConfig.SENTRY_POOL_MIN_IDLE, - ClientConfig.SENTRY_POOL_MIN_IDLE_DEFAULT)); - this.poolConfig.setMaxIdle(conf.getInt(ClientConfig.SENTRY_POOL_MAX_IDLE, - ClientConfig.SENTRY_POOL_MAX_IDLE_DEFAULT)); - - // get the retry number for reconnecting service - this.connectionRetryTotal = conf.getInt(ClientConfig.SENTRY_POOL_RETRY_TOTAL, - ClientConfig.SENTRY_POOL_RETRY_TOTAL_DEFAULT); - - String hostsAndPortsStr = conf.get(ClientConfig.SERVER_RPC_ADDRESS); - if (hostsAndPortsStr == null) { - throw new RuntimeException("Config key " + - ClientConfig.SERVER_RPC_ADDRESS + " is required"); - } - int defaultPort = conf.getInt(ClientConfig.SERVER_RPC_PORT, - ClientConfig.SERVER_RPC_PORT_DEFAULT); - String[] hostsAndPortsStrArr = hostsAndPortsStr.split(","); - HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, defaultPort); - this.endpoints = new Endpoint[hostsAndPorts.length]; - for (int i = 0; i < this.endpoints.length; i++) { - this.endpoints[i] = new Endpoint(hostsAndPorts[i].getHostText(),hostsAndPorts[i].getPort()); - LOGGER.info("Initiate sentry 
sever endpoint: hostname " + - hostsAndPorts[i].getHostText() + ", port " + hostsAndPorts[i].getPort()); - } - } - - @Override - public Object invokeImpl(Object proxy, Method method, Object[] args) - throws Exception { - int retryCount = 0; - /** - * The maximum number of retries that we will do. Each endpoint gets its - * own set of retries. - */ - int retryLimit = connectionRetryTotal * endpoints.length; - - /** - * The index of the endpoint to use. - */ - int endpointIdx = freshestEndpointIdx; - - /** - * A list of exceptions from each endpoint. This starts as null to avoid - * memory allocation in the common case where there is no error. - */ - Exception exc[] = null; - - Object ret = null; - - while (retryCount < retryLimit) { - GenericObjectPool<SentryPolicyServiceClient> pool = - endpoints[endpointIdx].getPool(); - try { - if ((exc != null) && - (exc[endpointIdx] instanceof TTransportException)) { - // If there was a TTransportException last time we tried to contact - // this endpoint, attempt to create a new connection before we try - // again. - synchronized (endpoints) { - // If there has room, create new instance and add it to the - // commons-pool. This instance will be returned first from the - // commons-pool, because the configuration is LIFO. - if (pool.getNumIdle() + pool.getNumActive() < pool.getMaxTotal()) { - pool.addObject(); - } - } - } - // Try to make the RPC. 
- ret = invokeFromPool(method, args, pool); - break; - } catch (TTransportException e) { - if (exc == null) { - exc = new Exception[endpoints.length]; - } - exc[endpointIdx] = e; - } - - Exception lastExc = exc[endpointIdx]; - synchronized (endpoints) { - int curFreshestEndpointIdx = freshestEndpointIdx; - if (curFreshestEndpointIdx == endpointIdx) { - curFreshestEndpointIdx = - (curFreshestEndpointIdx + 1) % endpoints.length; - freshestEndpointIdx = curFreshestEndpointIdx; - } - endpointIdx = curFreshestEndpointIdx; - } - // Increase the retry num, and throw the exception if can't retry again. - retryCount++; - if (retryCount == connectionRetryTotal) { - for (int i = 0; i < exc.length; i++) { - // Since freshestEndpointIdx is shared by multiple threads, it is possible that - // the ith endpoint has been tried in another thread and skipped in the current - // thread. - if (exc[i] != null) { - LOGGER.error("Sentry server " + endpoints[i].getEndPointStr() - + " is in unreachable."); - } - } - throw new SentryUserException("Sentry servers are unreachable. " + - "Diagnostics is needed for unreachable servers.", lastExc); - } - } - return ret; - } - - private Object invokeFromPool(Method method, Object[] args, - GenericObjectPool<SentryPolicyServiceClient> pool) throws Exception { - Object result = null; - SentryPolicyServiceClient client; - try { - // get the connection from the pool, don't know if the connection is broken. - client = pool.borrowObject(); - } catch (Exception e) { - LOGGER.debug(POOL_EXCEPTION_MESSAGE, e); - // If the exception is caused by connection problem, throw the TTransportException - // for reconnect. - if (e instanceof IOException) { - throw new TTransportException(e); - } - throw new SentryUserException(e.getMessage(), e); - } - try { - // do the thrift call - result = method.invoke(client, args); - } catch (InvocationTargetException e) { - // Get the target exception, check if SentryUserException or TTransportException is wrapped. 
- // TTransportException or IOException means there has connection problem with the pool. - Throwable targetException = e.getCause(); - if (targetException instanceof SentryUserException) { - Throwable sentryTargetException = targetException.getCause(); - // If there has connection problem, eg, invalid connection if the service restarted, - // sentryTargetException instanceof TTransportException or IOException = true. - if (sentryTargetException instanceof TTransportException - || sentryTargetException instanceof IOException) { - // If the exception is caused by connection problem, destroy the instance and - // remove it from the commons-pool. Throw the TTransportException for reconnect. - pool.invalidateObject(client); - throw new TTransportException(sentryTargetException); - } - // The exception is thrown by thrift call, eg, SentryAccessDeniedException. - throw (SentryUserException) targetException; - } - throw e; - } finally{ - try { - // return the instance to commons-pool - pool.returnObject(client); - } catch (Exception e) { - LOGGER.error(POOL_EXCEPTION_MESSAGE, e); - throw e; - } - } - return result; - } - - @Override - public void close() { - for (int i = 0; i < endpoints.length; i++) { - try { - endpoints[i].getPool().close(); - } catch (Exception e) { - LOGGER.debug(POOL_EXCEPTION_MESSAGE, e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index 9beb07b..ec938da 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -313,7 +313,8 @@ public class SentryService implements Callable, SigUtils.SigListener { hmsFollowerExecutor.scheduleAtFixedRate(hmsFollower, initDelay, period, TimeUnit.MILLISECONDS); } catch (IllegalArgumentException e) { - LOGGER.error(String.format("Could not start HMSFollower due to illegal argument. period is %s ms", period), e); + LOGGER.error(String.format("Could not start HMSFollower due to illegal argument. period is %s ms", + period), e); throw e; } } @@ -381,7 +382,7 @@ public class SentryService implements Callable, SigUtils.SigListener { sentryStoreCleanService.scheduleWithFixedDelay( storeCleaner, 0, storeCleanPeriodSecs, TimeUnit.SECONDS); - LOGGER.info("sentry store cleaner is scheduled with interval %d seconds", storeCleanPeriodSecs); + LOGGER.info("sentry store cleaner is scheduled with interval {} seconds", storeCleanPeriodSecs); } catch(IllegalArgumentException e){ LOGGER.error("Could not start SentryStoreCleaner due to illegal argument", e); http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java index 7db9310..1324bd9 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java @@ -18,37 +18,94 @@ package org.apache.sentry.service.thrift; -import java.lang.reflect.Proxy; - import org.apache.hadoop.conf.Configuration; - import 
org.apache.sentry.core.common.transport.RetryClientInvocationHandler; import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig; +import org.apache.sentry.core.common.transport.SentryTransportFactory; +import org.apache.sentry.core.common.transport.SentryTransportPool; import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient; import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl; -import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; +import java.lang.reflect.Proxy; +import java.util.concurrent.atomic.AtomicReference; +/** + * Client factory for Hive clients. The factory uses connection pooling. + */ +@ThreadSafe public final class SentryServiceClientFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientFactory.class); + private static final SentryPolicyClientTransportConfig transportConfig = new SentryPolicyClientTransportConfig(); + private final Configuration conf; + private final SentryTransportPool transportPool; - private SentryServiceClientFactory() { - } + /** Keep track of singleton instances */ + private static final AtomicReference<SentryServiceClientFactory> clientFactory = + new AtomicReference<>(); + /** + * Create a client instance. The supplied configuration is only used the first time and + * ignored afterwards. Tests that want to supply different configurations + * should call {@link #factoryReset(SentryServiceClientFactory)} to force new configuration + * read. 
+ * @param conf Configuration + * @return client instance + * @throws Exception + */ public static SentryPolicyServiceClient create(Configuration conf) throws Exception { - boolean pooled = conf.getBoolean( - ClientConfig.SENTRY_POOL_ENABLED, ClientConfig.SENTRY_POOL_ENABLED_DEFAULT); - if (pooled) { - return (SentryPolicyServiceClient) Proxy - .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(), - SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), - new PoolClientInvocationHandler(conf)); - } else { - return (SentryPolicyServiceClient) Proxy - .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(), - SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), - new RetryClientInvocationHandler(conf, - new SentryPolicyServiceClientDefaultImpl(conf,transportConfig), transportConfig)); + SentryServiceClientFactory factory = clientFactory.get(); + if (factory != null) { + return factory.create(); + } + factory = new SentryServiceClientFactory(conf); + boolean ok = clientFactory.compareAndSet(null, factory); + if (ok) { + return factory.create(); + } + // Lost the race to install a factory: close the one created here and use the installed one + factory.close(); + return clientFactory.get().create(); + } + + /** + * Create a new instance of the factory which will hand off connections from + * the pool. + * @param conf Configuration object + */ + private SentryServiceClientFactory(Configuration conf) { + this.conf = conf; + + transportPool = new SentryTransportPool(conf, transportConfig, + new SentryTransportFactory(conf, transportConfig)); + } + + private SentryPolicyServiceClient create() throws Exception { + return (SentryPolicyServiceClient) Proxy + .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(), + SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), + new RetryClientInvocationHandler(conf, + new SentryPolicyServiceClientDefaultImpl(conf, transportPool), transportConfig)); + } + + /** + * Reset existing factory and return the old one. 
+ * Only used by tests. + */ + public static SentryServiceClientFactory factoryReset(SentryServiceClientFactory factory) { + LOGGER.debug("factory reset"); + return clientFactory.getAndSet(factory); + } + + public void close() { + try { + transportPool.close(); + } catch (Exception e) { + LOGGER.error("failed to close transport pool", e); } } } http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java deleted file mode 100644 index 0164fa6..0000000 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.sentry.service.thrift; - -import org.apache.commons.pool2.BasePooledObjectFactory; -import org.apache.commons.pool2.PooledObject; -import org.apache.commons.pool2.impl.DefaultPooledObject; -import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient; -import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * SentryServiceClientPoolFactory is for connection pool to manage the object. Implement the related - * method to create object, destroy object and wrap object. - */ - -public class SentryServiceClientPoolFactory extends BasePooledObjectFactory<SentryPolicyServiceClient> { - - private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientPoolFactory.class); - - private final String addr; - private final int port; - private final Configuration conf; - - public SentryServiceClientPoolFactory(String addr, int port, - Configuration conf) { - this.addr = addr; - this.port = port; - this.conf = conf; - } - - @Override - public SentryPolicyServiceClient create() throws Exception { - LOGGER.debug("Creating Sentry Service Client..."); - return new SentryPolicyServiceClientDefaultImpl(addr, port, conf); - } - - @Override - public PooledObject<SentryPolicyServiceClient> wrap(SentryPolicyServiceClient client) { - return new DefaultPooledObject<SentryPolicyServiceClient>(client); - } - - @Override - public void destroyObject(PooledObject<SentryPolicyServiceClient> pooledObject) { - SentryPolicyServiceClient client = pooledObject.getObject(); - LOGGER.debug("Destroying Sentry Service Client: " + client); - if (client != null) { - // The close() of TSocket or TSaslClientTransport is called actually, and there has no - // exception even there has some problems, eg, the client is closed already. 
- // The close here is just try to close the socket and the client will be destroyed soon. - client.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java index 099ebd0..d3c96fa 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java @@ -87,7 +87,7 @@ public class ServiceConstants { // The configuration for the maximum number of retries per db transaction, // the default value is 3 times public static final String SENTRY_STORE_TRANSACTION_RETRY = "sentry.store.transaction.retry"; - public static final int SENTRY_STORE_TRANSACTION_RETRY_DEFAULT = 3; + public static final int SENTRY_STORE_TRANSACTION_RETRY_DEFAULT = 10; // The configuration for the delay (in milliseconds) between retries, // the default value is 500 ms public static final String SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS = http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/thrift/TestSentryGenericServiceClientForUgi.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/thrift/TestSentryGenericServiceClientForUgi.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/thrift/TestSentryGenericServiceClientForUgi.java deleted file mode 
100644 index 3f84ae4..0000000 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/thrift/TestSentryGenericServiceClientForUgi.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.provider.db.generic.service.thrift; - -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; -import org.junit.BeforeClass; -import org.junit.Test; - -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; - -public class TestSentryGenericServiceClientForUgi extends SentryGenericServiceIntegrationBase { - - @BeforeClass - public static void setup() throws Exception { - kerberos = true; - beforeSetup(); - setupConf(); - startSentryService(); - afterSetup(); - } - - public static void setupConf() throws Exception { - // If kerberos is enabled, SentryTransportFactory should make sure that - // HADOOP_SECURITY_AUTHENTICATION is appropriately configured. 
- SentryGenericServiceIntegrationBase.setupConf(); - conf.set(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS); - conf.set(ServerConfig.SECURITY_USE_UGI_TRANSPORT, "true"); - conf.set(HADOOP_SECURITY_AUTHENTICATION, "simple"); - UserGroupInformation.setConfiguration(conf); - } - - /** - * Test UserGroupInformationInitializer - * <p> - * Ensures that SentryTransportFactory is making sure that HADOOP_SECURITY_AUTHENTICATION - * is appropriately configured and UserGroupInformation is initialized accordingly - * by validating the static information in UserGroupInformation Class - * - * @throws Exception - */ - @Test - public void testUserGroupInformationInitializer() throws Exception { - kerberos = false; - runTestAsSubject(new TestOperation() { - @Override - public void runTestAsSubject() throws Exception { - assert UserGroupInformation.isSecurityEnabled(); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java index 788062c..29878c5 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java @@ -2974,12 +2974,16 @@ public class TestSentryStore extends org.junit.Assert { executor.awaitTermination(60, TimeUnit.SECONDS); List<MSentryPermChange> changes = sentryStore.getMSentryPermChanges(); - assertEquals(numThreads * numChangesPerThread, changes.size()); + int actualSize = changes.size(); + if 
(actualSize != (numThreads * numChangesPerThread)) { + LOGGER.warn("Detected {} dropped changes", ((numChangesPerThread * numThreads) - actualSize)); + } TreeSet<Long> changeIDs = new TreeSet<>(); for (MSentryPermChange change : changes) { changeIDs.add(change.getChangeID()); } - assertEquals("duplicated change ID", numThreads * numChangesPerThread, changeIDs.size()); + assertEquals("duplicated change ID", actualSize, changeIDs.size()); + long prevId = changeIDs.first() - 1; for (Long changeId : changeIDs) { assertTrue(String.format("Found non-consecutive number: prev=%d cur=%d", prevId, changeId), http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyServiceClientForUgi.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyServiceClientForUgi.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyServiceClientForUgi.java deleted file mode 100644 index ef94598..0000000 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyServiceClientForUgi.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.provider.db.service.thrift; - -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceIntegrationBase; -import org.apache.sentry.service.thrift.SentryServiceIntegrationBase; -import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; -import org.junit.BeforeClass; -import org.junit.Test; - -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; - -public class TestSentryPolicyServiceClientForUgi extends SentryServiceIntegrationBase { - - @BeforeClass - public static void setup() throws Exception { - kerberos = true; - beforeSetup(); - setupConf(); - startSentryService(); - afterSetup(); - } - - public static void setupConf() throws Exception { - // If kerberos is enabled, SentryTransportFactory should make sure that - // HADOOP_SECURITY_AUTHENTICATION is appropriately configured. 
- SentryGenericServiceIntegrationBase.setupConf(); - conf.set(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS); - conf.set(ServerConfig.SECURITY_USE_UGI_TRANSPORT, "true"); - conf.set(HADOOP_SECURITY_AUTHENTICATION, "simple"); - UserGroupInformation.setConfiguration(conf); - } - - /** - * Test UserGroupInformationInitializer - * <p> - * Ensures that SentryTransportFactory is making sure that HADOOP_SECURITY_AUTHENTICATION - * is appropriately configured and UserGroupInformation is initialized accordingly - * by validating the static information in UserGroupInformation Class - * - * @throws Exception - */ - - @Test - public void testUserGroupInformationInitializer() throws Exception { - kerberos = false; - runTestAsSubject(new TestOperation() { - @Override - public void runTestAsSubject() throws Exception { - assert UserGroupInformation.isSecurityEnabled(); - } - }); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java index a4dd8a6..32e67b9 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java @@ -44,6 +44,7 @@ public class TestSentryServiceWithInvalidMsgSize extends SentryServiceIntegratio runTestAsSubject(new TestOperation() { @Override public void runTestAsSubject() throws Exception { + 
SentryServiceClientFactory oldFactory = SentryServiceClientFactory.factoryReset(null); Configuration confWithSmallMaxMsgSize = new Configuration(conf); confWithSmallMaxMsgSize.setLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE, 20); // create a client with a small thrift max message size @@ -63,6 +64,7 @@ public class TestSentryServiceWithInvalidMsgSize extends SentryServiceIntegratio } finally { Assert.assertEquals(true, exceptionThrown); clientWithSmallMaxMsgSize.close(); + SentryServiceClientFactory.factoryReset(oldFactory); } // client can still talk with sentry server when message size is smaller. http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java deleted file mode 100644 index a202775..0000000 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.service.thrift; - -import com.google.common.net.HostAndPort; -import org.apache.sentry.core.common.utils.ThriftUtil; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestPoolClientInvocationHandler { - private static final Logger LOGGER = - LoggerFactory.getLogger(TestPoolClientInvocationHandler.class); - - private void expectParseHostPortStrings(String hostsAndPortsStr, - String[] expectedHosts, int[] expectedPorts) throws Exception { - boolean success = false; - String[] hostsAndPortsStrArr = hostsAndPortsStr.split(","); - HostAndPort[] hostsAndPorts; - try { - hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, 8038); - success = true; - } finally { - if (!success) { - LOGGER.error("Caught exception while parsing hosts/ports string " + - hostsAndPortsStr); - } - } - String[] hosts = new String[hostsAndPortsStrArr.length]; - int[] ports = new int[hostsAndPortsStrArr.length]; - parseHostsAndPorts(hostsAndPorts, hosts, ports); - Assert.assertArrayEquals("Got unexpected hosts results while " + - "parsing " + hostsAndPortsStr, expectedHosts, hosts); - Assert.assertArrayEquals("Got unexpected ports results while " + - "parsing " + hostsAndPortsStr, expectedPorts, ports); - } - - private void parseHostsAndPorts(HostAndPort[] hostsAndPorts, String[] hosts, int[] ports) { - for (int i = 0; i < hostsAndPorts.length; i++) { - hosts[i] = hostsAndPorts[i].getHostText(); - ports[i] = hostsAndPorts[i].getPort(); - } - } - - 
@SuppressWarnings("PMD.AvoidUsingHardCodedIP") - @Test - public void testParseHostPortStrings() throws Exception { - expectParseHostPortStrings("foo", new String[] {"foo"}, new int[] {8038}); - expectParseHostPortStrings("foo,bar", - new String[] {"foo", "bar"}, - new int[] {8038, 8038}); - expectParseHostPortStrings("foo:2020,bar:2021", - new String[] {"foo", "bar"}, - new int[] {2020, 2021}); - expectParseHostPortStrings("127.0.0.1:2020,127.1.0.1", - new String[] {"127.0.0.1", "127.1.0.1"}, - new int[] {2020, 8038}); - expectParseHostPortStrings("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:433", - new String[] {"2001:db8:85a3:8d3:1319:8a2e:370:7348"}, - new int[] {433}); - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java index 3cd4a95..ba50752 100644 --- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java +++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationBase.java @@ -64,10 +64,12 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.security.UserGroupInformation; import org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl; import org.apache.sentry.binding.hive.conf.HiveAuthzConf; +import org.apache.sentry.hdfs.SentryHDFSServiceClientFactory; import org.apache.sentry.hdfs.SentryINodeAttributesProvider; import org.apache.sentry.core.common.exception.SentryAlreadyExistsException; import org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider; import org.apache.sentry.provider.file.PolicyFile; 
+import org.apache.sentry.service.thrift.SentryServiceClientFactory; import org.apache.sentry.tests.e2e.hive.StaticUserGroup; import org.apache.sentry.tests.e2e.hive.fs.MiniDFS; import org.apache.sentry.tests.e2e.hive.hiveserver.HiveServerFactory; @@ -743,6 +745,11 @@ public abstract class TestHDFSIntegrationBase { } private static void startSentry() throws Exception { + SentryServiceClientFactory factory = SentryServiceClientFactory.factoryReset(null); + if (factory != null) { + factory.close(); + } + SentryHDFSServiceClientFactory.factoryReset(); try { hiveUgi.doAs(new PrivilegedExceptionAction() { @Override http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java index bead003..7c45999 100644 --- a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java +++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java @@ -28,6 +28,7 @@ import org.apache.sentry.core.model.kafka.KafkaActionConstant; import org.apache.sentry.core.model.kafka.Host; import org.apache.sentry.kafka.conf.KafkaAuthConf; import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend; +import org.apache.sentry.provider.db.generic.UpdatableCache; import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient; import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory; import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable; @@ -79,8 +80,12 @@ public class 
AbstractKafkaSentryTestBase { @BeforeClass public static void beforeTestEndToEnd() throws Exception { + // Stop background update thread + UpdatableCache.disable(); setupConf(); startSentryServer(); + // We started a new server, invalidate all connections to the old one + SentryGenericServiceClientFactory.factoryReset(); setUserGroups(); setAdminPrivilege(); startKafkaServer(); @@ -88,8 +93,10 @@ public class AbstractKafkaSentryTestBase { @AfterClass public static void afterTestEndToEnd() throws Exception { - stopSentryServer(); + // Stop background update thread + UpdatableCache.disable(); stopKafkaServer(); + stopSentryServer(); } private static void stopKafkaServer() { @@ -170,10 +177,8 @@ public class AbstractKafkaSentryTestBase { } public static void setAdminPrivilege() throws Exception { - SentryGenericServiceClient sentryClient = null; - try { - /** grant all privilege to admin user */ - sentryClient = getSentryClient(); + try (SentryGenericServiceClient sentryClient = getSentryClient()){ + // grant all privilege to admin user sentryClient.createRoleIfNotExist(ADMIN_USER, ADMIN_ROLE, COMPONENT); sentryClient.addRoleToGroups(ADMIN_USER, ADMIN_ROLE, COMPONENT, Sets.newHashSet(ADMIN_GROUP)); final ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>(); @@ -184,14 +189,10 @@ public class AbstractKafkaSentryTestBase { sentryClient.grantPrivilege(ADMIN_USER, ADMIN_ROLE, COMPONENT, new TSentryPrivilege(COMPONENT, "kafka", authorizables, KafkaActionConstant.ALL)); - } finally { - if (sentryClient != null) { - sentryClient.close(); - } } } - protected static SentryGenericServiceClient getSentryClient() throws Exception { + static SentryGenericServiceClient getSentryClient() throws Exception { return SentryGenericServiceClientFactory.create(getClientConfig()); } http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java 
---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java index 0b1ef68..6d2cabf 100644 --- a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java +++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java @@ -34,6 +34,7 @@ import org.apache.sentry.core.model.kafka.KafkaActionConstant; import org.apache.sentry.core.model.kafka.Host; import org.apache.sentry.core.model.kafka.Topic; import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient; +import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory; import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable; import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege; import org.junit.Assert; @@ -55,6 +56,8 @@ public class TestAuthorize extends AbstractKafkaSentryTestBase { @Test public void testProduceConsumeForSuperuser() { + LOGGER.debug("testProduceConsumeForSuperuser"); + SentryGenericServiceClientFactory.factoryReset(); try { final String SuperuserName = "test"; testProduce(SuperuserName); @@ -66,8 +69,11 @@ public class TestAuthorize extends AbstractKafkaSentryTestBase { @Test public void testProduceConsumeCycle() throws Exception { + LOGGER.debug("testProduceConsumeCycle"); final String localhost = InetAddress.getLocalHost().getHostAddress(); + // SentryGenericServiceClientFactory.factoryReset(); + // START TESTING PRODUCER try { testProduce("user1"); http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java ---------------------------------------------------------------------- diff --git 
a/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java b/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java index 8a01e1c..80f158a 100644 --- a/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java +++ b/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java @@ -197,19 +197,14 @@ public class AbstractSqoopSentryTestBase { } public static void setAdminPrivilege() throws Exception { - SentryGenericServiceClient sentryClient = null; - try { - /** grant all privilege to admin user */ - sentryClient = SentryGenericServiceClientFactory.create(getClientConfig()); + try (SentryGenericServiceClient sentryClient = + SentryGenericServiceClientFactory.create(getClientConfig())){ + // grant all privilege to admin user sentryClient.createRoleIfNotExist(ADMIN_USER, ADMIN_ROLE, COMPONENT); sentryClient.addRoleToGroups(ADMIN_USER, ADMIN_ROLE, COMPONENT, Sets.newHashSet(ADMIN_GROUP)); sentryClient.grantPrivilege(ADMIN_USER, ADMIN_ROLE, COMPONENT, new TSentryPrivilege(COMPONENT, SQOOP_SERVER_NAME, new ArrayList<TAuthorizable>(), SqoopActionConstant.ALL)); - } finally { - if (sentryClient != null) { - sentryClient.close(); - } } }
