TEPHRA-223 Encapsulate the two data structures used for invalid transactions to avoid update issues
This closes #37 Signed-off-by: poorna <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/872fb109 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/872fb109 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/872fb109 Branch: refs/heads/master Commit: 872fb1090efadb3d62a75fa8f51b308511eea754 Parents: 10e36e6 Author: poorna <[email protected]> Authored: Mon Feb 20 17:13:39 2017 -0800 Committer: poorna <[email protected]> Committed: Tue Feb 21 17:46:33 2017 -0800 ---------------------------------------------------------------------- .../org/apache/tephra/TransactionManager.java | 104 +++++++-------- .../apache/tephra/manager/InvalidTxList.java | 126 +++++++++++++++++++ .../tephra/manager/InvalidTxListTest.java | 110 ++++++++++++++++ .../AbstractTransactionStateStorageTest.java | 10 ++ 4 files changed, 294 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/872fb109/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java index 0b90d7f..f2060cd 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java @@ -29,7 +29,11 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractService; import com.google.inject.Inject; import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.longs.LongArraySet; +import it.unimi.dsi.fastutil.longs.LongIterator; +import it.unimi.dsi.fastutil.longs.LongSet; import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.manager.InvalidTxList; import org.apache.tephra.metrics.DefaultMetricsCollector; import org.apache.tephra.metrics.MetricsCollector; import org.apache.tephra.persist.NoOpTransactionStateStorage; @@ -46,7 +50,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.OutputStream; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -115,15 +118,11 @@ public class TransactionManager extends AbstractService { //poll every 10 second to emit metrics private static final long METRICS_POLL_INTERVAL = 10000L; - private static final long[] NO_INVALID_TX = { }; - // Transactions that are in progress, with their info. private final NavigableMap<Long, InProgressTx> inProgress = new ConcurrentSkipListMap<Long, InProgressTx>(); // the list of transactions that are invalid (not properly committed/aborted, or timed out) - // TODO: explain usage of two arrays - private final LongArrayList invalid = new LongArrayList(); - private long[] invalidArray = NO_INVALID_TX; + private final InvalidTxList invalidTxList = new InvalidTxList(); // todo: use moving array instead (use Long2ObjectMap<byte[]> in fastutil) // todo: should this be consolidated with inProgress? @@ -198,8 +197,7 @@ public class TransactionManager extends AbstractService { } private void clear() { - invalid.clear(); - invalidArray = NO_INVALID_TX; + invalidTxList.clear(); inProgress.clear(); committedChangeSets.clear(); committingChangeSets.clear(); @@ -318,7 +316,7 @@ public class TransactionManager extends AbstractService { txMetricsCollector.gauge("committing.size", committingChangeSets.size()); txMetricsCollector.gauge("committed.size", committedChangeSets.size()); txMetricsCollector.gauge("inprogress.size", inProgress.size()); - txMetricsCollector.gauge("invalid.size", invalidArray.length); + txMetricsCollector.gauge("invalid.size", getInvalidSize()); } @Override @@ -327,7 +325,7 @@ public class TransactionManager extends AbstractService { txMetricsCollector.gauge("committing.size", committingChangeSets.size()); txMetricsCollector.gauge("committed.size", committedChangeSets.size()); txMetricsCollector.gauge("inprogress.size", inProgress.size()); - txMetricsCollector.gauge("invalid.size", invalidArray.length); + txMetricsCollector.gauge("invalid.size", getInvalidSize()); } @Override @@ -363,7 +361,7 @@ public class TransactionManager extends AbstractService { } if (!timedOut.isEmpty()) { invalidEdits = Lists.newArrayListWithCapacity(timedOut.size()); - invalid.addAll(timedOut.keySet()); + invalidTxList.addAll(timedOut.keySet()); for (Map.Entry<Long, InProgressType> tx : timedOut.entrySet()) { inProgress.remove(tx.getKey()); // checkpoints never go into the committing change sets or the edits @@ -373,9 +371,6 @@ public class TransactionManager extends AbstractService { } } - // todo: find a more efficient way to keep this sorted. Could it just be an array? - Collections.sort(invalid); - invalidArray = invalid.toLongArray(); LOG.info("Invalidated {} transactions due to timeout.", timedOut.size()); } } @@ -468,7 +463,8 @@ public class TransactionManager extends AbstractService { public synchronized TransactionSnapshot getCurrentState() { return TransactionSnapshot.copyFrom(System.currentTimeMillis(), readPointer, lastWritePointer, - invalid, inProgress, committingChangeSets, committedChangeSets); + invalidTxList.toRawList(), inProgress, committingChangeSets, + committedChangeSets); } public synchronized void recoverState() { @@ -497,7 +493,7 @@ public class TransactionManager extends AbstractService { Preconditions.checkState(lastSnapshotTime == 0, "lastSnapshotTime has been set!"); Preconditions.checkState(readPointer == 0, "readPointer has been set!"); Preconditions.checkState(lastWritePointer == 0, "lastWritePointer has been set!"); - Preconditions.checkState(invalid.isEmpty(), "invalid list should be empty!"); + Preconditions.checkState(invalidTxList.isEmpty(), "invalid list should be empty!"); Preconditions.checkState(inProgress.isEmpty(), "inProgress map should be empty!"); Preconditions.checkState(committingChangeSets.isEmpty(), "committingChangeSets should be empty!"); Preconditions.checkState(committedChangeSets.isEmpty(), "committedChangeSets should be empty!"); @@ -506,7 +502,7 @@ public class TransactionManager extends AbstractService { lastSnapshotTime = snapshot.getTimestamp(); readPointer = snapshot.getReadPointer(); lastWritePointer = snapshot.getWritePointer(); - invalid.addAll(snapshot.getInvalid()); + invalidTxList.addAll(snapshot.getInvalid()); inProgress.putAll(txnBackwardsCompatCheck(defaultLongTimeout, longTimeoutTolerance, snapshot.getInProgress())); committingChangeSets.putAll(snapshot.getCommittingChangeSets()); committedChangeSets.putAll(snapshot.getCommittedChangeSets()); @@ -818,14 +814,17 @@ public class TransactionManager extends AbstractService { txMetricsCollector.rate("canCommit"); Stopwatch timer = new Stopwatch().start(); if (inProgress.get(tx.getTransactionId()) == null) { - // invalid transaction, either this has timed out and moved to invalid, or something else is wrong. - if (invalid.contains(tx.getTransactionId())) { - throw new TransactionNotInProgressException( - String.format("canCommit() is called for transaction %d that is not in progress (it is known to be invalid)", - tx.getTransactionId())); - } else { - throw new TransactionNotInProgressException( - String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId())); + synchronized (this) { + // invalid transaction, either this has timed out and moved to invalid, or something else is wrong. + if (invalidTxList.contains(tx.getTransactionId())) { + throw new TransactionNotInProgressException( + String.format( + "canCommit() is called for transaction %d that is not in progress (it is known to be invalid)", + tx.getTransactionId())); + } else { + throw new TransactionNotInProgressException( + String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId())); + } } } @@ -872,7 +871,7 @@ public class TransactionManager extends AbstractService { commitPointer = lastWritePointer + 1; if (inProgress.get(tx.getTransactionId()) == null) { // invalid transaction, either this has timed out and moved to invalid, or something else is wrong. - if (invalid.contains(tx.getTransactionId())) { + if (invalidTxList.contains(tx.getTransactionId())) { throw new TransactionNotInProgressException( String.format("canCommit() is called for transaction %d that is not in progress " + "(it is known to be invalid)", tx.getTransactionId())); @@ -928,8 +927,7 @@ public class TransactionManager extends AbstractService { InProgressTx previous = inProgress.remove(transactionId); if (previous == null) { // tx was not in progress! perhaps it timed out and is invalid? try to remove it there. - if (invalid.rem(transactionId)) { - invalidArray = invalid.toLongArray(); + if (invalidTxList.remove(transactionId)) { LOG.info("Tx invalid list: removed committed tx {}", transactionId); } } else { @@ -983,17 +981,16 @@ public class TransactionManager extends AbstractService { boolean removeInProgressCheckpoints = true; if (removed == null) { // tx was not in progress! perhaps it timed out and is invalid? try to remove it there. - if (invalid.rem(writePointer)) { + if (invalidTxList.remove(writePointer)) { // the tx and all its children were invalidated: no need to remove them from inProgress removeInProgressCheckpoints = false; // remove any invalidated checkpoint pointers // this will only be present if the parent write pointer was also invalidated if (checkpointWritePointers != null) { for (long checkpointWritePointer : checkpointWritePointers) { - invalid.rem(checkpointWritePointer); + invalidTxList.remove(checkpointWritePointer); } } - invalidArray = invalid.toLongArray(); LOG.info("Tx invalid list: removed aborted tx {}", writePointer); } } @@ -1032,21 +1029,18 @@ public class TransactionManager extends AbstractService { // This check is to prevent from invalidating committed transactions if (previous != null || previousChangeSet != null) { // add tx to invalids - invalid.add(writePointer); + invalidTxList.add(writePointer); if (previous == null) { LOG.debug("Invalidating tx {} in committing change sets but not in-progress", writePointer); } else { // invalidate any checkpoint write pointers LongArrayList childWritePointers = previous.getCheckpointWritePointers(); if (!childWritePointers.isEmpty()) { - invalid.addAll(childWritePointers); + invalidTxList.addAll(childWritePointers); inProgress.keySet().removeAll(childWritePointers); } } LOG.info("Tx invalid list: added tx {} because of invalidate", writePointer); - // todo: find a more efficient way to keep this sorted. Could it just be an array? - Collections.sort(invalid); - invalidArray = invalid.toLongArray(); if (previous != null && !previous.isLongRunning()) { // tx was short-running: must move read pointer moveReadPointerIfNeeded(writePointer); @@ -1080,13 +1074,9 @@ public class TransactionManager extends AbstractService { } } - private boolean doTruncateInvalidTx(Set<Long> invalidTxIds) { - LOG.info("Removing tx ids {} from invalid list", invalidTxIds); - boolean success = invalid.removeAll(invalidTxIds); - if (success) { - invalidArray = invalid.toLongArray(); - } - return success; + private boolean doTruncateInvalidTx(Set<Long> toRemove) { + LOG.info("Removing tx ids {} from invalid list", toRemove); + return invalidTxList.removeAll(toRemove); } /** @@ -1123,15 +1113,16 @@ public class TransactionManager extends AbstractService { } // Find all invalid transactions earlier than truncateWp - Set<Long> toTruncate = Sets.newHashSet(); - for (long wp : invalid) { - // invalid list is sorted, hence can stop as soon as we reach a wp >= truncateWp - if (wp >= truncateWp) { - break; + LongSet toTruncate = new LongArraySet(); + LongIterator it = invalidTxList.toRawList().iterator(); + while (it.hasNext()) { + long wp = it.nextLong(); + if (wp < truncateWp) { + toTruncate.add(wp); } - toTruncate.add(wp); } - return doTruncateInvalidTx(toTruncate); + LOG.info("Removing tx ids {} from invalid list", toTruncate); + return invalidTxList.removeAll(toTruncate); } public Transaction checkpoint(Transaction originalTx) throws TransactionNotInProgressException { @@ -1149,7 +1140,7 @@ public class TransactionManager extends AbstractService { // check that the parent tx is in progress InProgressTx parentTx = inProgress.get(txId); if (parentTx == null) { - if (invalid.contains(txId)) { + if (invalidTxList.contains(txId)) { throw new TransactionNotInProgressException( String.format("Transaction %d is not in progress because it was invalidated", txId)); } else { @@ -1184,14 +1175,14 @@ public class TransactionManager extends AbstractService { // hack for exposing important metric public int getExcludedListSize() { - return invalid.size() + inProgress.size(); + return getInvalidSize() + inProgress.size(); } /** * @return the size of invalid list */ - public int getInvalidSize() { - return this.invalid.size(); + public synchronized int getInvalidSize() { + return this.invalidTxList.size(); } int getCommittedSize() { @@ -1254,7 +1245,8 @@ public class TransactionManager extends AbstractService { firstShortTx = txId; } } - return new Transaction(readPointer, writePointer, invalidArray, inProgressIds.toLongArray(), firstShortTx, type); + return new Transaction(readPointer, writePointer, invalidTxList.toSortedArray(), + inProgressIds.toLongArray(), firstShortTx, type); } private void appendToLog(TransactionEdit edit) { @@ -1285,7 +1277,7 @@ public class TransactionManager extends AbstractService { */ public void logStatistics() { LOG.info("Transaction Statistics: write pointer = " + lastWritePointer + - ", invalid = " + invalid.size() + + ", invalid = " + getInvalidSize() + ", in progress = " + inProgress.size() + ", committing = " + committingChangeSets.size() + ", committed = " + committedChangeSets.size()); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/872fb109/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java b/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java new file mode 100644 index 0000000..231196c --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.manager; + +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.longs.LongList; +import it.unimi.dsi.fastutil.longs.LongLists; +import org.apache.tephra.TransactionManager; + +import java.util.Arrays; +import java.util.Collection; +import javax.annotation.concurrent.NotThreadSafe; + +/** + * This is an internal class used by the {@link TransactionManager} to store invalid transaction ids. + * This class uses both a list and an array to keep track of the invalid ids. The list is the primary + * data structure for storing the invalid ids. The array is populated lazily on changes to the list. + * The array is used to avoid creating a new array every time method {@link #toSortedArray()} is invoked. + * + * This class is not thread safe and relies on external synchronization. TransactionManager always + * accesses an instance of this class after synchronization. + */ +@NotThreadSafe +public class InvalidTxList { + private static final long[] NO_INVALID_TX = { }; + + private final LongList invalid = new LongArrayList(); + private long[] invalidArray = NO_INVALID_TX; + + private boolean dirty = false; // used to track changes to the invalid list + + public int size() { + return invalid.size(); + } + + public boolean isEmpty() { + return invalid.isEmpty(); + } + + public boolean add(long id) { + boolean changed = invalid.add(id); + dirty = dirty || changed; + return changed; + } + + public boolean addAll(Collection<? extends Long> ids) { + boolean changed = invalid.addAll(ids); + dirty = dirty || changed; + return changed; + } + + public boolean addAll(LongList ids) { + boolean changed = invalid.addAll(ids); + dirty = dirty || changed; + return changed; + } + + public boolean contains(long id) { + return invalid.contains(id); + } + + public boolean remove(long id) { + boolean changed = invalid.rem(id); + dirty = dirty || changed; + return changed; + } + + public boolean removeAll(Collection<? extends Long> ids) { + boolean changed = invalid.removeAll(ids); + dirty = dirty || changed; + return changed; + } + + @SuppressWarnings("WeakerAccess") + public boolean removeAll(LongList ids) { + boolean changed = invalid.removeAll(ids); + dirty = dirty || changed; + return changed; + } + + public void clear() { + invalid.clear(); + invalidArray = NO_INVALID_TX; + dirty = false; + } + + /** + * @return sorted array of invalid transactions + */ + public long[] toSortedArray() { + lazyUpdate(); + return invalidArray; + } + + /** + * @return list of invalid transactions. The list is not sorted. + */ + public LongList toRawList() { + return LongLists.unmodifiable(invalid); + } + + private void lazyUpdate() { + if (dirty) { + invalidArray = invalid.toLongArray(); + Arrays.sort(invalidArray); + dirty = false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/872fb109/tephra-core/src/test/java/org/apache/tephra/manager/InvalidTxListTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/manager/InvalidTxListTest.java b/tephra-core/src/test/java/org/apache/tephra/manager/InvalidTxListTest.java new file mode 100644 index 0000000..4f45072 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/manager/InvalidTxListTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.manager; + +import com.google.common.collect.ImmutableList; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import org.junit.Assert; +import org.junit.Test; + +/** + * + */ +public class InvalidTxListTest { + + @Test + public void testInvalidTxList() { + InvalidTxList invalidTxList = new InvalidTxList(); + // Assert that the list is empty at the beginning + Assert.assertTrue(invalidTxList.isEmpty()); + Assert.assertEquals(0, invalidTxList.size()); + Assert.assertEquals(ImmutableList.of(), invalidTxList.toRawList()); + Assert.assertArrayEquals(new long[0], invalidTxList.toSortedArray()); + + // Try removing something from the empty list + Assert.assertFalse(invalidTxList.remove(3)); + Assert.assertFalse(invalidTxList.removeAll(ImmutableList.of(5L, 9L))); + + // verify contains + Assert.assertFalse(invalidTxList.contains(3)); + + // Add some elements to the list + invalidTxList.add(3); + invalidTxList.add(1); + invalidTxList.add(8); + invalidTxList.add(5); + + // verify contains + Assert.assertTrue(invalidTxList.contains(3)); + + // Assert the newly added elements + Assert.assertFalse(invalidTxList.isEmpty()); + Assert.assertEquals(4, invalidTxList.size()); + Assert.assertEquals(ImmutableList.of(3L, 1L, 8L, 5L), invalidTxList.toRawList()); + Assert.assertArrayEquals(new long[] {1, 3, 5, 8}, invalidTxList.toSortedArray()); + + // Add a collection of elements + invalidTxList.addAll(ImmutableList.of(7L, 10L, 4L, 2L)); + + // Assert the newly added elements + Assert.assertFalse(invalidTxList.isEmpty()); + Assert.assertEquals(8, invalidTxList.size()); + Assert.assertEquals(ImmutableList.of(3L, 1L, 8L, 5L, 7L, 10L, 4L, 2L), invalidTxList.toRawList()); + Assert.assertArrayEquals(new long[] {1, 2, 3, 4, 5, 7, 8, 10}, invalidTxList.toSortedArray()); + + // Remove elements that are not present + Assert.assertFalse(invalidTxList.remove(6)); + Assert.assertFalse(invalidTxList.removeAll(ImmutableList.of(9L, 11L))); + + // Remove a collection of elements + Assert.assertTrue(invalidTxList.removeAll(ImmutableList.of(8L, 4L, 2L))); + // This time check the array first and then check the list + Assert.assertArrayEquals(new long[] {1, 3, 5, 7, 10}, invalidTxList.toSortedArray()); + Assert.assertEquals(ImmutableList.of(3L, 1L, 5L, 7L, 10L), invalidTxList.toRawList()); + + // Remove a single element + Assert.assertTrue(invalidTxList.remove(5)); + Assert.assertArrayEquals(new long[] {1, 3, 7, 10}, invalidTxList.toSortedArray()); + Assert.assertEquals(ImmutableList.of(3L, 1L, 7L, 10L), invalidTxList.toRawList()); + + // Add a LongCollection + invalidTxList.addAll(new LongArrayList(new long[] {15, 12, 13})); + + // Assert the newly added elements + Assert.assertEquals(7, invalidTxList.size()); + Assert.assertArrayEquals(new long[] {1, 3, 7, 10, 12, 13, 15}, invalidTxList.toSortedArray()); + Assert.assertEquals(ImmutableList.of(3L, 1L, 7L, 10L, 15L, 12L, 13L), invalidTxList.toRawList()); + + // Remove a LongCollection + invalidTxList.removeAll(new LongArrayList(new long[] {3, 7, 12})); + + // Assert removals + Assert.assertEquals(4, invalidTxList.size()); + Assert.assertArrayEquals(new long[] {1, 10, 13, 15}, invalidTxList.toSortedArray()); + Assert.assertEquals(ImmutableList.of(1L, 10L, 15L, 13L), invalidTxList.toRawList()); + + // Clear the list + invalidTxList.clear(); + Assert.assertTrue(invalidTxList.isEmpty()); + Assert.assertEquals(0, invalidTxList.size()); + Assert.assertArrayEquals(new long[0], invalidTxList.toSortedArray()); + Assert.assertEquals(ImmutableList.of(), invalidTxList.toRawList()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/872fb109/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java index 21090c5..ec06528 100644 --- a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.primitives.Longs; import it.unimi.dsi.fastutil.longs.LongArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.tephra.ChangeId; @@ -137,6 +138,9 @@ public abstract class AbstractTransactionStateStorageTest { // TODO: replace with new persistence tests final byte[] a = { 'a' }; final byte[] b = { 'b' }; + // Start and invalidate a transaction + Transaction invalid = txManager.startShort(); + txManager.invalidate(invalid.getTransactionId()); // start a tx1, add a change A and commit Transaction tx1 = txManager.startShort(); Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a))); @@ -162,6 +166,12 @@ public abstract class AbstractTransactionStateStorageTest { LOG.info("New state: " + newState); assertEquals(origState, newState); + // Verify that the invalid transaction list matches + Transaction checkTx = txManager.startShort(); + Assert.assertEquals(origState.getInvalid(), Longs.asList(checkTx.getInvalids())); + txManager.abort(checkTx); + txManager.abort(invalid); + // commit tx2 Assert.assertTrue(txManager.commit(tx2)); // start another transaction, must be greater than tx3
