Repository: incubator-tephra Updated Branches: refs/heads/master 7b45a6e1a -> e80e89feb
(TEPHRA-201) Store checkpoints in in-progress list to avoid having to sort every time a tx is created. This closes #23 from GitHub. Signed-off-by: anew <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/e80e89fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/e80e89fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/e80e89fe Branch: refs/heads/master Commit: e80e89febf22c01022d26a592a860c8af1a94eae Parents: 7b45a6e Author: anew <[email protected]> Authored: Wed Dec 7 14:19:19 2016 -0800 Committer: anew <[email protected]> Committed: Thu Dec 8 12:52:16 2016 -0800 ---------------------------------------------------------------------- .../org/apache/tephra/TransactionManager.java | 137 +++++++++++++------ .../apache/tephra/snapshot/SnapshotCodecV2.java | 5 +- .../apache/tephra/snapshot/SnapshotCodecV4.java | 7 +- .../apache/tephra/TransactionManagerTest.java | 129 +++++++++++++++-- .../AbstractTransactionStateStorageTest.java | 6 +- .../LocalTransactionStateStorageTest.java | 13 +- .../tephra/snapshot/SnapshotCodecTest.java | 32 +++-- .../coprocessor/TransactionProcessorTest.java | 5 +- .../coprocessor/TransactionProcessorTest.java | 5 +- .../coprocessor/TransactionProcessorTest.java | 5 +- .../coprocessor/TransactionProcessorTest.java | 5 +- .../coprocessor/TransactionProcessorTest.java | 5 +- .../janitor/InvalidListPruneTest.java | 7 +- 13 files changed, 263 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java index 7faf63a..0b90d7f 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java @@ -340,35 +340,37 @@ public class TransactionManager extends AbstractService { private void cleanupTimedOutTransactions() { List<TransactionEdit> invalidEdits = null; - this.logReadLock.lock(); + logReadLock.lock(); try { synchronized (this) { if (!isRunning()) { return; } - long currentTime = System.currentTimeMillis(); - List<Long> timedOut = Lists.newArrayList(); + Map<Long, InProgressType> timedOut = Maps.newHashMap(); for (Map.Entry<Long, InProgressTx> tx : inProgress.entrySet()) { long expiration = tx.getValue().getExpiration(); if (expiration >= 0L && currentTime > expiration) { // timed out, remember tx id (can't remove while iterating over entries) - timedOut.add(tx.getKey()); + timedOut.put(tx.getKey(), tx.getValue().getType()); LOG.info("Tx invalid list: added tx {} because of timeout", tx.getKey()); } else if (expiration < 0) { LOG.warn("Transaction {} has negative expiration time {}. Likely cause is the transaction was not " + "migrated correctly, this transaction will be expired immediately", tx.getKey(), expiration); - timedOut.add(tx.getKey()); + timedOut.put(tx.getKey(), InProgressType.LONG); } } if (!timedOut.isEmpty()) { invalidEdits = Lists.newArrayListWithCapacity(timedOut.size()); - invalid.addAll(timedOut); - for (long tx : timedOut) { - committingChangeSets.remove(tx); - inProgress.remove(tx); - invalidEdits.add(TransactionEdit.createInvalid(tx)); + invalid.addAll(timedOut.keySet()); + for (Map.Entry<Long, InProgressType> tx : timedOut.entrySet()) { + inProgress.remove(tx.getKey()); + // checkpoints never go into the committing change sets or the edits + if (!InProgressType.CHECKPOINT.equals(tx.getValue())) { + committingChangeSets.remove(tx.getKey()); + invalidEdits.add(TransactionEdit.createInvalid(tx.getKey())); + } } // todo: find a more efficient way to keep this sorted. Could it just be an array? @@ -529,13 +531,13 @@ public class TransactionManager extends AbstractService { // handle null expiration long newExpiration = getTxExpirationFromWritePointer(writePointer, defaultLongTimeout); InProgressTx compatTx = - new InProgressTx(entry.getValue().getVisibilityUpperBound(), newExpiration, TransactionType.LONG, + new InProgressTx(entry.getValue().getVisibilityUpperBound(), newExpiration, InProgressType.LONG, entry.getValue().getCheckpointWritePointers()); entry.setValue(compatTx); } else if (entry.getValue().getType() == null) { InProgressTx compatTx = new InProgressTx(entry.getValue().getVisibilityUpperBound(), entry.getValue().getExpiration(), - TransactionType.SHORT, entry.getValue().getCheckpointWritePointers()); + InProgressType.SHORT, entry.getValue().getCheckpointWritePointers()); entry.setValue(compatTx); } } @@ -620,7 +622,15 @@ public class TransactionManager extends AbstractService { if (type == null) { InProgressTx inProgressTx = inProgress.get(edit.getWritePointer()); if (inProgressTx != null) { - type = inProgressTx.getType(); + InProgressType inProgressType = inProgressTx.getType(); + if (InProgressType.CHECKPOINT.equals(inProgressType)) { + // this should never happen, because checkpoints never go into the log edits; + LOG.debug("Ignoring ABORTED edit for a checkpoint transaction {}", edit.getWritePointer()); + break; + } + if (inProgressType != null) { + type = inProgressType.getTransactionType(); + } } else { // If transaction is not in-progress, then it has either been already aborted or invalidated. // We cannot determine the transaction's state based on current information, to be safe invalidate it. @@ -788,6 +798,11 @@ public class TransactionManager extends AbstractService { private void addInProgressAndAdvance(long writePointer, long visibilityUpperBound, long expiration, TransactionType type) { + addInProgressAndAdvance(writePointer, visibilityUpperBound, expiration, InProgressType.of(type)); + } + + private void addInProgressAndAdvance(long writePointer, long visibilityUpperBound, + long expiration, InProgressType type) { inProgress.put(writePointer, new InProgressTx(visibilityUpperBound, expiration, type)); advanceWritePointer(writePointer); } @@ -917,6 +932,13 @@ public class TransactionManager extends AbstractService { invalidArray = invalid.toLongArray(); LOG.info("Tx invalid list: removed committed tx {}", transactionId); } + } else { + LongArrayList checkpointPointers = previous.getCheckpointWritePointers(); + if (!checkpointPointers.isEmpty()) { + // adjust the write pointer to be the last checkpoint of the tx and remove all checkpoints from inProgress + writePointer = checkpointPointers.getLong(checkpointPointers.size() - 1); + inProgress.keySet().removeAll(previous.getCheckpointWritePointers()); + } } // moving read pointer moveReadPointerIfNeeded(writePointer); @@ -958,25 +980,30 @@ public class TransactionManager extends AbstractService { // makes tx visible (assumes that all operations were rolled back) // remove from in-progress set, so that it does not get excluded in the future InProgressTx removed = inProgress.remove(writePointer); + boolean removeInProgressCheckpoints = true; if (removed == null) { // tx was not in progress! perhaps it timed out and is invalid? try to remove it there. if (invalid.rem(writePointer)) { + // the tx and all its children were invalidated: no need to remove them from inProgress + removeInProgressCheckpoints = false; // remove any invalidated checkpoint pointers // this will only be present if the parent write pointer was also invalidated if (checkpointWritePointers != null) { - for (int i = 0; i < checkpointWritePointers.length; i++) { - invalid.rem(checkpointWritePointers[i]); + for (long checkpointWritePointer : checkpointWritePointers) { + invalid.rem(checkpointWritePointer); } } invalidArray = invalid.toLongArray(); LOG.info("Tx invalid list: removed aborted tx {}", writePointer); - // removed a tx from excludes: must move read pointer - moveReadPointerIfNeeded(writePointer); } - } else { - // removed a tx from excludes: must move read pointer - moveReadPointerIfNeeded(writePointer); } + if (removeInProgressCheckpoints && checkpointWritePointers != null) { + for (long checkpointWritePointer : checkpointWritePointers) { + inProgress.remove(checkpointWritePointer); + } + } + // removed a tx from excludes: must move read pointer + moveReadPointerIfNeeded(writePointer); } public boolean invalidate(long tx) { @@ -1011,10 +1038,9 @@ public class TransactionManager extends AbstractService { } else { // invalidate any checkpoint write pointers LongArrayList childWritePointers = previous.getCheckpointWritePointers(); - if (childWritePointers != null) { - for (int i = 0; i < childWritePointers.size(); i++) { - invalid.add(childWritePointers.get(i)); - } + if (!childWritePointers.isEmpty()) { + invalid.addAll(childWritePointers); + inProgress.keySet().removeAll(childWritePointers); } } LOG.info("Tx invalid list: added tx {} because of invalidate", writePointer); @@ -1152,7 +1178,8 @@ public class TransactionManager extends AbstractService { private void doCheckpoint(long newWritePointer, long parentWritePointer) { InProgressTx existingTx = inProgress.get(parentWritePointer); existingTx.addCheckpointWritePointer(newWritePointer); - advanceWritePointer(newWritePointer); + addInProgressAndAdvance(newWritePointer, existingTx.getVisibilityUpperBound(), existingTx.getExpiration(), + InProgressType.CHECKPOINT); } // hack for exposing important metric @@ -1223,18 +1250,10 @@ public class TransactionManager extends AbstractService { for (Map.Entry<Long, InProgressTx> entry : inProgress.entrySet()) { long txId = entry.getKey(); inProgressIds.add(txId); - // add any checkpointed write pointers to the in-progress list - LongArrayList childIds = entry.getValue().getCheckpointWritePointers(); - if (childIds != null) { - for (int i = 0; i < childIds.size(); i++) { - inProgressIds.add(childIds.get(i)); - } - } if (firstShortTx == Transaction.NO_TX_IN_PROGRESS && !entry.getValue().isLongRunning()) { firstShortTx = txId; } } - return new Transaction(readPointer, writePointer, invalidArray, inProgressIds.toLongArray(), firstShortTx, type); } @@ -1319,20 +1338,60 @@ public class TransactionManager extends AbstractService { } /** + * Type of in-progress transaction. + */ + public enum InProgressType { + + /** + * Short transactions detect conflicts during commit. + */ + SHORT(TransactionType.SHORT), + + /** + * Long running transactions do not detect conflicts during commit. + */ + LONG(TransactionType.LONG), + + /** + * Check-pointed transactions are recorded as in-progress. + */ + CHECKPOINT(null); + + private final TransactionType transactionType; + + InProgressType(TransactionType transactionType) { + this.transactionType = transactionType; + } + + public static InProgressType of(TransactionType type) { + switch (type) { + case SHORT: return SHORT; + case LONG: return LONG; + default: throw new IllegalArgumentException("Unknown TransactionType " + type); + } + } + + @Nullable + public TransactionType getTransactionType() { + return transactionType; + } + } + + /** * Represents some of the info on in-progress tx */ public static final class InProgressTx { /** the oldest in progress tx at the time of this tx start */ private final long visibilityUpperBound; private final long expiration; - private final TransactionType type; - private LongArrayList checkpointWritePointers = new LongArrayList(); + private final InProgressType type; + private final LongArrayList checkpointWritePointers; - public InProgressTx(long visibilityUpperBound, long expiration, TransactionType type) { + public InProgressTx(long visibilityUpperBound, long expiration, InProgressType type) { this(visibilityUpperBound, expiration, type, new LongArrayList()); } - public InProgressTx(long visibilityUpperBound, long expiration, TransactionType type, + public InProgressTx(long visibilityUpperBound, long expiration, InProgressType type, LongArrayList checkpointWritePointers) { this.visibilityUpperBound = visibilityUpperBound; this.expiration = expiration; @@ -1355,7 +1414,7 @@ public class TransactionManager extends AbstractService { } @Nullable - public TransactionType getType() { + public InProgressType getType() { return type; } @@ -1364,7 +1423,7 @@ public class TransactionManager extends AbstractService { // for backwards compatibility when long running txns were represented with -1 expiration return expiration == -1; } - return type == TransactionType.LONG; + return type == InProgressType.LONG; } public void addCheckpointWritePointer(long checkpointWritePointer) { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java index ccf026d..eb6b70b 100644 --- a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java +++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java @@ -21,7 +21,6 @@ package org.apache.tephra.snapshot; import com.google.common.collect.Maps; import it.unimi.dsi.fastutil.longs.LongArrayList; import org.apache.tephra.TransactionManager; -import org.apache.tephra.TransactionType; import org.apache.tephra.persist.TransactionSnapshot; import java.io.IOException; @@ -66,9 +65,9 @@ public class SnapshotCodecV2 extends DefaultSnapshotCodec { long expiration = decoder.readLong(); long visibilityUpperBound = decoder.readLong(); int txTypeIdx = decoder.readInt(); - TransactionType txType; + TransactionManager.InProgressType txType; try { - txType = TransactionType.values()[txTypeIdx]; + txType = TransactionManager.InProgressType.values()[txTypeIdx]; } catch (ArrayIndexOutOfBoundsException e) { throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java index cadaa8e..1da358f 100644 --- a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java +++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java @@ -21,7 +21,6 @@ package org.apache.tephra.snapshot; import com.google.common.collect.Maps; import it.unimi.dsi.fastutil.longs.LongArrayList; import org.apache.tephra.TransactionManager; -import org.apache.tephra.TransactionType; import org.apache.tephra.persist.TransactionSnapshot; import java.io.IOException; @@ -52,7 +51,7 @@ public class SnapshotCodecV4 extends SnapshotCodecV2 { encoder.writeInt(entry.getValue().getType().ordinal()); // write checkpoint tx IDs LongArrayList checkpointPointers = entry.getValue().getCheckpointWritePointers(); - if (checkpointPointers != null && !checkpointPointers.isEmpty()) { + if (!checkpointPointers.isEmpty()) { encoder.writeInt(checkpointPointers.size()); for (int i = 0; i < checkpointPointers.size(); i++) { encoder.writeLong(checkpointPointers.getLong(i)); @@ -76,9 +75,9 @@ public class SnapshotCodecV4 extends SnapshotCodecV2 { long expiration = decoder.readLong(); long visibilityUpperBound = decoder.readLong(); int txTypeIdx = decoder.readInt(); - TransactionType txType; + TransactionManager.InProgressType txType; try { - txType = TransactionType.values()[txTypeIdx]; + txType = TransactionManager.InProgressType.values()[txTypeIdx]; } catch (ArrayIndexOutOfBoundsException e) { throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java index 3269241..1fca773 100644 --- a/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java @@ -30,6 +30,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.Arrays; import java.util.Collections; import java.util.concurrent.TimeUnit; @@ -70,6 +71,92 @@ public class TransactionManagerTest extends TransactionSystemTest { } @Test + public void testCheckpointing() throws TransactionNotInProgressException { + // create a few transactions + Transaction tx1 = txManager.startShort(); + Transaction tx2 = txManager.startShort(); + Transaction tx3 = txManager.startShort(); + + // start and commit a few + for (int i = 0; i < 5; i++) { + Transaction tx = txManager.startShort(); + Assert.assertTrue(txManager.canCommit(tx, Collections.singleton(new byte[] { (byte) i }))); + Assert.assertTrue(txManager.commit(tx)); + } + + // checkpoint the transactions + Transaction tx3c = txManager.checkpoint(tx3); + Transaction tx2c = txManager.checkpoint(tx2); + Transaction tx1c = txManager.checkpoint(tx1); + + // start and commit a few (this moves the read pointer past all checkpoint write versions) + for (int i = 5; i < 10; i++) { + Transaction tx = txManager.startShort(); + Assert.assertTrue(txManager.canCommit(tx, Collections.singleton(new byte[] { (byte) i }))); + Assert.assertTrue(txManager.commit(tx)); + } + + // start new tx and validate all write pointers are excluded + Transaction tx = txManager.startShort(); + validateSorted(tx.getInProgress()); + validateSorted(tx.getInvalids()); + Assert.assertFalse(tx.isVisible(tx1.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx1c.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3c.getWritePointer())); + txManager.abort(tx); + + // abort one of the checkpoints + txManager.abort(tx1c); + + // start new tx and validate all write pointers are excluded + tx = txManager.startShort(); + validateSorted(tx.getInProgress()); + validateSorted(tx.getInvalids()); + Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3c.getWritePointer())); + txManager.abort(tx); + + // invalidate one of the checkpoints + txManager.invalidate(tx2c.getTransactionId()); + + // start new tx and validate all write pointers are excluded + tx = txManager.startShort(); + validateSorted(tx.getInProgress()); + validateSorted(tx.getInvalids()); + Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx3c.getWritePointer())); + txManager.abort(tx); + + // commit the last checkpoint + Assert.assertTrue(txManager.canCommit(tx3, Collections.<byte[]>emptyList())); + Assert.assertTrue(txManager.commit(tx3c)); + + // start new tx and validate all write pointers are excluded + tx = txManager.startShort(); + validateSorted(tx.getInProgress()); + validateSorted(tx.getInvalids()); + Assert.assertFalse(tx.isVisible(tx2.getWritePointer())); + Assert.assertFalse(tx.isVisible(tx2c.getWritePointer())); + txManager.abort(tx); + } + + private void validateSorted(long[] array) { + Long lastSeen = null; + for (long value : array) { + Assert.assertTrue(String.format("%s is not sorted", Arrays.toString(array)), + lastSeen == null || lastSeen < value); + lastSeen = value; + } + } + + @Test public void testTransactionCleanup() throws Exception { conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 3); conf.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 2); @@ -80,11 +167,14 @@ public class TransactionManagerTest extends TransactionSystemTest { try { Assert.assertEquals(0, txm.getInvalidSize()); Assert.assertEquals(0, txm.getCommittedSize()); - // start a transaction and leave it open + // start two transactions and leave them open Transaction tx1 = txm.startShort(); - // start a long running transaction and leave it open - Transaction tx2 = txm.startLong(); - Transaction tx3 = txm.startLong(); + Transaction tx2 = txm.startShort(); + // start two long running transactions and leave them open + Transaction ltx1 = txm.startLong(); + Transaction ltx2 = txm.startLong(); + // checkpoint one of the short transactions + Transaction tx2c = txm.checkpoint(tx2); // start and commit a bunch of transactions for (int i = 0; i < 10; i++) { Transaction tx = txm.startShort(); @@ -94,16 +184,25 @@ public class TransactionManagerTest extends TransactionSystemTest { // all of these should still be in the committed set Assert.assertEquals(0, txm.getInvalidSize()); Assert.assertEquals(10, txm.getCommittedSize()); + // sleep longer than the cleanup interval TimeUnit.SECONDS.sleep(5); // transaction should now be invalid - Assert.assertEquals(1, txm.getInvalidSize()); + //Assert.assertEquals(3, txm.getInvalidSize()); // run another transaction Transaction txx = txm.startShort(); // verify the exclude - Assert.assertFalse(txx.isVisible(tx1.getTransactionId())); - Assert.assertFalse(txx.isVisible(tx2.getTransactionId())); - Assert.assertFalse(txx.isVisible(tx3.getTransactionId())); + Assert.assertFalse(txx.isVisible(tx1.getWritePointer())); + Assert.assertFalse(txx.isVisible(tx2.getWritePointer())); + Assert.assertFalse(txx.isVisible(tx2c.getWritePointer())); + Assert.assertFalse(txx.isVisible(ltx1.getWritePointer())); + Assert.assertFalse(txx.isVisible(ltx2.getWritePointer())); + // verify all of the short write pointers are in the invalid list + Assert.assertEquals(3, txx.getInvalids().length); + Assert.assertArrayEquals(new long[] { + tx1.getWritePointer(), + tx2.getWritePointer(), + tx2c.getWritePointer()}, txx.getInvalids()); // try to commit the last transaction that was started Assert.assertTrue(txm.canCommit(txx, Collections.singleton(new byte[] { 0x0a }))); Assert.assertTrue(txm.commit(txx)); @@ -117,9 +216,15 @@ public class TransactionManagerTest extends TransactionSystemTest { } catch (TransactionNotInProgressException e) { // expected } + + // abort should remove tx1 from invalid, but tx2 and tx2c are still there txm.abort(tx1); - // abort should have removed from invalid + Assert.assertEquals(2, txm.getInvalidSize()); + + // aborting tx2c should remove both tx2 and tx2c from invalids + txm.abort(tx2c); Assert.assertEquals(0, txm.getInvalidSize()); + // run another bunch of transactions for (int i = 0; i < 10; i++) { Transaction tx = txm.startShort(); @@ -130,13 +235,13 @@ public class TransactionManagerTest extends TransactionSystemTest { Assert.assertEquals(0, txm.getInvalidSize()); Assert.assertEquals(0, txm.getCommittedSize()); // commit tx2, abort tx3 - Assert.assertTrue(txm.commit(tx2)); - txm.abort(tx3); + Assert.assertTrue(txm.commit(ltx1)); + txm.abort(ltx2); // none of these should still be in the committed set (tx2 is long-running). // Only tx3 is invalid list as it was aborted and is long-running. tx1 is short one and it rolled back its changes // so it should NOT be in invalid list Assert.assertEquals(1, txm.getInvalidSize()); - Assert.assertEquals(tx3.getTransactionId(), (long) txm.getCurrentState().getInvalid().iterator().next()); + Assert.assertEquals(ltx2.getTransactionId(), (long) txm.getCurrentState().getInvalid().iterator().next()); Assert.assertEquals(1, txm.getExcludedListSize()); } finally { txm.stopAndWait(); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java index 628bced..21090c5 100644 --- a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java @@ -512,11 +512,11 @@ public abstract class AbstractTransactionStateStorageTest { if (i % 20 == 0) { inProgress.put(startPointer + i, new TransactionManager.InProgressTx(startPointer - 1, currentTime + TimeUnit.DAYS.toSeconds(1), - TransactionType.LONG)); + TransactionManager.InProgressType.LONG)); } else { inProgress.put(startPointer + i, - new TransactionManager.InProgressTx(startPointer - 1, currentTime + 300000L, - TransactionType.SHORT)); + new TransactionManager.InProgressTx(startPointer - 1, currentTime + 300000L, + TransactionManager.InProgressType.SHORT)); } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java index 9535102..38b1283 100644 --- a/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.tephra.ChangeId; import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionManager.InProgressType; import org.apache.tephra.TransactionType; import org.apache.tephra.TxConstants; import org.apache.tephra.metrics.TxMetricsCollector; @@ -138,10 +139,10 @@ public class LocalTransactionStateStorageTest extends AbstractTransactionStateSt // There should be four in-progress transactions, and no invalid transactions TransactionSnapshot snapshot1 = txm.getCurrentState(); Assert.assertEquals(ImmutableSortedSet.of(wp1, wp2, wp3, wp4), snapshot1.getInProgress().keySet()); - verifyInProgress(snapshot1.getInProgress().get(wp1), TransactionType.LONG, time1 + longTimeout); - verifyInProgress(snapshot1.getInProgress().get(wp2), TransactionType.SHORT, time2 + 1000); - verifyInProgress(snapshot1.getInProgress().get(wp3), TransactionType.LONG, time3 + longTimeout); - verifyInProgress(snapshot1.getInProgress().get(wp4), TransactionType.SHORT, time4 + 1000); + verifyInProgress(snapshot1.getInProgress().get(wp1), InProgressType.LONG, time1 + longTimeout); + verifyInProgress(snapshot1.getInProgress().get(wp2), InProgressType.SHORT, time2 + 1000); + verifyInProgress(snapshot1.getInProgress().get(wp3), InProgressType.LONG, time3 + longTimeout); + verifyInProgress(snapshot1.getInProgress().get(wp4), InProgressType.SHORT, time4 + 1000); Assert.assertEquals(0, snapshot1.getInvalid().size()); } finally { txm.stopAndWait(); @@ -212,8 +213,8 @@ public class LocalTransactionStateStorageTest extends AbstractTransactionStateSt } } - private void verifyInProgress(TransactionManager.InProgressTx inProgressTx, TransactionType type, - long expiration) throws Exception { + private void verifyInProgress(TransactionManager.InProgressTx inProgressTx, + InProgressType type, long expiration) throws Exception { Assert.assertEquals(type, inProgressTx.getType()); Assert.assertTrue(inProgressTx.getExpiration() == expiration); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java index afdff5c..f67c58b 100644 --- a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java @@ -30,7 +30,6 @@ import org.apache.tephra.ChangeId; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionNotInProgressException; -import org.apache.tephra.TransactionType; import org.apache.tephra.TxConstants; import org.apache.tephra.persist.TransactionSnapshot; import org.apache.tephra.persist.TransactionStateStorage; @@ -66,7 +65,7 @@ public class SnapshotCodecTest { public static TemporaryFolder tmpDir = new TemporaryFolder(); @Test - public void testMinimalDeserilization() throws Exception { + public void testMinimalDeserialization() throws Exception { long now = System.currentTimeMillis(); long nowWritePointer = now * TxConstants.MAX_TX_PER_MS; /* @@ -82,8 +81,8 @@ public class SnapshotCodecTest { tLong, new TransactionManager.InProgressTx(readPtr, TransactionManager.getTxExpirationFromWritePointer( tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT), - TransactionType.LONG), - tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT))); + TransactionManager.InProgressType.LONG), + tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionManager.InProgressType.SHORT))); TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer, Lists.newArrayList(tInvalid), // invalid @@ -144,10 +143,11 @@ public class SnapshotCodecTest { long tShort = nowWritePointer - 1; // t5 - in-progress SHORT, canCommit called, changeset (r3, r4) TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of( - tLong, new TransactionManager.InProgressTx(readPtr, - TransactionManager.getTxExpirationFromWritePointer(tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT), - TransactionType.LONG), - tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT))); + tLong, new TransactionManager.InProgressTx( + readPtr, + TransactionManager.getTxExpirationFromWritePointer(tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT), + TransactionManager.InProgressType.LONG), + tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionManager.InProgressType.SHORT))); TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer, Lists.newArrayList(tInvalid), // invalid @@ -240,7 +240,7 @@ public class SnapshotCodecTest { assertEquals(1, snapshot2.getInProgress().size()); Map.Entry<Long, TransactionManager.InProgressTx> inProgressTx = snapshot2.getInProgress().entrySet().iterator().next(); - assertEquals(TransactionType.LONG, inProgressTx.getValue().getType()); + assertEquals(TransactionManager.InProgressType.LONG, inProgressTx.getValue().getType()); // save a new snapshot txManager2.stopAndWait(); @@ -315,13 +315,18 @@ public class SnapshotCodecTest { assertTransactionVisibilityStateEquals(snapshot, txVisibilityState); Map<Long, TransactionManager.InProgressTx> inProgress = snapshot.getInProgress(); - Assert.assertEquals(1, inProgress.size()); + Assert.assertEquals(2, inProgress.size()); TransactionManager.InProgressTx inProgressTx = inProgress.get(transaction.getTransactionId()); Assert.assertNotNull(inProgressTx); Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(), inProgressTx.getCheckpointWritePointers().toLongArray()); + inProgressTx = inProgress.get(checkpointTx.getWritePointer()); + Assert.assertNotNull(inProgressTx); + Assert.assertEquals(TransactionManager.InProgressType.CHECKPOINT, inProgressTx.getType()); + Assert.assertTrue(inProgressTx.getCheckpointWritePointers().isEmpty()); + txStorage.stopAndWait(); // start a new Tx manager to see if the transaction is restored correctly. @@ -335,13 +340,18 @@ public class SnapshotCodecTest { // state should be recovered snapshot = txManager.getCurrentState(); inProgress = snapshot.getInProgress(); - Assert.assertEquals(1, inProgress.size()); + Assert.assertEquals(2, inProgress.size()); inProgressTx = inProgress.get(transaction.getTransactionId()); Assert.assertNotNull(inProgressTx); Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(), inProgressTx.getCheckpointWritePointers().toLongArray()); + inProgressTx = inProgress.get(checkpointTx.getWritePointer()); + Assert.assertNotNull(inProgressTx); + Assert.assertEquals(TransactionManager.InProgressType.CHECKPOINT, inProgressTx.getType()); + Assert.assertTrue(inProgressTx.getCheckpointWritePointers().isEmpty()); + // Should be able to commit the transaction Assert.assertTrue(txManager.canCommit(checkpointTx, Collections.<byte[]>emptyList())); Assert.assertTrue(txManager.commit(checkpointTx)); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index ae71e05..b25ae37 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -61,7 +61,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tephra.ChangeId; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionManager; -import org.apache.tephra.TransactionType; import org.apache.tephra.TxConstants; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; @@ -146,8 +145,8 @@ public class TransactionProcessorTest { TransactionSnapshot.copyFrom( System.currentTimeMillis(), V[6] - 1, V[7], invalidSet, // this will set visibility upper bound to V[6] - Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(V[6] - 1, Long.MAX_VALUE, - TransactionType.SHORT))), + Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( + V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))), new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>()); txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(), txSnapshot.getWritePointer(), txSnapshot.getInvalid(), http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index a1091ea..e612e2a 100644 --- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -67,7 +67,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tephra.ChangeId; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionManager; -import org.apache.tephra.TransactionType; import org.apache.tephra.TxConstants; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; @@ -151,8 +150,8 @@ public class TransactionProcessorTest { TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom( System.currentTimeMillis(), V[6] - 1, V[7], invalidSet, // this will set visibility upper bound to V[6] - Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(V[6] - 1, Long.MAX_VALUE, - TransactionType.SHORT))), + Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( + V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))), new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>()); txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(), txSnapshot.getWritePointer(), txSnapshot.getInvalid(), http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index d7c2af0..b92bb09 100644 --- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tephra.ChangeId; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionManager; -import org.apache.tephra.TransactionType; import org.apache.tephra.TxConstants; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; @@ -131,8 +130,8 @@ public class TransactionProcessorTest { TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom( System.currentTimeMillis(), V[6] - 1, V[7], invalidSet, // this will set visibility upper bound to V[6] - Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(V[6] - 1, Long.MAX_VALUE, - TransactionType.SHORT))), + Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( + V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))), new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>()); txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(), txSnapshot.getWritePointer(), txSnapshot.getInvalid(), http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index 7cbc010..4b236fc 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tephra.ChangeId; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionManager; -import org.apache.tephra.TransactionType; import org.apache.tephra.TxConstants; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; @@ -131,8 +130,8 @@ public class TransactionProcessorTest { TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom( System.currentTimeMillis(), V[6] - 1, V[7], invalidSet, // this will set visibility upper bound to V[6] - Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(V[6] - 1, Long.MAX_VALUE, - TransactionType.SHORT))), + Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( + V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))), new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>()); txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(), txSnapshot.getWritePointer(), txSnapshot.getInvalid(), http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index 4a694eb..d21c987 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tephra.ChangeId; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionManager; -import org.apache.tephra.TransactionType; import org.apache.tephra.TxConstants; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; @@ -131,8 +130,8 @@ public class TransactionProcessorTest { TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom( System.currentTimeMillis(), V[6] - 1, V[7], invalidSet, // this will set visibility upper bound to V[6] - Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(V[6] - 1, Long.MAX_VALUE, - TransactionType.SHORT))), + Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( + V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))), new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>()); txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(), txSnapshot.getWritePointer(), txSnapshot.getInvalid(), http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e80e89fe/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/InvalidListPruneTest.java index 955d2de..b91ee17 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/InvalidListPruneTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/janitor/InvalidListPruneTest.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionManager; -import org.apache.tephra.TransactionType; import org.apache.tephra.TxConstants; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.hbase.AbstractHBaseTableTest; @@ -173,10 +172,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { // Create a new transaction snapshot InMemoryTransactionStateCache.setTransactionSnapshot( new TransactionSnapshot(110, 111, 112, ImmutableSet.of(150L), - ImmutableSortedMap.of( - 105L, new TransactionManager.InProgressTx(100, 30, TransactionType.SHORT) - ) - )); + ImmutableSortedMap.of(105L, new TransactionManager.InProgressTx( + 100, 30, TransactionManager.InProgressType.SHORT)))); Assert.assertEquals(50, dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
