Repository: incubator-tephra Updated Branches: refs/heads/master eb6f9e6ac -> 6d8097db2
TEPHRA-235 Ensure that TransactionSnapshot always have a sorted invalid transaction list. This closes #44 Signed-off-by: poorna <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/6d8097db Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/6d8097db Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/6d8097db Branch: refs/heads/master Commit: 6d8097db28e6b5fdf3ec2dc6b97eb85de53f6ce0 Parents: eb6f9e6 Author: Ali Anwar <[email protected]> Authored: Tue Jun 20 18:15:56 2017 -0700 Committer: poorna <[email protected]> Committed: Thu Jun 22 11:32:02 2017 -0700 ---------------------------------------------------------------------- .../org/apache/tephra/TransactionManager.java | 2 +- .../apache/tephra/manager/InvalidTxList.java | 2 +- .../tephra/persist/TransactionSnapshot.java | 33 +++++++++++++++++--- .../tephra/snapshot/SnapshotCodecTest.java | 15 +++++++++ .../coprocessor/TransactionProcessorTest.java | 5 ++- .../coprocessor/TransactionProcessorTest.java | 5 ++- .../coprocessor/TransactionProcessorTest.java | 5 ++- .../coprocessor/TransactionProcessorTest.java | 5 ++- .../coprocessor/TransactionProcessorTest.java | 5 ++- 9 files changed, 65 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java index 27b6bc6..3f332ad 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java @@ -466,7 +466,7 @@ public class TransactionManager extends AbstractService { public synchronized TransactionSnapshot getCurrentState() { return TransactionSnapshot.copyFrom(System.currentTimeMillis(), readPointer, lastWritePointer, - invalidTxList.toRawList(), inProgress, committingChangeSets, + invalidTxList, inProgress, committingChangeSets, committedChangeSets); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java b/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java index 231196c..5d032f6 100644 --- a/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java +++ b/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java @@ -110,7 +110,7 @@ public class InvalidTxList { } /** - * @return list of invalid transactions. The list is not sorted. + * @return list of invalid transactions. The list is not guaranteed to be sorted. */ public LongList toRawList() { return LongLists.unmodifiable(invalid); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java index ccf7374..d76a98f 100644 --- a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java +++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java @@ -19,10 +19,11 @@ package org.apache.tephra.persist; import com.google.common.base.Objects; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import it.unimi.dsi.fastutil.longs.LongArrayList; import org.apache.tephra.ChangeId; import org.apache.tephra.TransactionManager; +import org.apache.tephra.manager.InvalidTxList; import java.util.Collection; import java.util.Collections; @@ -44,6 +45,18 @@ public class TransactionSnapshot implements TransactionVisibilityState { private Map<Long, Set<ChangeId>> committingChangeSets; private Map<Long, Set<ChangeId>> committedChangeSets; + /** + * Creates an instance of TransactionSnapshot with the given transaction state + * + * @param timestamp timestamp, in millis, that the snapshot was taken + * @param readPointer current transaction read pointer + * @param writePointer current transaction write pointer + * @param invalid current list of invalid write pointer; must be sorted + * @param inProgress current map of in-progress write pointers to expiration timestamps + * @param committing current map of write pointers to change sets which have passed {@code canCommit()} but not + * yet committed + * @param committed current map of write pointers to change sets which have committed + */ public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid, NavigableMap<Long, TransactionManager.InProgressTx> inProgress, Map<Long, Set<ChangeId>> committing, Map<Long, Set<ChangeId>> committed) { @@ -52,6 +65,15 @@ public class TransactionSnapshot implements TransactionVisibilityState { this.committedChangeSets = committed; } + /** + * Creates an instance of TransactionSnapshot with the given transaction state + * + * @param timestamp timestamp, in millis, that the snapshot was taken + * @param readPointer current transaction read pointer + * @param writePointer current transaction write pointer + * @param invalid current list of invalid write pointer; must be sorted + * @param inProgress current map of in-progress write pointers to expiration timestamps + */ public TransactionSnapshot(long timestamp, long readPointer, long writePointer, Collection<Long> invalid, NavigableMap<Long, TransactionManager.InProgressTx> inProgress) { this.timestamp = timestamp; @@ -162,9 +184,10 @@ public class TransactionSnapshot implements TransactionVisibilityState { /** * Creates a new {@code TransactionSnapshot} instance with copies of all of the individual collections. + * @param snapshotTime timestamp, in millis, that the snapshot was taken * @param readPointer current transaction read pointer * @param writePointer current transaction write pointer - * @param invalid current list of invalid write pointers + * @param invalidTxList current list of invalid write pointers * @param inProgress current map of in-progress write pointers to expiration timestamps * @param committing current map of write pointers to change sets which have passed {@code canCommit()} but not * yet committed @@ -172,12 +195,12 @@ public class TransactionSnapshot implements TransactionVisibilityState { * @return a new {@code TransactionSnapshot} instance */ public static TransactionSnapshot copyFrom(long snapshotTime, long readPointer, - long writePointer, Collection<Long> invalid, + long writePointer, InvalidTxList invalidTxList, NavigableMap<Long, TransactionManager.InProgressTx> inProgress, Map<Long, Set<ChangeId>> committing, NavigableMap<Long, Set<ChangeId>> committed) { - // copy invalid IDs - Collection<Long> invalidCopy = Lists.newArrayList(invalid); + // copy invalid IDs, after sorting + Collection<Long> invalidCopy = new LongArrayList(invalidTxList.toSortedArray()); // copy in-progress IDs and expirations NavigableMap<Long, TransactionManager.InProgressTx> inProgressCopy = Maps.newTreeMap(inProgress); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java index f67c58b..18f81c8 100644 --- a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java @@ -20,8 +20,11 @@ package org.apache.tephra.snapshot; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Guice; import com.google.inject.Injector; @@ -46,6 +49,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -303,6 +307,14 @@ public class SnapshotCodecTest { Transaction transaction = txManager.startLong(); Transaction checkpointTx = txManager.checkpoint(transaction); + // create invalid transactions (invalidated out of order) + Transaction shortTx1 = txManager.startShort(); + Transaction shortTx2 = txManager.startShort(); + Transaction shortTx3 = txManager.startShort(); + txManager.invalidate(shortTx3.getTransactionId()); + txManager.invalidate(shortTx1.getTransactionId()); + txManager.invalidate(shortTx2.getTransactionId()); + // shutdown to force a snapshot txManager.stopAndWait(); @@ -311,6 +323,7 @@ public class SnapshotCodecTest { txStorage.startAndWait(); TransactionSnapshot snapshot = txStorage.getLatestSnapshot(); + Assert.assertTrue(Ordering.natural().isOrdered((snapshot.getInvalid()))); TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState(); assertTransactionVisibilityStateEquals(snapshot, txVisibilityState); @@ -339,6 +352,7 @@ public class SnapshotCodecTest { // state should be recovered snapshot = txManager.getCurrentState(); + Assert.assertTrue(Ordering.natural().isOrdered((snapshot.getInvalid()))); inProgress = snapshot.getInProgress(); Assert.assertEquals(2, inProgress.size()); @@ -363,6 +377,7 @@ public class SnapshotCodecTest { txStorage2.startAndWait(); snapshot = txStorage2.getLatestSnapshot(); + Assert.assertTrue(Ordering.natural().isOrdered((snapshot.getInvalid()))); Assert.assertTrue(snapshot.getInProgress().isEmpty()); txStorage2.stopAndWait(); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index b25ae37..1879116 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -64,6 +64,7 @@ import org.apache.tephra.TransactionManager; import org.apache.tephra.TxConstants; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; +import org.apache.tephra.manager.InvalidTxList; import org.apache.tephra.metrics.TxMetricsCollector; import org.apache.tephra.persist.HDFSTransactionStateStorage; import org.apache.tephra.persist.TransactionSnapshot; @@ -141,9 +142,11 @@ public class TransactionProcessorTest { conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); // write an initial transaction snapshot + InvalidTxList invalidTxList = new InvalidTxList(); + invalidTxList.addAll(invalidSet); TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom( - System.currentTimeMillis(), V[6] - 1, V[7], invalidSet, + System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList, // this will set visibility upper bound to V[6] Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))), http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index e612e2a..abe375d 100644 --- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -70,6 +70,7 @@ import org.apache.tephra.TransactionManager; import org.apache.tephra.TxConstants; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; +import org.apache.tephra.manager.InvalidTxList; import org.apache.tephra.metrics.TxMetricsCollector; import org.apache.tephra.persist.HDFSTransactionStateStorage; import org.apache.tephra.persist.TransactionSnapshot; @@ -147,8 +148,10 @@ public class TransactionProcessorTest { conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); // write an initial transaction snapshot + InvalidTxList invalidTxList = new InvalidTxList(); + invalidTxList.addAll(invalidSet); TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom( - System.currentTimeMillis(), V[6] - 1, V[7], invalidSet, + System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList, // this will set visibility upper bound to V[6] Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))), http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index b92bb09..f6d8e2d 100644 --- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -53,6 +53,7 @@ import org.apache.tephra.TransactionManager; import org.apache.tephra.TxConstants; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; +import org.apache.tephra.manager.InvalidTxList; import org.apache.tephra.metrics.TxMetricsCollector; import org.apache.tephra.persist.HDFSTransactionStateStorage; import org.apache.tephra.persist.TransactionSnapshot; @@ -127,8 +128,10 @@ public class TransactionProcessorTest { conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); // write an initial transaction snapshot + InvalidTxList invalidTxList = new InvalidTxList(); + invalidTxList.addAll(invalidSet); TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom( - System.currentTimeMillis(), V[6] - 1, V[7], invalidSet, + System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList, // this will set visibility upper bound to V[6] Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))), http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index 4b236fc..8dfce32 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -53,6 +53,7 @@ import org.apache.tephra.TransactionManager; import org.apache.tephra.TxConstants; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; +import org.apache.tephra.manager.InvalidTxList; import org.apache.tephra.metrics.TxMetricsCollector; import org.apache.tephra.persist.HDFSTransactionStateStorage; import org.apache.tephra.persist.TransactionSnapshot; @@ -127,8 +128,10 @@ public class TransactionProcessorTest { conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); // write an initial transaction snapshot + InvalidTxList invalidTxList = new InvalidTxList(); + invalidTxList.addAll(invalidSet); TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom( - System.currentTimeMillis(), V[6] - 1, V[7], invalidSet, + System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList, // this will set visibility upper bound to V[6] Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))), http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index d21c987..9f7206d 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -53,6 +53,7 @@ import org.apache.tephra.TransactionManager; import org.apache.tephra.TxConstants; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; +import org.apache.tephra.manager.InvalidTxList; import org.apache.tephra.metrics.TxMetricsCollector; import org.apache.tephra.persist.HDFSTransactionStateStorage; import org.apache.tephra.persist.TransactionSnapshot; @@ -127,8 +128,10 @@ public class TransactionProcessorTest { conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); // write an initial transaction snapshot + InvalidTxList invalidTxList = new InvalidTxList(); + invalidTxList.addAll(invalidSet); TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom( - System.currentTimeMillis(), V[6] - 1, V[7], invalidSet, + System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList, // this will set visibility upper bound to V[6] Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
