Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign 57ded0a9e -> a21a41971
SENTRY-1643: AutoIncrement ChangeID of MSentryPermChange/MSentryPathChange may be error-prone (Lei Xu, reviewed by Hao Hao, Alex Kolbasov, Na Li) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/a21a4197 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/a21a4197 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/a21a4197 Branch: refs/heads/sentry-ha-redesign Commit: a21a41971c229a8f0c2379716bf928cd17fc429f Parents: 57ded0a Author: Alexander Kolbasov <[email protected]> Authored: Tue Apr 11 14:12:35 2017 -0700 Committer: Alexander Kolbasov <[email protected]> Committed: Tue Apr 11 14:12:35 2017 -0700 ---------------------------------------------------------------------- .../db/service/model/MSentryPathChange.java | 6 +- .../db/service/model/MSentryPermChange.java | 5 +- .../persistent/DeltaTransactionBlock.java | 6 +- .../db/service/persistent/SentryStore.java | 2 +- .../db/service/persistent/TestSentryStore.java | 67 ++++++++++++++++++++ 5 files changed, 81 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/a21a4197/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java index a0d3445..4b42ed0 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPathChange.java @@ -67,11 +67,15 @@ public class MSentryPathChange implements MSentryChange { private long createTimeMs; private long notificationID; - public MSentryPathChange(PathsUpdate pathChange) throws TException { + public MSentryPathChange(long changeID, PathsUpdate pathChange) throws TException { // Each PathsUpdate maps to a MSentryPathChange object. // The PathsUpdate is generated from a HMS notification log, // the notification ID is stored as seqNum and // the notification update is serialized as JSON string. + // + // See SENTRY-1643. changeID is set after increasing 1 of the "max(changeID)" fetched from + // the table, to avoid holes between changeIDs. it is subjected to change. + this.changeID = changeID; this.notificationID = pathChange.getSeqNum(); this.pathChange = pathChange.JSONSerialize(); this.createTimeMs = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/sentry/blob/a21a4197/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java index 476fbcb..a97d10a 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryPermChange.java @@ -64,7 +64,10 @@ public class MSentryPermChange implements MSentryChange { private String permChange; private long createTimeMs; - public MSentryPermChange(PermissionsUpdate permChange) throws TException { + public MSentryPermChange(long changeID, PermissionsUpdate permChange) throws TException { + // See SENTRY-1643. changeID is set after increasing 1 of the "max(changeID)" fetched from + // the table, to avoid holes between changeIDs. it is subjected to change. + this.changeID = changeID; this.permChange = permChange.JSONSerialize(); this.createTimeMs = System.currentTimeMillis(); } http://git-wip-us.apache.org/repos/asf/sentry/blob/a21a4197/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java index f590a52..8d3c88b 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/DeltaTransactionBlock.java @@ -83,9 +83,11 @@ public class DeltaTransactionBlock implements TransactionBlock<Object> { // changeID is trying to be persisted twice, the transaction would // fail. if (update instanceof PermissionsUpdate) { - pm.makePersistent(new MSentryPermChange((PermissionsUpdate)update)); + long lastChangeID = SentryStore.getLastProcessedChangeIDCore(pm, MSentryPermChange.class); + pm.makePersistent(new MSentryPermChange(lastChangeID + 1, (PermissionsUpdate)update)); } else if (update instanceof PathsUpdate) { - pm.makePersistent(new MSentryPathChange((PathsUpdate)update)); + long lastChangeID = SentryStore.getLastProcessedChangeIDCore(pm, MSentryPathChange.class); + pm.makePersistent(new MSentryPathChange(lastChangeID + 1, (PathsUpdate)update)); } else { throw new SentryInvalidInputException("Update should be type of either " + "PermissionsUpdate or PathsUpdate.\n"); http://git-wip-us.apache.org/repos/asf/sentry/blob/a21a4197/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index 802b9c6..19bae55 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -3184,7 +3184,7 @@ public class SentryStore { * @return the last processed changedID for the delta changes. If no * change found then return 0. */ - private <T extends MSentryChange> Long getLastProcessedChangeIDCore( + static <T extends MSentryChange> Long getLastProcessedChangeIDCore( PersistenceManager pm, Class<T> changeCls) { Query query = pm.newQuery(changeCls); query.setResult("max(changeID)"); http://git-wip-us.apache.org/repos/asf/sentry/blob/a21a4197/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java index aaa0b9f..fe3880d 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java @@ -20,6 +20,11 @@ package org.apache.sentry.provider.db.service.persistent; import java.io.File; import java.util.*; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; @@ -57,11 +62,15 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.Files; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.sentry.provider.db.service.persistent.QueryParamBuilder.newQueryParamBuilder; public class TestSentryStore extends org.junit.Assert { + private static final Logger LOGGER = LoggerFactory.getLogger(TestSentryStore.class); + private static File dataDir; private static SentryStore sentryStore; private static String[] adminGroups = { "adminGroup1" }; @@ -2572,4 +2581,62 @@ public class TestSentryStore extends org.junit.Assert { // TODO: verify MSentryPathChange being purged. // assertEquals(1, sentryStore.getMSentryPathChanges().size()); } + + /** + * This test verifies that in the case of concurrently updating delta change tables, no gap + * between change ID was made. All the change IDs must be consecutive ({@see SENTRY-1643}). + * + * @throws Exception + */ + @Test(timeout = 60000) + public void testConcurrentUpdateChanges() throws Exception { + final int numThreads = 20; + final int numChangesPerThread = 100; + final TransactionManager tm = sentryStore.getTransactionManager(); + final AtomicLong seqNumGenerator = new AtomicLong(0); + final CyclicBarrier barrier = new CyclicBarrier(numThreads); + + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + for (int i = 0; i < numThreads; i++) { + executor.submit(new Runnable() { + @Override + public void run() { + try { + barrier.await(); + } catch (Exception e) { + LOGGER.error("Barrier failed to await", e); + return; + } + for (int j = 0; j < numChangesPerThread; j++) { + List<TransactionBlock<Object>> tbs = new ArrayList<>(); + PermissionsUpdate update = + new PermissionsUpdate(seqNumGenerator.getAndIncrement(), false); + tbs.add(new DeltaTransactionBlock(update)); + try { + tm.executeTransaction(tbs); + } catch (Exception e) { + LOGGER.error("Failed to execute permission update transaction", e); + fail(String.format("Transaction failed: %s", e.getMessage())); + } + } + } + }); + } + executor.shutdown(); + executor.awaitTermination(60, TimeUnit.SECONDS); + + List<MSentryPermChange> changes = sentryStore.getMSentryPermChanges(); + assertEquals(numThreads * numChangesPerThread, changes.size()); + TreeSet<Long> changeIDs = new TreeSet<>(); + for (MSentryPermChange change : changes) { + changeIDs.add(change.getChangeID()); + } + assertEquals("duplicated change ID", numThreads * numChangesPerThread, changeIDs.size()); + long prevId = changeIDs.first() - 1; + for (Long changeId : changeIDs) { + assertTrue(String.format("Found non-consecutive number: prev=%d cur=%d", prevId, changeId), + changeId - prevId == 1); + prevId = changeId; + } + } }
