Repository: incubator-tephra
Updated Branches:
  refs/heads/master eb6f9e6ac -> 6d8097db2


TEPHRA-235 Ensure that TransactionSnapshot always has a sorted invalid 
transaction list.

This closes #44

Signed-off-by: poorna <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/6d8097db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/6d8097db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/6d8097db

Branch: refs/heads/master
Commit: 6d8097db28e6b5fdf3ec2dc6b97eb85de53f6ce0
Parents: eb6f9e6
Author: Ali Anwar <[email protected]>
Authored: Tue Jun 20 18:15:56 2017 -0700
Committer: poorna <[email protected]>
Committed: Thu Jun 22 11:32:02 2017 -0700

----------------------------------------------------------------------
 .../org/apache/tephra/TransactionManager.java   |  2 +-
 .../apache/tephra/manager/InvalidTxList.java    |  2 +-
 .../tephra/persist/TransactionSnapshot.java     | 33 +++++++++++++++++---
 .../tephra/snapshot/SnapshotCodecTest.java      | 15 +++++++++
 .../coprocessor/TransactionProcessorTest.java   |  5 ++-
 .../coprocessor/TransactionProcessorTest.java   |  5 ++-
 .../coprocessor/TransactionProcessorTest.java   |  5 ++-
 .../coprocessor/TransactionProcessorTest.java   |  5 ++-
 .../coprocessor/TransactionProcessorTest.java   |  5 ++-
 9 files changed, 65 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java 
b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
index 27b6bc6..3f332ad 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
@@ -466,7 +466,7 @@ public class TransactionManager extends AbstractService {
 
   public synchronized TransactionSnapshot getCurrentState() {
     return TransactionSnapshot.copyFrom(System.currentTimeMillis(), 
readPointer, lastWritePointer,
-                                        invalidTxList.toRawList(), inProgress, 
committingChangeSets,
+                                        invalidTxList, inProgress, 
committingChangeSets,
                                         committedChangeSets);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java 
b/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
index 231196c..5d032f6 100644
--- a/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
+++ b/tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java
@@ -110,7 +110,7 @@ public class InvalidTxList {
   }
 
   /**
-   * @return list of invalid transactions. The list is not sorted.
+   * @return list of invalid transactions. The list is not guaranteed to be 
sorted.
    */
   public LongList toRawList() {
     return LongLists.unmodifiable(invalid);

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java 
b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
index ccf7374..d76a98f 100644
--- 
a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
+++ 
b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionSnapshot.java
@@ -19,10 +19,11 @@
 package org.apache.tephra.persist;
 
 import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
 import org.apache.tephra.ChangeId;
 import org.apache.tephra.TransactionManager;
+import org.apache.tephra.manager.InvalidTxList;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -44,6 +45,18 @@ public class TransactionSnapshot implements 
TransactionVisibilityState {
   private Map<Long, Set<ChangeId>> committingChangeSets;
   private Map<Long, Set<ChangeId>> committedChangeSets;
 
+  /**
+   * Creates an instance of TransactionSnapshot with the given transaction 
state
+   *
+   * @param timestamp timestamp, in millis, that the snapshot was taken
+   * @param readPointer current transaction read pointer
+   * @param writePointer current transaction write pointer
+   * @param invalid current list of invalid write pointer; must be sorted
+   * @param inProgress current map of in-progress write pointers to expiration 
timestamps
+   * @param committing current map of write pointers to change sets which have 
passed {@code canCommit()} but not
+   *                   yet committed
+   * @param committed current map of write pointers to change sets which have 
committed
+   */
   public TransactionSnapshot(long timestamp, long readPointer, long 
writePointer, Collection<Long> invalid,
                              NavigableMap<Long, 
TransactionManager.InProgressTx> inProgress,
                              Map<Long, Set<ChangeId>> committing, Map<Long, 
Set<ChangeId>> committed) {
@@ -52,6 +65,15 @@ public class TransactionSnapshot implements 
TransactionVisibilityState {
     this.committedChangeSets = committed;
   }
 
+  /**
+   * Creates an instance of TransactionSnapshot with the given transaction 
state
+   *
+   * @param timestamp timestamp, in millis, that the snapshot was taken
+   * @param readPointer current transaction read pointer
+   * @param writePointer current transaction write pointer
+   * @param invalid current list of invalid write pointer; must be sorted
+   * @param inProgress current map of in-progress write pointers to expiration 
timestamps
+   */
   public TransactionSnapshot(long timestamp, long readPointer, long 
writePointer, Collection<Long> invalid,
                              NavigableMap<Long, 
TransactionManager.InProgressTx> inProgress) {
     this.timestamp = timestamp;
@@ -162,9 +184,10 @@ public class TransactionSnapshot implements 
TransactionVisibilityState {
 
   /**
    * Creates a new {@code TransactionSnapshot} instance with copies of all of 
the individual collections.
+   * @param snapshotTime timestamp, in millis, that the snapshot was taken
    * @param readPointer current transaction read pointer
    * @param writePointer current transaction write pointer
-   * @param invalid current list of invalid write pointers
+   * @param invalidTxList current list of invalid write pointers
    * @param inProgress current map of in-progress write pointers to expiration 
timestamps
    * @param committing current map of write pointers to change sets which have 
passed {@code canCommit()} but not
    *                   yet committed
@@ -172,12 +195,12 @@ public class TransactionSnapshot implements 
TransactionVisibilityState {
    * @return a new {@code TransactionSnapshot} instance
    */
   public static TransactionSnapshot copyFrom(long snapshotTime, long 
readPointer,
-                                             long writePointer, 
Collection<Long> invalid,
+                                             long writePointer, InvalidTxList 
invalidTxList,
                                              NavigableMap<Long, 
TransactionManager.InProgressTx> inProgress,
                                              Map<Long, Set<ChangeId>> 
committing,
                                              NavigableMap<Long, Set<ChangeId>> 
committed) {
-    // copy invalid IDs
-    Collection<Long> invalidCopy = Lists.newArrayList(invalid);
+    // copy invalid IDs, after sorting
+    Collection<Long> invalidCopy = new 
LongArrayList(invalidTxList.toSortedArray());
     // copy in-progress IDs and expirations
     NavigableMap<Long, TransactionManager.InProgressTx> inProgressCopy = 
Maps.newTreeMap(inProgress);
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java 
b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
index f67c58b..18f81c8 100644
--- 
a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
+++ 
b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java
@@ -20,8 +20,11 @@ package org.apache.tephra.snapshot;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -46,6 +49,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
@@ -303,6 +307,14 @@ public class SnapshotCodecTest {
     Transaction transaction = txManager.startLong();
     Transaction checkpointTx = txManager.checkpoint(transaction);
 
+    // create invalid transactions (invalidated out of order)
+    Transaction shortTx1 = txManager.startShort();
+    Transaction shortTx2 = txManager.startShort();
+    Transaction shortTx3 = txManager.startShort();
+    txManager.invalidate(shortTx3.getTransactionId());
+    txManager.invalidate(shortTx1.getTransactionId());
+    txManager.invalidate(shortTx2.getTransactionId());
+
     // shutdown to force a snapshot
     txManager.stopAndWait();
 
@@ -311,6 +323,7 @@ public class SnapshotCodecTest {
     txStorage.startAndWait();
 
     TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
+    Assert.assertTrue(Ordering.natural().isOrdered((snapshot.getInvalid())));
     TransactionVisibilityState txVisibilityState = 
txStorage.getLatestTransactionVisibilityState();
     assertTransactionVisibilityStateEquals(snapshot, txVisibilityState);
 
@@ -339,6 +352,7 @@ public class SnapshotCodecTest {
 
     // state should be recovered
     snapshot = txManager.getCurrentState();
+    Assert.assertTrue(Ordering.natural().isOrdered((snapshot.getInvalid())));
     inProgress = snapshot.getInProgress();
     Assert.assertEquals(2, inProgress.size());
 
@@ -363,6 +377,7 @@ public class SnapshotCodecTest {
     txStorage2.startAndWait();
 
     snapshot = txStorage2.getLatestSnapshot();
+    Assert.assertTrue(Ordering.natural().isOrdered((snapshot.getInvalid())));
     Assert.assertTrue(snapshot.getInProgress().isEmpty());
     txStorage2.stopAndWait();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
 
b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index b25ae37..1879116 100644
--- 
a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ 
b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -64,6 +64,7 @@ import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
 import org.apache.tephra.metrics.TxMetricsCollector;
 import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.TransactionSnapshot;
@@ -141,9 +142,11 @@ public class TransactionProcessorTest {
     conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, 
DefaultSnapshotCodec.class.getName());
 
     // write an initial transaction snapshot
+    InvalidTxList invalidTxList = new InvalidTxList();
+    invalidTxList.addAll(invalidSet);
     TransactionSnapshot txSnapshot =
       TransactionSnapshot.copyFrom(
-        System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+        System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
         // this will set visibility upper bound to V[6]
         Maps.newTreeMap(ImmutableSortedMap.of(V[6], new 
TransactionManager.InProgressTx(
           V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
 
b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index e612e2a..abe375d 100644
--- 
a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ 
b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -70,6 +70,7 @@ import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
 import org.apache.tephra.metrics.TxMetricsCollector;
 import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.TransactionSnapshot;
@@ -147,8 +148,10 @@ public class TransactionProcessorTest {
     conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, 
DefaultSnapshotCodec.class.getName());
 
     // write an initial transaction snapshot
+    InvalidTxList invalidTxList = new InvalidTxList();
+    invalidTxList.addAll(invalidSet);
     TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
-      System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+      System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
       // this will set visibility upper bound to V[6]
       Maps.newTreeMap(ImmutableSortedMap.of(V[6], new 
TransactionManager.InProgressTx(
         V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
 
b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index b92bb09..f6d8e2d 100644
--- 
a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ 
b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -53,6 +53,7 @@ import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
 import org.apache.tephra.metrics.TxMetricsCollector;
 import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.TransactionSnapshot;
@@ -127,8 +128,10 @@ public class TransactionProcessorTest {
     conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, 
DefaultSnapshotCodec.class.getName());
 
     // write an initial transaction snapshot
+    InvalidTxList invalidTxList = new InvalidTxList();
+    invalidTxList.addAll(invalidSet);
     TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
-      System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+      System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
       // this will set visibility upper bound to V[6]
       Maps.newTreeMap(ImmutableSortedMap.of(V[6], new 
TransactionManager.InProgressTx(
         V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
 
b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index 4b236fc..8dfce32 100644
--- 
a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ 
b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -53,6 +53,7 @@ import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
 import org.apache.tephra.metrics.TxMetricsCollector;
 import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.TransactionSnapshot;
@@ -127,8 +128,10 @@ public class TransactionProcessorTest {
     conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, 
DefaultSnapshotCodec.class.getName());
 
     // write an initial transaction snapshot
+    InvalidTxList invalidTxList = new InvalidTxList();
+    invalidTxList.addAll(invalidSet);
     TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
-      System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+      System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
       // this will set visibility upper bound to V[6]
       Maps.newTreeMap(ImmutableSortedMap.of(V[6], new 
TransactionManager.InProgressTx(
         V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6d8097db/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
 
b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
index d21c987..9f7206d 100644
--- 
a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
+++ 
b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java
@@ -53,6 +53,7 @@ import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.coprocessor.TransactionStateCache;
 import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
 import org.apache.tephra.metrics.TxMetricsCollector;
 import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.TransactionSnapshot;
@@ -127,8 +128,10 @@ public class TransactionProcessorTest {
     conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, 
DefaultSnapshotCodec.class.getName());
 
     // write an initial transaction snapshot
+    InvalidTxList invalidTxList = new InvalidTxList();
+    invalidTxList.addAll(invalidSet);
     TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
-        System.currentTimeMillis(), V[6] - 1, V[7], invalidSet,
+        System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
         // this will set visibility upper bound to V[6]
         Maps.newTreeMap(ImmutableSortedMap.of(V[6], new 
TransactionManager.InProgressTx(
           V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),

Reply via email to