http://git-wip-us.apache.org/repos/asf/sentry/blob/13c3305d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index 26d7c5e..8bfa78c 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -126,6 +126,7 @@ public class SentryStore { private Configuration conf; private PrivCleaner privCleaner = null; private Thread privCleanerThread = null; + private final TransactionManager tm; public static Properties getDataNucleusProperties(Configuration conf) throws SentrySiteConfigurationException, IOException { @@ -167,9 +168,7 @@ public class SentryStore { return prop; } - public SentryStore(Configuration conf) - throws SentryNoSuchObjectException, SentryAccessDeniedException, - SentrySiteConfigurationException, IOException { + public SentryStore(Configuration conf) throws Exception { commitSequenceId = 0; this.conf = conf; Properties prop = getDataNucleusProperties(conf); @@ -183,6 +182,7 @@ public class SentryStore { prop.setProperty("datanucleus.fixedDatastore", "false"); } pmf = JDOHelper.getPersistenceManagerFactory(prop); + tm = new TransactionManager(pmf, conf); verifySentryStoreSchema(checkSchemaVersion); // Kick off the thread that cleans orphaned privileges (unless told not to) @@ -195,9 +195,12 @@ public class SentryStore { } } + public TransactionManager getTransactionManager() { + return tm; + } + // ensure that the backend DB schema is set - public void verifySentryStoreSchema(boolean 
checkVersion) - throws SentryNoSuchObjectException, SentryAccessDeniedException { + public void verifySentryStoreSchema(boolean checkVersion) throws Exception { if (!checkVersion) { setSentryVersion(SentryStoreSchemaInfo.getSentryVersion(), "Schema version set implicitly"); @@ -227,34 +230,6 @@ public class SentryStore { } /** - * PersistenceManager object and Transaction object have a one to one - * correspondence. Each PersistenceManager object is associated with a - * transaction object and vice versa. Hence we create a persistence manager - * instance when we create a new transaction. We create a new transaction - * for every store API since we want that unit of work to behave as a - * transaction. - * <p/> - * Note that there's only one instance of PersistenceManagerFactory object - * for the service. - * <p/> - * Synchronized because we obtain persistence manager - */ - public synchronized PersistenceManager openTransaction() { - PersistenceManager pm = pmf.getPersistenceManager(); - Transaction currentTransaction = pm.currentTransaction(); - currentTransaction.begin(); - return pm; - } - - /** - * Synchronized due to sequence id generation - */ - public synchronized CommitContext commitUpdateTransaction(PersistenceManager pm) { - commitTransaction(pm); - return new CommitContext(SERVER_UUID, incrementGetSequenceId()); - } - - /** * Increments commitSequenceId which should not be modified outside * this method. 
* @@ -264,16 +239,6 @@ public class SentryStore { return ++commitSequenceId; } - public void commitTransaction(PersistenceManager pm) { - Transaction currentTransaction = pm.currentTransaction(); - try { - Preconditions.checkState(currentTransaction.isActive(), "Transaction is not active"); - currentTransaction.commit(); - } finally { - pm.close(); - } - } - public void rollbackTransaction(PersistenceManager pm) { if (pm == null || pm.isClosed()) { return; @@ -311,21 +276,15 @@ public class SentryStore { * @returns commit context used for notification handlers * @throws SentryAlreadyExistsException */ - public CommitContext createSentryRole(String roleName) - throws SentryAlreadyExistsException { - boolean rollbackTransaction = true; - PersistenceManager pm = null; - try { - pm = openTransaction(); - createSentryRoleCore(pm, roleName); - CommitContext commit = commitUpdateTransaction(pm); - rollbackTransaction = false; - return commit; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + public CommitContext createSentryRole(final String roleName) + throws Exception { + return (CommitContext)tm.executeTransactionWithRetry( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + createSentryRoleCore(pm, roleName); + return new CommitContext(SERVER_UUID, incrementGetSequenceId()); + } + }); } private void createSentryRoleCore(PersistenceManager pm, String roleName) @@ -340,20 +299,20 @@ public class SentryStore { } } - private <T> Long getCount(Class<T> tClass) { - PersistenceManager pm = null; - Long size = Long.valueOf(-1); + private <T> Long getCount(final Class<T> tClass) { + Long size; try { - pm = openTransaction(); - Query query = pm.newQuery(); - query.setClass(tClass); - query.setResult("count(this)"); - size = (Long)query.execute(); - - } finally { - if (pm != null) { - commitTransaction(pm); - } + size = (Long) tm.executeTransaction( + new TransactionBlock() { + public Object 
execute(PersistenceManager pm) throws Exception { + Query query = pm.newQuery(); + query.setClass(tClass); + query.setResult("count(this)"); + return (Long) query.execute(); + } + }); + } catch (Exception e) { + size = Long.valueOf(-1); } return size; } @@ -405,56 +364,49 @@ public class SentryStore { @VisibleForTesting void clearAllTables() { - boolean rollbackTransaction = true; - PersistenceManager pm = null; try { - pm = openTransaction(); - pm.newQuery(MSentryRole.class).deletePersistentAll(); - pm.newQuery(MSentryGroup.class).deletePersistentAll(); - pm.newQuery(MSentryUser.class).deletePersistentAll(); - pm.newQuery(MSentryPrivilege.class).deletePersistentAll(); - commitUpdateTransaction(pm); - rollbackTransaction = false; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + pm.newQuery(MSentryRole.class).deletePersistentAll(); + pm.newQuery(MSentryGroup.class).deletePersistentAll(); + pm.newQuery(MSentryUser.class).deletePersistentAll(); + pm.newQuery(MSentryPrivilege.class).deletePersistentAll(); + return null; + } + }); + } catch (Exception e) { + // the method only for test, log the error and ignore the exception + LOGGER.error(e.getMessage(), e); } } public CommitContext alterSentryRoleGrantPrivilege(String grantorPrincipal, String roleName, TSentryPrivilege privilege) - throws SentryUserException { + throws Exception { return alterSentryRoleGrantPrivileges(grantorPrincipal, roleName, Sets.newHashSet(privilege)); } - public CommitContext alterSentryRoleGrantPrivileges(String grantorPrincipal, - String roleName, Set<TSentryPrivilege> privileges) - throws SentryUserException { - boolean rollbackTransaction = true; - PersistenceManager pm = null; - String trimmedRoleName = trimAndLower(roleName); - try { - pm = openTransaction(); - for (TSentryPrivilege privilege : privileges) { - // first do grant check - 
grantOptionCheck(pm, grantorPrincipal, privilege); - - MSentryPrivilege mPrivilege = alterSentryRoleGrantPrivilegeCore(pm, trimmedRoleName, privilege); - - if (mPrivilege != null) { - convertToTSentryPrivilege(mPrivilege, privilege); - } - } - CommitContext commit = commitUpdateTransaction(pm); - rollbackTransaction = false; - return commit; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + public CommitContext alterSentryRoleGrantPrivileges(final String grantorPrincipal, + final String roleName, final Set<TSentryPrivilege> privileges) + throws Exception { + return (CommitContext)tm.executeTransactionWithRetry( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + String trimmedRoleName = trimAndLower(roleName); + for (TSentryPrivilege privilege : privileges) { + // first do grant check + grantOptionCheck(pm, grantorPrincipal, privilege); + MSentryPrivilege mPrivilege = alterSentryRoleGrantPrivilegeCore( + pm, trimmedRoleName, privilege); + if (mPrivilege != null) { + convertToTSentryPrivilege(mPrivilege, privilege); + } + } + return new CommitContext(SERVER_UUID, incrementGetSequenceId()); + } + }); } private MSentryPrivilege alterSentryRoleGrantPrivilegeCore(PersistenceManager pm, @@ -516,33 +468,26 @@ public class SentryStore { } public CommitContext alterSentryRoleRevokePrivilege(String grantorPrincipal, - String roleName, TSentryPrivilege tPrivilege) throws SentryUserException { + String roleName, TSentryPrivilege tPrivilege) throws Exception { return alterSentryRoleRevokePrivileges(grantorPrincipal, roleName, Sets.newHashSet(tPrivilege)); } - public CommitContext alterSentryRoleRevokePrivileges(String grantorPrincipal, - String roleName, Set<TSentryPrivilege> tPrivileges) throws SentryUserException { - boolean rollbackTransaction = true; - PersistenceManager pm = null; - String trimmedRoleName = safeTrimLower(roleName); - try { - pm = openTransaction(); - for (TSentryPrivilege tPrivilege : 
tPrivileges) { - // first do revoke check - grantOptionCheck(pm, grantorPrincipal, tPrivilege); - - alterSentryRoleRevokePrivilegeCore(pm, trimmedRoleName, tPrivilege); - } + public CommitContext alterSentryRoleRevokePrivileges(final String grantorPrincipal, + final String roleName, final Set<TSentryPrivilege> tPrivileges) throws Exception { + return (CommitContext)tm.executeTransactionWithRetry( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + String trimmedRoleName = safeTrimLower(roleName); + for (TSentryPrivilege tPrivilege : tPrivileges) { + // first do revoke check + grantOptionCheck(pm, grantorPrincipal, tPrivilege); - CommitContext commit = commitUpdateTransaction(pm); - rollbackTransaction = false; - return commit; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + alterSentryRoleRevokePrivilegeCore(pm, trimmedRoleName, tPrivilege); + } + return new CommitContext(SERVER_UUID, incrementGetSequenceId()); + } + }); } private void alterSentryRoleRevokePrivilegeCore(PersistenceManager pm, @@ -791,21 +736,14 @@ public class SentryStore { return null; } - public CommitContext dropSentryRole(String roleName) - throws SentryNoSuchObjectException { - boolean rollbackTransaction = true; - PersistenceManager pm = null; - try { - pm = openTransaction(); - dropSentryRoleCore(pm, roleName); - CommitContext commit = commitUpdateTransaction(pm); - rollbackTransaction = false; - return commit; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + public CommitContext dropSentryRole(final String roleName) throws Exception { + return (CommitContext)tm.executeTransactionWithRetry( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + dropSentryRoleCore(pm, roleName); + return new CommitContext(SERVER_UUID, incrementGetSequenceId()); + } + }); } private void dropSentryRoleCore(PersistenceManager pm, String roleName) @@ -829,22 +767,15 @@ 
public class SentryStore { } } - public CommitContext alterSentryRoleAddGroups(String grantorPrincipal, String roleName, - Set<TSentryGroup> groupNames) - throws SentryNoSuchObjectException { - boolean rollbackTransaction = true; - PersistenceManager pm = null; - try { - pm = openTransaction(); - alterSentryRoleAddGroupsCore(pm, roleName, groupNames); - CommitContext commit = commitUpdateTransaction(pm); - rollbackTransaction = false; - return commit; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + public CommitContext alterSentryRoleAddGroups(final String grantorPrincipal, + final String roleName, final Set<TSentryGroup> groupNames) throws Exception { + return (CommitContext)tm.executeTransactionWithRetry( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + alterSentryRoleAddGroupsCore(pm, roleName, groupNames); + return new CommitContext(SERVER_UUID, incrementGetSequenceId()); + } + }); } private void alterSentryRoleAddGroupsCore(PersistenceManager pm, String roleName, @@ -876,21 +807,15 @@ public class SentryStore { } } - public CommitContext alterSentryRoleAddUsers(String roleName, - Set<String> userNames) throws SentryNoSuchObjectException { - boolean rollbackTransaction = true; - PersistenceManager pm = null; - try { - pm = openTransaction(); - alterSentryRoleAddUsersCore(pm, roleName, userNames); - CommitContext commit = commitUpdateTransaction(pm); - rollbackTransaction = false; - return commit; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + public CommitContext alterSentryRoleAddUsers(final String roleName, + final Set<String> userNames) throws Exception { + return (CommitContext)tm.executeTransactionWithRetry( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + alterSentryRoleAddUsersCore(pm, roleName, userNames); + return new CommitContext(SERVER_UUID, incrementGetSequenceId()); + } + }); } private void 
alterSentryRoleAddUsersCore(PersistenceManager pm, String roleName, @@ -918,243 +843,226 @@ public class SentryStore { } } - public CommitContext alterSentryRoleDeleteUsers(String roleName, Set<String> userNames) - throws SentryNoSuchObjectException { - boolean rollbackTransaction = true; - PersistenceManager pm = null; - String trimmedRoleName = trimAndLower(roleName); - try { - pm = openTransaction(); - MSentryRole role = getMSentryRole(pm, trimmedRoleName); - if (role == null) { - throw new SentryNoSuchObjectException("Role: " + trimmedRoleName); - } else { - Query query = pm.newQuery(MSentryUser.class); - query.setFilter("this.userName == t"); - query.declareParameters("java.lang.String t"); - query.setUnique(true); - List<MSentryUser> users = Lists.newArrayList(); - for (String userName : userNames) { - userName = userName.trim(); - MSentryUser user = (MSentryUser) query.execute(userName); - if (user != null) { - user.removeRole(role); - users.add(user); + public CommitContext alterSentryRoleDeleteUsers(final String roleName, + final Set<String> userNames) throws Exception { + return (CommitContext)tm.executeTransactionWithRetry( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + String trimmedRoleName = trimAndLower(roleName); + MSentryRole role = getMSentryRole(pm, trimmedRoleName); + if (role == null) { + throw new SentryNoSuchObjectException("Role: " + trimmedRoleName); + } else { + Query query = pm.newQuery(MSentryUser.class); + query.setFilter("this.userName == t"); + query.declareParameters("java.lang.String t"); + query.setUnique(true); + List<MSentryUser> users = Lists.newArrayList(); + for (String userName : userNames) { + userName = userName.trim(); + MSentryUser user = (MSentryUser) query.execute(userName); + if (user != null) { + user.removeRole(role); + users.add(user); + } + } + pm.makePersistentAll(users); + return new CommitContext(SERVER_UUID, incrementGetSequenceId()); + } } - } - 
pm.makePersistentAll(users); - CommitContext commit = commitUpdateTransaction(pm); - rollbackTransaction = false; - return commit; - } - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } - } - - public CommitContext alterSentryRoleDeleteGroups(String roleName, - Set<TSentryGroup> groupNames) - throws SentryNoSuchObjectException { - boolean rollbackTransaction = true; - PersistenceManager pm = null; - String trimmedRoleName = trimAndLower(roleName); - try { - pm = openTransaction(); - Query query = pm.newQuery(MSentryRole.class); - query.setFilter("this.roleName == t"); - query.declareParameters("java.lang.String t"); - query.setUnique(true); - MSentryRole role = (MSentryRole) query.execute(trimmedRoleName); - if (role == null) { - throw new SentryNoSuchObjectException("Role: " + trimmedRoleName + " doesn't exist"); - } else { - query = pm.newQuery(MSentryGroup.class); - query.setFilter("this.groupName == t"); - query.declareParameters("java.lang.String t"); - query.setUnique(true); - List<MSentryGroup> groups = Lists.newArrayList(); - for (TSentryGroup tGroup : groupNames) { - String groupName = tGroup.getGroupName().trim(); - MSentryGroup group = (MSentryGroup) query.execute(groupName); - if (group != null) { - group.removeRole(role); - groups.add(group); + }); + } + + public CommitContext alterSentryRoleDeleteGroups(final String roleName, + final Set<TSentryGroup> groupNames) throws Exception { + return (CommitContext)tm.executeTransactionWithRetry( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + String trimmedRoleName = trimAndLower(roleName); + Query query = pm.newQuery(MSentryRole.class); + query.setFilter("this.roleName == t"); + query.declareParameters("java.lang.String t"); + query.setUnique(true); + MSentryRole role = (MSentryRole) query.execute(trimmedRoleName); + if (role == null) { + throw new SentryNoSuchObjectException("Role: " + trimmedRoleName + " doesn't exist"); + } else { + 
query = pm.newQuery(MSentryGroup.class); + query.setFilter("this.groupName == t"); + query.declareParameters("java.lang.String t"); + query.setUnique(true); + List<MSentryGroup> groups = Lists.newArrayList(); + for (TSentryGroup tGroup : groupNames) { + String groupName = tGroup.getGroupName().trim(); + MSentryGroup group = (MSentryGroup) query.execute(groupName); + if (group != null) { + group.removeRole(role); + groups.add(group); + } + } + pm.makePersistentAll(groups); + return new CommitContext(SERVER_UUID, incrementGetSequenceId()); + } } - } - pm.makePersistentAll(groups); - CommitContext commit = commitUpdateTransaction(pm); - rollbackTransaction = false; - return commit; - } - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + }); } @VisibleForTesting - MSentryRole getMSentryRoleByName(String roleName) - throws SentryNoSuchObjectException { - boolean rollbackTransaction = true; - PersistenceManager pm = null; - String trimmedRoleName = trimAndLower(roleName); - try { - pm = openTransaction(); - Query query = pm.newQuery(MSentryRole.class); - query.setFilter("this.roleName == t"); - query.declareParameters("java.lang.String t"); - query.setUnique(true); - MSentryRole sentryRole = (MSentryRole) query.execute(trimmedRoleName); - if (sentryRole == null) { - throw new SentryNoSuchObjectException("Role: " + trimmedRoleName + " doesn't exist"); - } else { - pm.retrieve(sentryRole); - } - rollbackTransaction = false; - commitTransaction(pm); - return sentryRole; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + MSentryRole getMSentryRoleByName(final String roleName) + throws Exception { + return (MSentryRole)tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + String trimmedRoleName = trimAndLower(roleName); + Query query = pm.newQuery(MSentryRole.class); + query.setFilter("this.roleName == t"); + query.declareParameters("java.lang.String t"); + 
query.setUnique(true); + MSentryRole sentryRole = (MSentryRole) query.execute(trimmedRoleName); + if (sentryRole == null) { + throw new SentryNoSuchObjectException("Role: " + trimmedRoleName + " doesn't exist"); + } else { + pm.retrieve(sentryRole); + } + return sentryRole; + } + }); } - private boolean hasAnyServerPrivileges(Set<String> roleNames, String serverName) { + private boolean hasAnyServerPrivileges(final Set<String> roleNames, final String serverName) { if (roleNames == null || roleNames.isEmpty()) { return false; } - boolean rollbackTransaction = true; - PersistenceManager pm = null; + + boolean result = false; try { - pm = openTransaction(); - Query query = pm.newQuery(MSentryPrivilege.class); - query.declareVariables("org.apache.sentry.provider.db.service.model.MSentryRole role"); - List<String> rolesFiler = new LinkedList<String>(); - for (String rName : roleNames) { - rolesFiler.add("role.roleName == \"" + trimAndLower(rName) + "\""); - } - StringBuilder filters = new StringBuilder("roles.contains(role) " - + "&& (" + Joiner.on(" || ").join(rolesFiler) + ") "); - filters.append("&& serverName == \"" + trimAndLower(serverName) + "\""); - query.setFilter(filters.toString()); - query.setResult("count(this)"); - - Long numPrivs = (Long) query.execute(); - rollbackTransaction = false; - commitTransaction(pm); - return numPrivs > 0; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + result = (Boolean) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + Query query = pm.newQuery(MSentryPrivilege.class); + query.declareVariables("org.apache.sentry.provider.db.service.model.MSentryRole role"); + List<String> rolesFiler = new LinkedList<String>(); + for (String rName : roleNames) { + rolesFiler.add("role.roleName == \"" + trimAndLower(rName) + "\""); + } + StringBuilder filters = new StringBuilder("roles.contains(role) " + + "&& (" + Joiner.on(" || ").join(rolesFiler) + 
") "); + filters.append("&& serverName == \"" + trimAndLower(serverName) + "\""); + query.setFilter(filters.toString()); + query.setResult("count(this)"); + + Long numPrivs = (Long) query.execute(); + return numPrivs > 0; + } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } - List<MSentryPrivilege> getMSentryPrivileges(Set<String> roleNames, TSentryAuthorizable authHierarchy) { + List<MSentryPrivilege> getMSentryPrivileges(final Set<String> roleNames, + final TSentryAuthorizable authHierarchy) { + List<MSentryPrivilege> result = new ArrayList<MSentryPrivilege>(); if (roleNames == null || roleNames.isEmpty()) { - return new ArrayList<MSentryPrivilege>(); + return result; } - boolean rollbackTransaction = true; - PersistenceManager pm = null; + try { - pm = openTransaction(); - Query query = pm.newQuery(MSentryPrivilege.class); - query.declareVariables("org.apache.sentry.provider.db.service.model.MSentryRole role"); - List<String> rolesFiler = new LinkedList<String>(); - for (String rName : roleNames) { - rolesFiler.add("role.roleName == \"" + trimAndLower(rName) + "\""); - } - StringBuilder filters = new StringBuilder("roles.contains(role) " - + "&& (" + Joiner.on(" || ").join(rolesFiler) + ") "); - if (authHierarchy != null && authHierarchy.getServer() != null) { - filters.append("&& serverName == \"" + authHierarchy.getServer().toLowerCase() + "\""); - if (authHierarchy.getDb() != null) { - filters.append(" && ((dbName == \"" + authHierarchy.getDb().toLowerCase() + "\") || (dbName == \"__NULL__\")) && (URI == \"__NULL__\")"); - if (authHierarchy.getTable() != null - && !AccessConstants.ALL.equalsIgnoreCase(authHierarchy.getTable())) { - if (!AccessConstants.SOME.equalsIgnoreCase(authHierarchy.getTable())) { - filters.append(" && ((tableName == \"" + authHierarchy.getTable().toLowerCase() + "\") || (tableName == \"__NULL__\")) && (URI == \"__NULL__\")"); - } - if (authHierarchy.getColumn() != null - && 
!AccessConstants.ALL.equalsIgnoreCase(authHierarchy.getColumn()) - && !AccessConstants.SOME.equalsIgnoreCase(authHierarchy.getColumn())) { - filters.append(" && ((columnName == \"" + authHierarchy.getColumn().toLowerCase() + "\") || (columnName == \"__NULL__\")) && (URI == \"__NULL__\")"); + result = (List<MSentryPrivilege>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + Query query = pm.newQuery(MSentryPrivilege.class); + query.declareVariables("org.apache.sentry.provider.db.service.model.MSentryRole role"); + List<String> rolesFiler = new LinkedList<String>(); + for (String rName : roleNames) { + rolesFiler.add("role.roleName == \"" + trimAndLower(rName) + "\""); + } + StringBuilder filters = new StringBuilder("roles.contains(role) " + + "&& (" + Joiner.on(" || ").join(rolesFiler) + ") "); + if (authHierarchy != null && authHierarchy.getServer() != null) { + filters.append("&& serverName == \"" + authHierarchy.getServer().toLowerCase() + "\""); + if (authHierarchy.getDb() != null) { + filters.append(" && ((dbName == \"" + authHierarchy.getDb().toLowerCase() + "\") || (dbName == \"__NULL__\")) && (URI == \"__NULL__\")"); + if (authHierarchy.getTable() != null + && !AccessConstants.ALL.equalsIgnoreCase(authHierarchy.getTable())) { + if (!AccessConstants.SOME.equalsIgnoreCase(authHierarchy.getTable())) { + filters.append(" && ((tableName == \"" + authHierarchy.getTable().toLowerCase() + "\") || (tableName == \"__NULL__\")) && (URI == \"__NULL__\")"); + } + if (authHierarchy.getColumn() != null + && !AccessConstants.ALL.equalsIgnoreCase(authHierarchy.getColumn()) + && !AccessConstants.SOME.equalsIgnoreCase(authHierarchy.getColumn())) { + filters.append(" && ((columnName == \"" + authHierarchy.getColumn().toLowerCase() + "\") || (columnName == \"__NULL__\")) && (URI == \"__NULL__\")"); + } + } + } + if (authHierarchy.getUri() != null) { + filters.append(" && ((URI != \"__NULL__\") && (\"" + 
authHierarchy.getUri() + "\".startsWith(URI)) || (URI == \"__NULL__\")) && (dbName == \"__NULL__\")"); + } + } + query.setFilter(filters.toString()); + return (List<MSentryPrivilege>) query.execute(); } - } - } - if (authHierarchy.getUri() != null) { - filters.append(" && ((URI != \"__NULL__\") && (\"" + authHierarchy.getUri() + "\".startsWith(URI)) || (URI == \"__NULL__\")) && (dbName == \"__NULL__\")"); - } - } - query.setFilter(filters.toString()); - List<MSentryPrivilege> privileges = (List<MSentryPrivilege>) query.execute(); - rollbackTransaction = false; - commitTransaction(pm); - return privileges; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } - List<MSentryPrivilege> getMSentryPrivilegesByAuth(Set<String> roleNames, TSentryAuthorizable authHierarchy) { - boolean rollbackTransaction = true; - PersistenceManager pm = null; + List<MSentryPrivilege> getMSentryPrivilegesByAuth(final Set<String> roleNames, + final TSentryAuthorizable authHierarchy) { + List<MSentryPrivilege> result = new ArrayList<MSentryPrivilege>(); try { - pm = openTransaction(); - Query query = pm.newQuery(MSentryPrivilege.class); - StringBuilder filters = new StringBuilder(); - if (roleNames == null || roleNames.isEmpty()) { - filters.append(" !roles.isEmpty() "); - } else { - query.declareVariables("org.apache.sentry.provider.db.service.model.MSentryRole role"); - List<String> rolesFiler = new LinkedList<String>(); - for (String rName : roleNames) { - rolesFiler.add("role.roleName == \"" + trimAndLower(rName) + "\""); - } - filters.append("roles.contains(role) " - + "&& (" + Joiner.on(" || ").join(rolesFiler) + ") "); - } - if (authHierarchy.getServer() != null) { - filters.append("&& serverName == \"" + - authHierarchy.getServer().toLowerCase() + "\""); - if (authHierarchy.getDb() != null) { - filters.append(" && (dbName == \"" + - authHierarchy.getDb().toLowerCase() + "\") 
&& (URI == \"__NULL__\")"); - if (authHierarchy.getTable() != null) { - filters.append(" && (tableName == \"" + - authHierarchy.getTable().toLowerCase() + "\")"); - } else { - filters.append(" && (tableName == \"__NULL__\")"); - } - } else if (authHierarchy.getUri() != null) { - filters.append(" && (URI != \"__NULL__\") && (\"" + authHierarchy.getUri() + - "\".startsWith(URI)) && (dbName == \"__NULL__\")"); - } else { - filters.append(" && (dbName == \"__NULL__\") && (URI == \"__NULL__\")"); - } - } else { - // if no server, then return empty resultset - return new ArrayList<MSentryPrivilege>(); - } - FetchGroup grp = pm.getFetchGroup(MSentryPrivilege.class, "fetchRole"); - grp.addMember("roles"); - pm.getFetchPlan().addGroup("fetchRole"); - query.setFilter(filters.toString()); - List<MSentryPrivilege> privileges = (List<MSentryPrivilege>) query.execute(); - rollbackTransaction = false; - commitTransaction(pm); - return privileges; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + result = (List<MSentryPrivilege>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + Query query = pm.newQuery(MSentryPrivilege.class); + StringBuilder filters = new StringBuilder(); + if (roleNames == null || roleNames.isEmpty()) { + filters.append(" !roles.isEmpty() "); + } else { + query.declareVariables("org.apache.sentry.provider.db.service.model.MSentryRole role"); + List<String> rolesFiler = new LinkedList<String>(); + for (String rName : roleNames) { + rolesFiler.add("role.roleName == \"" + trimAndLower(rName) + "\""); + } + filters.append("roles.contains(role) " + + "&& (" + Joiner.on(" || ").join(rolesFiler) + ") "); + } + if (authHierarchy.getServer() != null) { + filters.append("&& serverName == \"" + + authHierarchy.getServer().toLowerCase() + "\""); + if (authHierarchy.getDb() != null) { + filters.append(" && (dbName == \"" + + authHierarchy.getDb().toLowerCase() + "\") && (URI == 
\"__NULL__\")"); + if (authHierarchy.getTable() != null) { + filters.append(" && (tableName == \"" + + authHierarchy.getTable().toLowerCase() + "\")"); + } else { + filters.append(" && (tableName == \"__NULL__\")"); + } + } else if (authHierarchy.getUri() != null) { + filters.append(" && (URI != \"__NULL__\") && (\"" + authHierarchy.getUri() + + "\".startsWith(URI)) && (dbName == \"__NULL__\")"); + } else { + filters.append(" && (dbName == \"__NULL__\") && (URI == \"__NULL__\")"); + } + } else { + // if no server, then return empty resultset + return new ArrayList<MSentryPrivilege>(); + } + FetchGroup grp = pm.getFetchGroup(MSentryPrivilege.class, "fetchRole"); + grp.addMember("roles"); + pm.getFetchPlan().addGroup("fetchRole"); + query.setFilter(filters.toString()); + return (List<MSentryPrivilege>) query.execute(); + } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } public TSentryPrivilegeMap listSentryPrivilegesByAuthorizable(Set<String> groups, @@ -1193,7 +1101,7 @@ public class SentryStore { } private Set<MSentryPrivilege> getMSentryPrivilegesByRoleName(String roleName) - throws SentryNoSuchObjectException { + throws Exception { MSentryRole mSentryRole = getMSentryRoleByName(roleName); return mSentryRole.getPrivileges(); } @@ -1202,11 +1110,11 @@ public class SentryStore { * Gets sentry privilege objects for a given roleName from the persistence layer * @param roleName : roleName to look up * @return : Set of thrift sentry privilege objects - * @throws SentryNoSuchObjectException + * @throws Exception */ public Set<TSentryPrivilege> getAllTSentryPrivilegesByRoleName(String roleName) - throws SentryNoSuchObjectException { + throws Exception { return convertToTSentryPrivileges(getMSentryPrivilegesByRoleName(roleName)); } @@ -1236,44 +1144,36 @@ public class SentryStore { } - private Set<MSentryRole> getMSentryRolesByGroupName(String groupName) - throws SentryNoSuchObjectException { - boolean rollbackTransaction = true; - 
PersistenceManager pm = null; - try { - Set<MSentryRole> roles; - pm = openTransaction(); - - //If no group name was specified, return all roles - if (groupName == null) { - Query query = pm.newQuery(MSentryRole.class); - roles = new HashSet<MSentryRole>((List<MSentryRole>)query.execute()); - } else { - Query query = pm.newQuery(MSentryGroup.class); - MSentryGroup sentryGroup; - String trimmedGroupName = groupName.trim(); - query.setFilter("this.groupName == t"); - query.declareParameters("java.lang.String t"); - query.setUnique(true); - sentryGroup = (MSentryGroup) query.execute(trimmedGroupName); - if (sentryGroup == null) { - throw new SentryNoSuchObjectException("Group: " + trimmedGroupName + " doesn't exist"); - } else { - pm.retrieve(sentryGroup); - } - roles = sentryGroup.getRoles(); - } - for ( MSentryRole role: roles) { - pm.retrieve(role); - } - commitTransaction(pm); - rollbackTransaction = false; - return roles; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + private Set<MSentryRole> getMSentryRolesByGroupName(final String groupName) throws Exception { + return (Set<MSentryRole>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + Set<MSentryRole> roles; + //If no group name was specified, return all roles + if (groupName == null) { + Query query = pm.newQuery(MSentryRole.class); + roles = new HashSet<MSentryRole>((List<MSentryRole>)query.execute()); + } else { + Query query = pm.newQuery(MSentryGroup.class); + MSentryGroup sentryGroup; + String trimmedGroupName = groupName.trim(); + query.setFilter("this.groupName == t"); + query.declareParameters("java.lang.String t"); + query.setUnique(true); + sentryGroup = (MSentryGroup) query.execute(trimmedGroupName); + if (sentryGroup == null) { + throw new SentryNoSuchObjectException("Group: " + trimmedGroupName + " doesn't exist"); + } else { + pm.retrieve(sentryGroup); + } + roles = sentryGroup.getRoles(); + } + 
for (MSentryRole role: roles) { + pm.retrieve(role); + } + return roles; + } + }); } /** @@ -1283,7 +1183,7 @@ public class SentryStore { * @throws SentryNoSuchObjectException */ public Set<TSentryRole> getTSentryRolesByGroupName(Set<String> groupNames, - boolean checkAllGroups) throws SentryNoSuchObjectException { + boolean checkAllGroups) throws Exception { Set<MSentryRole> roleSet = Sets.newHashSet(); for (String groupName : groupNames) { try { @@ -1298,69 +1198,68 @@ public class SentryStore { return convertToTSentryRoles(roleSet); } - public Set<String> getRoleNamesForGroups(Set<String> groups) { + public Set<String> getRoleNamesForGroups(final Set<String> groups) { if (groups == null || groups.isEmpty()) { return ImmutableSet.of(); } - boolean rollbackTransaction = true; - PersistenceManager pm = null; + + Set<String> result = new HashSet<>(); try { - pm = openTransaction(); - Set<String> result = getRoleNamesForGroupsCore(pm, groups); - rollbackTransaction = false; - commitTransaction(pm); - return result; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + result = (Set<String>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + return getRoleNamesForGroupsCore(pm, groups); + } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } private Set<String> getRoleNamesForGroupsCore(PersistenceManager pm, Set<String> groups) { return convertToRoleNameSet(getRolesForGroups(pm, groups)); } - public Set<String> getRoleNamesForUsers(Set<String> users) { + public Set<String> getRoleNamesForUsers(final Set<String> users) { if (users == null || users.isEmpty()) { return ImmutableSet.of(); } - boolean rollbackTransaction = true; - PersistenceManager pm = null; + + Set<String> result = new HashSet<>(); try { - pm = openTransaction(); - Set<String> result = getRoleNamesForUsersCore(pm,users); - rollbackTransaction = false; - commitTransaction(pm); - 
return result; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + result = (Set<String>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + return getRoleNamesForUsersCore(pm,users); + } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } private Set<String> getRoleNamesForUsersCore(PersistenceManager pm, Set<String> users) { return convertToRoleNameSet(getRolesForUsers(pm, users)); } - public Set<TSentryRole> getTSentryRolesByUserNames(Set<String> users) { - boolean rollbackTransaction = true; - PersistenceManager pm = null; + public Set<TSentryRole> getTSentryRolesByUserNames(final Set<String> users) { + Set<TSentryRole> result = new HashSet<>(); try { - pm = openTransaction(); - Set<MSentryRole> mSentryRoles = getRolesForUsers(pm, users); - // Since {@link MSentryRole#getGroups()} is lazy-loading, the converting should be call - // before transaction committed. - Set<TSentryRole> result = convertToTSentryRoles(mSentryRoles); - rollbackTransaction = false; - commitTransaction(pm); - return result; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + result = (Set<TSentryRole>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + Set<MSentryRole> mSentryRoles = getRolesForUsers(pm, users); + // Since {@link MSentryRole#getGroups()} is lazy-loading, the converting should be call + // before transaction committed. 
+ return convertToTSentryRoles(mSentryRoles); + } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } public Set<MSentryRole> getRolesForGroups(PersistenceManager pm, Set<String> groups) { @@ -1421,26 +1320,25 @@ public class SentryStore { return hasAnyServerPrivileges(rolesToQuery, server); } - private Set<String> getRolesToQuery(Set<String> groups, Set<String> users, - TSentryActiveRoleSet roleSet) { - Set<String> activeRoleNames = toTrimedLower(roleSet.getRoles()); - - Set<String> roleNames = Sets.newHashSet(); - boolean rollbackTransaction = true; - PersistenceManager pm = null; + private Set<String> getRolesToQuery(final Set<String> groups, final Set<String> users, + final TSentryActiveRoleSet roleSet) { + Set<String> result = new HashSet<>(); try { - pm = openTransaction(); - roleNames.addAll(toTrimedLower(getRoleNamesForGroupsCore(pm, groups))); - roleNames.addAll(toTrimedLower(getRoleNamesForUsersCore(pm, users))); - rollbackTransaction = false; - commitTransaction(pm); - return roleSet.isAll() ? roleNames : Sets.intersection(activeRoleNames, - roleNames); - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + result = (Set<String>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + Set<String> activeRoleNames = toTrimedLower(roleSet.getRoles()); + Set<String> roleNames = Sets.newHashSet(); + roleNames.addAll(toTrimedLower(getRoleNamesForGroupsCore(pm, groups))); + roleNames.addAll(toTrimedLower(getRoleNamesForUsersCore(pm, users))); + return roleSet.isAll() ? 
roleNames : Sets.intersection(activeRoleNames, + roleNames); + } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } @VisibleForTesting @@ -1597,107 +1495,90 @@ public class SentryStore { return s.trim().toLowerCase(); } - public String getSentryVersion() throws SentryNoSuchObjectException, - SentryAccessDeniedException { + public String getSentryVersion() throws Exception { MSentryVersion mVersion = getMSentryVersion(); return mVersion.getSchemaVersion(); } - public void setSentryVersion(String newVersion, String verComment) - throws SentryNoSuchObjectException, SentryAccessDeniedException { - MSentryVersion mVersion; - boolean rollbackTransaction = true; - PersistenceManager pm = null; - - try { - mVersion = getMSentryVersion(); - if (newVersion.equals(mVersion.getSchemaVersion())) { - // specified version already in there - return; - } - } catch (SentryNoSuchObjectException e) { - // if the version doesn't exist, then create it - mVersion = new MSentryVersion(); - } - mVersion.setSchemaVersion(newVersion); - mVersion.setVersionComment(verComment); - try { - pm = openTransaction(); - pm.makePersistent(mVersion); - rollbackTransaction = false; - commitTransaction(pm); - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + public void setSentryVersion(final String newVersion, final String verComment) + throws Exception { + tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + MSentryVersion mVersion; + try { + mVersion = getMSentryVersion(); + if (newVersion.equals(mVersion.getSchemaVersion())) { + // specified version already in there + return null; + } + } catch (SentryNoSuchObjectException e) { + // if the version doesn't exist, then create it + mVersion = new MSentryVersion(); + } + mVersion.setSchemaVersion(newVersion); + mVersion.setVersionComment(verComment); + pm.makePersistent(mVersion); + return null; + } + }); } 
@SuppressWarnings("unchecked") private MSentryVersion getMSentryVersion() - throws SentryNoSuchObjectException, SentryAccessDeniedException { - boolean rollbackTransaction = true; - PersistenceManager pm = null; - try { - pm = openTransaction(); - Query query = pm.newQuery(MSentryVersion.class); - List<MSentryVersion> mSentryVersions = (List<MSentryVersion>) query - .execute(); - pm.retrieveAll(mSentryVersions); - rollbackTransaction = false; - commitTransaction(pm); - if (mSentryVersions.isEmpty()) { - throw new SentryNoSuchObjectException("No matching version found"); - } - if (mSentryVersions.size() > 1) { - throw new SentryAccessDeniedException( - "Metastore contains multiple versions"); - } - return mSentryVersions.get(0); - } catch (JDODataStoreException e) { - if (e.getCause() instanceof MissingTableException) { - throw new SentryAccessDeniedException("Version table not found. " - + "The sentry store is not set or corrupt "); - } else { - throw e; - } - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + throws Exception { + return (MSentryVersion) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + try { + Query query = pm.newQuery(MSentryVersion.class); + List<MSentryVersion> mSentryVersions = (List<MSentryVersion>) query + .execute(); + pm.retrieveAll(mSentryVersions); + if (mSentryVersions.isEmpty()) { + throw new SentryNoSuchObjectException("No matching version found"); + } + if (mSentryVersions.size() > 1) { + throw new SentryAccessDeniedException( + "Metastore contains multiple versions"); + } + return mSentryVersions.get(0); + } catch (JDODataStoreException e) { + if (e.getCause() instanceof MissingTableException) { + throw new SentryAccessDeniedException("Version table not found. 
" + + "The sentry store is not set or corrupt "); + } else { + throw e; + } + } + } + }); } /** * Drop given privilege from all roles */ - public void dropPrivilege(TSentryAuthorizable tAuthorizable) - throws SentryNoSuchObjectException, SentryInvalidInputException { - PersistenceManager pm = null; - boolean rollbackTransaction = true; - - TSentryPrivilege tPrivilege = toSentryPrivilege(tAuthorizable); - try { - pm = openTransaction(); - - if (isMultiActionsSupported(tPrivilege)) { - for (String privilegeAction : ALL_ACTIONS) { - tPrivilege.setAction(privilegeAction); - dropPrivilegeForAllRoles(pm, new TSentryPrivilege(tPrivilege)); - } - } else { - dropPrivilegeForAllRoles(pm, new TSentryPrivilege(tPrivilege)); - } - rollbackTransaction = false; - commitTransaction(pm); - } catch (JDODataStoreException e) { - throw new SentryInvalidInputException("Failed to get privileges: " - + e.getMessage()); - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + public void dropPrivilege(final TSentryAuthorizable tAuthorizable) throws Exception { + tm.executeTransactionWithRetry( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + TSentryPrivilege tPrivilege = toSentryPrivilege(tAuthorizable); + try { + if (isMultiActionsSupported(tPrivilege)) { + for (String privilegeAction : ALL_ACTIONS) { + tPrivilege.setAction(privilegeAction); + dropPrivilegeForAllRoles(pm, new TSentryPrivilege(tPrivilege)); + } + } else { + dropPrivilegeForAllRoles(pm, new TSentryPrivilege(tPrivilege)); + } + } catch (JDODataStoreException e) { + throw new SentryInvalidInputException("Failed to get privileges: " + + e.getMessage()); + } + return null; + } + }); } /** @@ -1707,37 +1588,31 @@ public class SentryStore { * @throws SentryNoSuchObjectException * @throws SentryInvalidInputException */ - public void renamePrivilege(TSentryAuthorizable tAuthorizable, - TSentryAuthorizable newTAuthorizable) - throws SentryNoSuchObjectException, 
SentryInvalidInputException { - PersistenceManager pm = null; - boolean rollbackTransaction = true; - - TSentryPrivilege tPrivilege = toSentryPrivilege(tAuthorizable); - TSentryPrivilege newPrivilege = toSentryPrivilege(newTAuthorizable); - - try { - pm = openTransaction(); - // In case of tables or DBs, check all actions - if (isMultiActionsSupported(tPrivilege)) { - for (String privilegeAction : ALL_ACTIONS) { - tPrivilege.setAction(privilegeAction); - newPrivilege.setAction(privilegeAction); - renamePrivilegeForAllRoles(pm, tPrivilege, newPrivilege); - } - } else { - renamePrivilegeForAllRoles(pm, tPrivilege, newPrivilege); - } - rollbackTransaction = false; - commitTransaction(pm); - } catch (JDODataStoreException e) { - throw new SentryInvalidInputException("Failed to get privileges: " - + e.getMessage()); - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + public void renamePrivilege(final TSentryAuthorizable tAuthorizable, + final TSentryAuthorizable newTAuthorizable) throws Exception { + tm.executeTransactionWithRetry( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + TSentryPrivilege tPrivilege = toSentryPrivilege(tAuthorizable); + TSentryPrivilege newPrivilege = toSentryPrivilege(newTAuthorizable); + try { + // In case of tables or DBs, check all actions + if (isMultiActionsSupported(tPrivilege)) { + for (String privilegeAction : ALL_ACTIONS) { + tPrivilege.setAction(privilegeAction); + newPrivilege.setAction(privilegeAction); + renamePrivilegeForAllRoles(pm, tPrivilege, newPrivilege); + } + } else { + renamePrivilegeForAllRoles(pm, tPrivilege, newPrivilege); + } + } catch (JDODataStoreException e) { + throw new SentryInvalidInputException("Failed to get privileges: " + + e.getMessage()); + } + return null; + } + }); } // Currently INSERT/SELECT/ALL are supported for Table and DB level privileges @@ -1915,123 +1790,113 @@ public class SentryStore { * This returns a Mapping of 
AuthZObj(db/table) -> (Role -> permission) */ public Map<String, HashMap<String, String>> retrieveFullPrivilegeImage() { - Map<String, HashMap<String, String>> retVal = new HashMap<String, HashMap<String,String>>(); - boolean rollbackTransaction = true; - PersistenceManager pm = null; + Map<String, HashMap<String, String>> result = new HashMap<>(); try { - pm = openTransaction(); - Query query = pm.newQuery(MSentryPrivilege.class); - String filters = "(serverName != \"__NULL__\") " - + "&& (dbName != \"__NULL__\") " + "&& (URI == \"__NULL__\")"; - query.setFilter(filters.toString()); - query - .setOrdering("serverName ascending, dbName ascending, tableName ascending"); - List<MSentryPrivilege> privileges = (List<MSentryPrivilege>) query - .execute(); - rollbackTransaction = false; - for (MSentryPrivilege mPriv : privileges) { - String authzObj = mPriv.getDbName(); - if (!isNULL(mPriv.getTableName())) { - authzObj = authzObj + "." + mPriv.getTableName(); - } - HashMap<String, String> pUpdate = retVal.get(authzObj); - if (pUpdate == null) { - pUpdate = new HashMap<String, String>(); - retVal.put(authzObj, pUpdate); - } - for (MSentryRole mRole : mPriv.getRoles()) { - String existingPriv = pUpdate.get(mRole.getRoleName()); - if (existingPriv == null) { - pUpdate.put(mRole.getRoleName(), mPriv.getAction().toUpperCase()); - } else { - pUpdate.put(mRole.getRoleName(), existingPriv + "," - + mPriv.getAction().toUpperCase()); - } - } - } - commitTransaction(pm); - return retVal; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + result = (Map<String, HashMap<String, String>>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + Map<String, HashMap<String, String>> retVal = new HashMap<>(); + Query query = pm.newQuery(MSentryPrivilege.class); + String filters = "(serverName != \"__NULL__\") " + + "&& (dbName != \"__NULL__\") " + "&& (URI == \"__NULL__\")"; + 
query.setFilter(filters.toString()); + query + .setOrdering("serverName ascending, dbName ascending, tableName ascending"); + List<MSentryPrivilege> privileges = (List<MSentryPrivilege>) query + .execute(); + for (MSentryPrivilege mPriv : privileges) { + String authzObj = mPriv.getDbName(); + if (!isNULL(mPriv.getTableName())) { + authzObj = authzObj + "." + mPriv.getTableName(); + } + HashMap<String, String> pUpdate = retVal.get(authzObj); + if (pUpdate == null) { + pUpdate = new HashMap<String, String>(); + retVal.put(authzObj, pUpdate); + } + for (MSentryRole mRole : mPriv.getRoles()) { + String existingPriv = pUpdate.get(mRole.getRoleName()); + if (existingPriv == null) { + pUpdate.put(mRole.getRoleName(), mPriv.getAction().toUpperCase()); + } else { + pUpdate.put(mRole.getRoleName(), existingPriv + "," + + mPriv.getAction().toUpperCase()); + } + } + } + return retVal; + } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } /** * This returns a Mapping of Role -> [Groups] */ public Map<String, LinkedList<String>> retrieveFullRoleImage() { - Map<String, LinkedList<String>> retVal = new HashMap<String, LinkedList<String>>(); - boolean rollbackTransaction = true; - PersistenceManager pm = null; + Map<String, LinkedList<String>> result = new HashMap<>(); try { - pm = openTransaction(); - Query query = pm.newQuery(MSentryGroup.class); - List<MSentryGroup> groups = (List<MSentryGroup>) query.execute(); - for (MSentryGroup mGroup : groups) { - for (MSentryRole role : mGroup.getRoles()) { - LinkedList<String> rUpdate = retVal.get(role.getRoleName()); - if (rUpdate == null) { - rUpdate = new LinkedList<String>(); - retVal.put(role.getRoleName(), rUpdate); - } - rUpdate.add(mGroup.getGroupName()); - } - } - commitTransaction(pm); - return retVal; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + result = (Map<String, LinkedList<String>>) tm.executeTransaction( + new TransactionBlock() { + public Object 
execute(PersistenceManager pm) throws Exception { + Map<String, LinkedList<String>> retVal = new HashMap<>(); + Query query = pm.newQuery(MSentryGroup.class); + List<MSentryGroup> groups = (List<MSentryGroup>) query.execute(); + for (MSentryGroup mGroup : groups) { + for (MSentryRole role : mGroup.getRoles()) { + LinkedList<String> rUpdate = retVal.get(role.getRoleName()); + if (rUpdate == null) { + rUpdate = new LinkedList<>(); + retVal.put(role.getRoleName(), rUpdate); + } + rUpdate.add(mGroup.getGroupName()); + } + } + return retVal; + } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } /** * This returns a Mapping of Authz -> [Paths] */ public Map<String, Set<String>> retrieveFullPathsImage() { - Map<String, Set<String>> retVal = new HashMap<>(); - boolean rollbackTransaction = true; - PersistenceManager pm = null; - + Map<String, Set<String>> result = new HashMap<>(); try { - pm = openTransaction(); - Query query = pm.newQuery(MAuthzPathsMapping.class); - List<MAuthzPathsMapping> authzToPathsMappings = (List<MAuthzPathsMapping>) query.execute(); - for (MAuthzPathsMapping authzToPaths : authzToPathsMappings) { - retVal.put(authzToPaths.getAuthzObjName(), authzToPaths.getPaths()); - } - - rollbackTransaction = false; - commitTransaction(pm); - return retVal; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + result = (Map<String, Set<String>>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + Map<String, Set<String>> retVal = new HashMap<>(); + Query query = pm.newQuery(MAuthzPathsMapping.class); + List<MAuthzPathsMapping> authzToPathsMappings = (List<MAuthzPathsMapping>) query.execute(); + for (MAuthzPathsMapping authzToPaths : authzToPathsMappings) { + retVal.put(authzToPaths.getAuthzObjName(), authzToPaths.getPaths()); + } + return retVal; + } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } 
- public CommitContext createAuthzPathsMapping(String hiveObj, - Set<String> paths) throws SentryNoSuchObjectException, - SentryAlreadyExistsException { - - boolean rollbackTransaction = true; - PersistenceManager pm = null; - - try { - pm = openTransaction(); - createAuthzPathsMappingCore(pm, hiveObj, paths); - CommitContext commit = commitUpdateTransaction(pm); - rollbackTransaction = false; - return commit; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + public CommitContext createAuthzPathsMapping(final String hiveObj, + final Set<String> paths) throws Exception { + return (CommitContext)tm.executeTransactionWithRetry( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + createAuthzPathsMappingCore(pm, hiveObj, paths); + return new CommitContext(SERVER_UUID, incrementGetSequenceId()); + } + }); } private void createAuthzPathsMappingCore(PersistenceManager pm, String authzObj, @@ -2253,37 +2118,37 @@ public class SentryStore { } // get mapping datas for [group,role], [user,role] with the specific roles - public List<Map<String, Set<String>>> getGroupUserRoleMapList(Set<String> roleNames) { - boolean rollbackTransaction = true; - PersistenceManager pm = null; + public List<Map<String, Set<String>>> getGroupUserRoleMapList(final Set<String> roleNames) { + List<Map<String, Set<String>>> result = new ArrayList<>(); try { - pm = openTransaction(); - Query query = pm.newQuery(MSentryRole.class); - - List<String> rolesFiler = new LinkedList<String>(); - if (roleNames != null) { - for (String rName : roleNames) { - rolesFiler.add("(roleName == \"" + rName.trim().toLowerCase() + "\")"); - } - } - if (rolesFiler.size() > 0) { - query.setFilter(Joiner.on(" || ").join(rolesFiler)); - } + result = (List<Map<String, Set<String>>>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + Query query = pm.newQuery(MSentryRole.class); + + 
List<String> rolesFiler = new LinkedList<String>(); + if (roleNames != null) { + for (String rName : roleNames) { + rolesFiler.add("(roleName == \"" + rName.trim().toLowerCase() + "\")"); + } + } + if (rolesFiler.size() > 0) { + query.setFilter(Joiner.on(" || ").join(rolesFiler)); + } - List<MSentryRole> mSentryRoles = (List<MSentryRole>) query.execute(); - Map<String, Set<String>> groupRolesMap = getGroupRolesMap(mSentryRoles); - Map<String, Set<String>> userRolesMap = getUserRolesMap(mSentryRoles); - List<Map<String, Set<String>>> mapsList = new ArrayList<>(); - mapsList.add(INDEX_GROUP_ROLES_MAP, groupRolesMap); - mapsList.add(INDEX_USER_ROLES_MAP, userRolesMap); - commitTransaction(pm); - rollbackTransaction = false; - return mapsList; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + List<MSentryRole> mSentryRoles = (List<MSentryRole>) query.execute(); + Map<String, Set<String>> groupRolesMap = getGroupRolesMap(mSentryRoles); + Map<String, Set<String>> userRolesMap = getUserRolesMap(mSentryRoles); + List<Map<String, Set<String>>> mapsList = new ArrayList<>(); + mapsList.add(INDEX_GROUP_ROLES_MAP, groupRolesMap); + mapsList.add(INDEX_USER_ROLES_MAP, userRolesMap); + return mapsList; + } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } private Map<String, Set<String>> getGroupRolesMap(List<MSentryRole> mSentryRoles) { @@ -2334,36 +2199,28 @@ public class SentryStore { } // get mapping data for [role,privilege] with the specific auth object - public Map<String, Set<TSentryPrivilege>> getRoleNameTPrivilegesMap(String dbName, - String tableName) throws Exception { - boolean rollbackTransaction = true; - PersistenceManager pm = null; - try { - pm = openTransaction(); - Query query = pm.newQuery(MSentryPrivilege.class); - - List<String> privilegeFiler = new LinkedList<String>(); - if (!StringUtils.isEmpty(dbName)) { - privilegeFiler.add("(dbName == \"" + dbName.trim().toLowerCase() + "\") "); - } - if 
(!StringUtils.isEmpty(tableName)) { - privilegeFiler.add("(tableName == \"" + tableName.trim().toLowerCase() + "\") "); - } - if (privilegeFiler.size() > 0) { - query.setFilter(Joiner.on(" && ").join(privilegeFiler)); - } + public Map<String, Set<TSentryPrivilege>> getRoleNameTPrivilegesMap(final String dbName, + final String tableName) throws Exception { + return (Map<String, Set<TSentryPrivilege>>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + Query query = pm.newQuery(MSentryPrivilege.class); + + List<String> privilegeFiler = new LinkedList<String>(); + if (!StringUtils.isEmpty(dbName)) { + privilegeFiler.add("(dbName == \"" + dbName.trim().toLowerCase() + "\") "); + } + if (!StringUtils.isEmpty(tableName)) { + privilegeFiler.add("(tableName == \"" + tableName.trim().toLowerCase() + "\") "); + } + if (privilegeFiler.size() > 0) { + query.setFilter(Joiner.on(" && ").join(privilegeFiler)); + } - List<MSentryPrivilege> mSentryPrivileges = (List<MSentryPrivilege>) query.execute(); - Map<String, Set<TSentryPrivilege>> rolePrivilegesMap = - getRolePrivilegesMap(mSentryPrivileges); - commitTransaction(pm); - rollbackTransaction = false; - return rolePrivilegesMap; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + List<MSentryPrivilege> mSentryPrivileges = (List<MSentryPrivilege>) query.execute(); + return getRolePrivilegesMap(mSentryPrivileges); + } + }); } private Map<String, Set<TSentryPrivilege>> getRolePrivilegesMap( @@ -2391,24 +2248,18 @@ public class SentryStore { // Get the all exist role names, will return an empty set // if no role names exist. 
public Set<String> getAllRoleNames() { - - boolean rollbackTransaction = true; - PersistenceManager pm = null; - + Set<String> result = new HashSet<>(); try { - pm = openTransaction(); - - Set<String> existRoleNames = getAllRoleNames(pm); - - commitTransaction(pm); - rollbackTransaction = false; - - return existRoleNames; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + result = (Set<String>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + return getAllRoleNames(pm); + } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } // get the all exist role names @@ -2464,80 +2315,75 @@ public class SentryStore { @VisibleForTesting protected Map<String, MSentryRole> getRolesMap() { - boolean rollbackTransaction = true; - PersistenceManager pm = null; + Map<String, MSentryRole> result = new HashMap<>(); try { - pm = openTransaction(); - - Query query = pm.newQuery(MSentryRole.class); - List<MSentryRole> mSentryRoles = (List<MSentryRole>) query.execute(); - Map<String, MSentryRole> existRolesMap = Maps.newHashMap(); - if (mSentryRoles != null) { - // change the List<MSentryRole> -> Map<roleName, Set<MSentryRole>> - for (MSentryRole mSentryRole : mSentryRoles) { - existRolesMap.put(mSentryRole.getRoleName(), mSentryRole); - } - } - - commitTransaction(pm); - rollbackTransaction = false; - return existRolesMap; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + result = (Map<String, MSentryRole>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + Query query = pm.newQuery(MSentryRole.class); + List<MSentryRole> mSentryRoles = (List<MSentryRole>) query.execute(); + Map<String, MSentryRole> existRolesMap = Maps.newHashMap(); + if (mSentryRoles != null) { + // change the List<MSentryRole> -> Map<roleName, Set<MSentryRole>> + for (MSentryRole mSentryRole : 
mSentryRoles) { + existRolesMap.put(mSentryRole.getRoleName(), mSentryRole); + } + } + return existRolesMap; + } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } @VisibleForTesting protected Map<String, MSentryGroup> getGroupNameToGroupMap() { - boolean rollbackTransaction = true; - PersistenceManager pm = null; + Map<String, MSentryGroup>result = new HashMap<>(); try { - pm = openTransaction(); - Map<String, MSentryGroup> resultMap = getGroupNameTGroupMap(pm); - commitTransaction(pm); - rollbackTransaction = false; - return resultMap; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + result = (Map<String, MSentryGroup>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + return getGroupNameTGroupMap(pm); + } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } @VisibleForTesting protected Map<String, MSentryUser> getUserNameToUserMap() { - boolean rollbackTransaction = true; - PersistenceManager pm = null; + Map<String, MSentryUser> result = new HashMap<>(); try { - pm = openTransaction(); - Map<String, MSentryUser> resultMap = getUserNameToUserMap(pm); - commitTransaction(pm); - rollbackTransaction = false; - return resultMap; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + result = (Map<String, MSentryUser>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + return getUserNameToUserMap(pm); + } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } @VisibleForTesting protected List<MSentryPrivilege> getPrivilegesList() { - boolean rollbackTransaction = true; - PersistenceManager pm = null; + List<MSentryPrivilege> result = new ArrayList<>(); try { - pm = openTransaction(); - List<MSentryPrivilege> resultList = getPrivilegesList(pm); - commitTransaction(pm); - 
rollbackTransaction = false; - return resultList; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } + result = (List<MSentryPrivilege>) tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + return getPrivilegesList(pm); + } + }); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } + return result; } /** @@ -2576,42 +2422,34 @@ public class SentryStore { * The option for merging or overwriting the existing data during import, true for * overwriting, false for merging */ - public void importSentryMetaData(TSentryMappingData tSentryMappingData, boolean isOverwriteForRole) - throws Exception { - boolean rollbackTransaction = true; - PersistenceManager pm = null; - // change all role name in lowercase - TSentryMappingData mappingData = lowercaseRoleName(tSentryMappingData); - try { - pm = openTransaction(); - Set<String> existRoleNames = getAllRoleNames(pm); - // - Map<String, Set<TSentryGroup>> importedRoleGroupsMap = covertToRoleNameTGroupsMap(mappingData - .getGroupRolesMap()); - Map<String, Set<String>> importedRoleUsersMap = covertToRoleUsersMap(mappingData - .getUserRolesMap()); - Set<String> importedRoleNames = importedRoleGroupsMap.keySet(); - // if import with overwrite role, drop the duplicated roles in current DB first. 
- if (isOverwriteForRole) { - dropDuplicatedRoleForImport(pm, existRoleNames, importedRoleNames); - // refresh the existRoleNames for the drop role - existRoleNames = getAllRoleNames(pm); - } + public void importSentryMetaData(final TSentryMappingData tSentryMappingData, + final boolean isOverwriteForRole) throws Exception { + tm.executeTransaction( + new TransactionBlock() { + public Object execute(PersistenceManager pm) throws Exception { + TSentryMappingData mappingData = lowercaseRoleName(tSentryMappingData); + Set<String> existRoleNames = getAllRoleNames(pm); + Map<String, Set<TSentryGroup>> importedRoleGroupsMap = covertToRoleNameTGroupsMap(mappingData + .getGroupRolesMap()); + Map<String, Set<String>> importedRoleUsersMap = covertToRoleUsersMap(mappingData + .getUserRolesMap()); + Set<String> importedRoleNames = importedRoleGroupsMap.keySet(); + // if import with overwrite role, drop the duplicated roles in current DB first. + if (isOverwriteForRole) { + dropDuplicatedRoleForImport(pm, existRoleNames, importedRoleNames); + // refresh the existRoleNames for the drop role + existRoleNames = getAllRoleNames(pm); + } - // import the mapping data for [role,privilege], the existRoleNames will be updated - importRolePrivilegeMapping(pm, existRoleNames, mappingData.getRolePrivilegesMap()); - // import the mapping data for [role,group], the existRoleNames will be updated - importRoleGroupMapping(pm, existRoleNames, importedRoleGroupsMap); - // import the mapping data for [role,user], the existRoleNames will be updated - importRoleUserMapping(pm, existRoleNames, importedRoleUsersMap); - - commitTransaction(pm); - rollbackTransaction = false; - } finally { - if (rollbackTransaction) { - rollbackTransaction(pm); - } - } + // import the mapping data for [role,privilege], the existRoleNames will be updated + importRolePrivilegeMapping(pm, existRoleNames, mappingData.getRolePrivilegesMap()); + // import the mapping data for [role,group], the existRoleNames will be updated 
+ importRoleGroupMapping(pm, existRoleNames, importedRoleGroupsMap); + // import the mapping data for [role,user], the existRoleNames will be updated + importRoleUserMapping(pm, existRoleNames, importedRoleUsersMap); + return null; + } + }); } // covert the Map[group->roles] to Map[role->groups]
http://git-wip-us.apache.org/repos/asf/sentry/blob/13c3305d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java
new file mode 100644
index 0000000..76d004b
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import javax.jdo.PersistenceManager;
+
+/**
+ * TransactionBlock wraps the code that TransactionManager executes inside a
+ * single transaction.
+ */
+public interface TransactionBlock {
+  /**
+   * Execute some code as a single transaction. The code must not start a new transaction or
+   * otherwise manipulate transactions through the PersistenceManager; TransactionManager is
+   * responsible for the transaction management.
+   * @param pm PersistenceManager for the current transaction
+   * @return Object with the result of execute()
+   * @throws Exception if the code fails
+   */
+  Object execute(PersistenceManager pm) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/13c3305d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
new file mode 100644
index 0000000..9d01746
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.service.thrift.ServiceConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.PersistenceManagerFactory;
+import javax.jdo.Transaction;
+
+/**
+ * TransactionManager executes {@link TransactionBlock}s as database transactions. It supports
+ * a retry mechanism for unexpected exceptions; SentryUserExceptions (eg,
+ * SentryNoSuchObjectException, SentryAlreadyExistsException etc.) are never retried.
+ */
+public class TransactionManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TransactionManager.class);
+
+  private final PersistenceManagerFactory pmf;
+
+  // Maximum number of attempts per call; clamped to at least 1 in the constructor
+  private final int transactionRetryMax;
+
+  // Delay (in milliseconds) between retries; clamped to be non-negative in the constructor
+  private final int retryWaitTimeMills;
+
+  public TransactionManager(PersistenceManagerFactory pmf, Configuration conf) {
+    this.pmf = pmf;
+    this.transactionRetryMax = Math.max(1, conf.getInt(
+        ServiceConstants.ServerConfig.SENTRY_STORE_TRANSACTION_RETRY,
+        ServiceConstants.ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_DEFAULT));
+    this.retryWaitTimeMills = Math.max(0, conf.getInt(
+        ServiceConstants.ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS,
+        ServiceConstants.ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS_DEFAULT));
+  }
+
+  /**
+   * Execute some code as a single transaction, the code in tb.execute() should not start new
+   * transaction or manipulate transactions with the PersistenceManager.
+   * @param tb transaction block with code to execute
+   * @return Object with the result of tb.execute()
+   * @throws Exception if tb.execute() or the commit fails; a still active transaction is rolled back
+   */
+  public Object executeTransaction(TransactionBlock tb) throws Exception {
+    try (CloseablePersistenceManager cpm =
+        new CloseablePersistenceManager(pmf.getPersistenceManager())) {
+      Transaction transaction = cpm.pm.currentTransaction();
+      transaction.begin();
+      try {
+        Object result = tb.execute(cpm.pm);
+        transaction.commit();
+        return result;
+      } finally {
+        if (transaction.isActive()) {
+          transaction.rollback();
+        }
+      }
+    }
+  }
+
+  /**
+   * Execute some code as a single transaction with retry mechanism. The transaction is always
+   * attempted at least once and at most transactionRetryMax times.
+   * @param tb transaction block with code to execute
+   * @return Object with the result of tb.execute()
+   * @throws Exception the SentryUserException raised by tb, or the failure of the last attempt
+   */
+  public Object executeTransactionWithRetry(TransactionBlock tb) throws Exception {
+    int retryNum = 0;
+    while (true) {
+      try {
+        return executeTransaction(tb);
+      } catch (SentryUserException e) {
+        // throw the sentry exception without retry
+        throw e;
+      } catch (Exception e) {
+        retryNum++;
+        if (retryNum >= transactionRetryMax) {
+          String message = "The transaction has reached max retry number, will not retry again.";
+          LOGGER.error(message, e);
+          throw new Exception(message, e);
+        }
+        LOGGER.warn("Exception is thrown, retry the transaction, current retry num is:"
+            + retryNum + ", the max retry num is: " + transactionRetryMax, e);
+        try {
+          Thread.sleep(retryWaitTimeMills);
+        } catch (InterruptedException ex) {
+          // restore the interrupt status for callers that only catch Exception
+          Thread.currentThread().interrupt();
+          throw ex;
+        }
+      }
+    }
+  }
+
+  /**
+   * CloseablePersistenceManager is a wrapper around PersistenceManager that
+   * implements AutoCloseable interface. It is needed because Apache jdo doesn't
+   * implement AutoCloseable (Datanucleus version does).
+   */
+  private static final class CloseablePersistenceManager implements AutoCloseable {
+    private final PersistenceManager pm;
+
+    public CloseablePersistenceManager(PersistenceManager pm) {
+      this.pm = pm;
+    }
+
+    @Override
+    public void close() throws Exception {
+      pm.close();
+    }
+  }
+}
\ No newline at end of file
