http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/util/HBaseVersionTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/util/HBaseVersionTest.java b/tephra-core/src/test/java/org/apache/tephra/util/HBaseVersionTest.java
new file mode 100644
index 0000000..a2344ba
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/util/HBaseVersionTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.util;
+
+import org.junit.Test;
+
+import java.text.ParseException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for HBase version parsing done by {@link HBaseVersion.VersionNumber}.
+ */
+public class HBaseVersionTest {
+  /**
+   * Parses version strings with one, two and three numeric components, with and without a classifier
+   * and a {@code -SNAPSHOT} suffix, and checks every parsed field. Also checks that the strings
+   * {@code "abc"} and {@code "1.a.b"} are rejected with a {@link ParseException}.
+   */
+  @Test
+  public void testVersionNumber() throws Exception {
+    HBaseVersion.VersionNumber ver = HBaseVersion.VersionNumber.create("1");
+    assertVersionNumber(ver, 1, null, null, null, false);
+
+    ver = HBaseVersion.VersionNumber.create("1-SNAPSHOT");
+    assertVersionNumber(ver, 1, null, null, null, true);
+
+    ver = HBaseVersion.VersionNumber.create("1-foo");
+    assertVersionNumber(ver, 1, null, null, "foo", false);
+
+    ver = HBaseVersion.VersionNumber.create("1-foo-SNAPSHOT");
+    assertVersionNumber(ver, 1, null, null, "foo", true);
+
+    ver = HBaseVersion.VersionNumber.create("10.0");
+    assertVersionNumber(ver, 10, 0, null, null, false);
+
+    ver = HBaseVersion.VersionNumber.create("10.0-bar");
+    assertVersionNumber(ver, 10, 0, null, "bar", false);
+
+    ver = HBaseVersion.VersionNumber.create("3.2.1");
+    assertVersionNumber(ver, 3, 2, 1, null, false);
+
+    ver = HBaseVersion.VersionNumber.create("3.2.1-SNAPSHOT");
+    assertVersionNumber(ver, 3, 2, 1, null, true);
+
+    ver = HBaseVersion.VersionNumber.create("3.2.1-baz");
+    assertVersionNumber(ver, 3, 2, 1, "baz", false);
+
+    ver = HBaseVersion.VersionNumber.create("3.2.1-baz1.2.3");
+    assertVersionNumber(ver, 3, 2, 1, "baz1.2.3", false);
+
+    ver = HBaseVersion.VersionNumber.create("3.2.1-baz1.2.3-SNAPSHOT");
+    assertVersionNumber(ver, 3, 2, 1, "baz1.2.3", true);
+
+    assertParseFailure("abc");
+    assertParseFailure("1.a.b");
+
+    ver = HBaseVersion.VersionNumber.create("1.2.0-CDH5.7.0");
+    assertVersionNumber(ver, 1, 2, 0, "CDH5.7.0", false);
+  }
+
+  /**
+   * Asserts that parsing the given string is rejected with a {@link ParseException}.
+   *
+   * @param invalidVersion a version string that is expected to be unparseable
+   * @throws Exception if parsing fails with anything other than a {@link ParseException}
+   */
+  private void assertParseFailure(String invalidVersion) throws Exception {
+    try {
+      // the parsed result is irrelevant here, only the exception matters
+      HBaseVersion.VersionNumber.create(invalidVersion);
+      fail("Invalid version number '" + invalidVersion + "' should have thrown a ParseException");
+    } catch (ParseException pe) {
+      // expected
+    }
+  }
+
+  /**
+   * Asserts that every component of a parsed version matches the expected values.
+   *
+   * @param version the parsed version under test
+   * @param expectedMajor expected major number, or {@code null} if none is expected
+   * @param expectedMinor expected minor number, or {@code null} if none is expected
+   * @param expectedPatch expected patch number, or {@code null} if none is expected
+   * @param expectedClassifier expected classifier, or {@code null} if none is expected
+   * @param snapshot whether the version is expected to be flagged as a snapshot
+   */
+  private void assertVersionNumber(HBaseVersion.VersionNumber version, Integer expectedMajor, Integer expectedMinor,
+                                   Integer expectedPatch, String expectedClassifier, boolean snapshot) {
+    assertComponent(expectedMajor, version.getMajor());
+    assertComponent(expectedMinor, version.getMinor());
+    assertComponent(expectedPatch, version.getPatch());
+    assertComponent(expectedClassifier, version.getClassifier());
+    assertEquals(snapshot, version.isSnapshot());
+  }
+
+  /**
+   * Asserts that a single version component is absent when no value is expected, and equal to the
+   * expected value otherwise.
+   *
+   * @param expected the expected value, or {@code null} if the component must be absent
+   * @param actual the value reported by the parsed version
+   */
+  private static void assertComponent(Object expected, Object actual) {
+    if (expected == null) {
+      assertNull(actual);
+    } else {
+      assertEquals(expected, actual);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java b/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java
new file mode 100644
index 0000000..854ccdd
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.util;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.persist.TransactionEdit;
+
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * Util class for {@link TransactionEdit} related tests.
+ */
+public final class TransactionEditUtil {
+  /**
+   * Number of leading {@link TransactionEdit.State} values that {@link #createRandomEdits(int)} chooses from.
+   * NOTE(review): presumably these are exactly the six states handled by the switch in that method; confirm
+   * against the declaration order of the enum.
+   */
+  private static final int GENERATED_STATE_COUNT = 6;
+
+  private static final Random RANDOM = new Random();
+
+  private TransactionEditUtil() {
+    // static helpers only, not meant to be instantiated
+  }
+
+  /**
+   * Generates a number of semi-random {@link TransactionEdit} instances.
+   * These are just randomly selected from the possible states, so would not necessarily reflect a real-world
+   * distribution.
+   *
+   * @param numEntries how many entries to generate in the returned list.
+   * @return a list of randomly generated transaction log edits.
+   */
+  public static List<TransactionEdit> createRandomEdits(int numEntries) {
+    // values() allocates a fresh array on every call, so look it up once
+    TransactionEdit.State[] states = TransactionEdit.State.values();
+    List<TransactionEdit> edits = Lists.newArrayListWithCapacity(numEntries);
+    for (int i = 0; i < numEntries; i++) {
+      TransactionEdit.State nextType = states[RANDOM.nextInt(GENERATED_STATE_COUNT)];
+      // Math.abs(Long.MIN_VALUE) is still negative; clearing the sign bit always yields a non-negative value
+      long writePointer = RANDOM.nextLong() & Long.MAX_VALUE;
+      switch (nextType) {
+        case INPROGRESS:
+          edits.add(
+            TransactionEdit.createStarted(writePointer, writePointer - 1,
+                                          System.currentTimeMillis() + 300000L, TransactionType.SHORT));
+          break;
+        case COMMITTING:
+          edits.add(TransactionEdit.createCommitting(writePointer, generateChangeSet(10)));
+          break;
+        case COMMITTED:
+          edits.add(TransactionEdit.createCommitted(writePointer, generateChangeSet(10), writePointer + 1,
+                                                    RANDOM.nextBoolean()));
+          break;
+        case INVALID:
+          edits.add(TransactionEdit.createInvalid(writePointer));
+          break;
+        case ABORTED:
+          edits.add(TransactionEdit.createAborted(writePointer, TransactionType.SHORT, null));
+          break;
+        case MOVE_WATERMARK:
+          edits.add(TransactionEdit.createMoveWatermark(writePointer));
+          break;
+      }
+    }
+    return edits;
+  }
+
+  /**
+   * Builds a set of change ids, each wrapping 8 random bytes.
+   *
+   * @param numEntries how many change ids to generate; the returned set is smaller if random ids collide
+   * @return the generated change ids
+   */
+  private static Set<ChangeId> generateChangeSet(int numEntries) {
+    Set<ChangeId> changes = Sets.newHashSet();
+    for (int i = 0; i < numEntries; i++) {
+      byte[] bytes = new byte[8];
+      RANDOM.nextBytes(bytes);
+      changes.add(new ChangeId(bytes));
+    }
+    return changes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
new file mode 100644
index 0000000..7743105
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.util;
+
+import org.apache.tephra.Transaction;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests covering the {@link TxUtils} helper methods.
+ */
+public class TxUtilsTest {
+  /**
+   * Checks the max visible timestamp computed for {@link Transaction#ALL_VISIBLE_LATEST}.
+   */
+  @Test
+  public void testMaxVisibleTimestamp() {
+    // a MAX_VALUE write pointer must not overflow when the visible timestamp is computed
+    long maxVisible = TxUtils.getMaxVisibleTimestamp(Transaction.ALL_VISIBLE_LATEST);
+    assertEquals(Long.MAX_VALUE, maxVisible);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/visibility/VisibilityFenceTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/visibility/VisibilityFenceTest.java b/tephra-core/src/test/java/org/apache/tephra/visibility/VisibilityFenceTest.java
new file mode 100644
index 0000000..1effcfd
--- /dev/null
+++ b/tephra-core/src/test/java/org/apache/tephra/visibility/VisibilityFenceTest.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.visibility;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionAware;
+import org.apache.tephra.TransactionConflictException;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The following are all the possible cases when using {@link VisibilityFence}.
+ *
+ * In the below table,
+ * "Read Txn" refers to the transaction that contains the read fence
+ * "Before Write", "During Write" and "After Write" refer to the write transaction time
+ * "Before Write Fence", "During Write Fence", "After Write Fence" refer to the write fence transaction time
+ *
+ * Timeline is: Before Write < During Write < After Write < Before Write Fence < During Write Fence <
+ *              After Write Fence
+ *
+ * +------+----------------------+----------------------+--------------------+--------------------+
+ * | Case |    Read Txn Start    |   Read Txn Commit    | Conflict on Commit | Conflict on Commit |
+ * |      |                      |                      | of Read Txn        | of Write Fence     |
+ * +------+----------------------+----------------------+--------------------+--------------------+
+ * |    1 | Before Write         | Before Write         | No                 | No                 |
+ * |    2 | Before Write         | During Write         | No                 | No                 |
+ * |    3 | Before Write         | After Write          | No                 | No                 |
+ * |    4 | Before Write         | Before Write Fence   | No                 | No                 |
+ * |    5 | Before Write         | During Write Fence   | No                 | Yes                |
+ * |    6 | Before Write         | After Write Fence    | Yes                | No                 |
+ * |      |                      |                      |                    |                    |
+ * |    7 | During Write         | During Write         | No                 | No                 |
+ * |    8 | During Write         | After Write          | No                 | No                 |
+ * |    9 | During Write         | Before Write Fence   | No                 | No                 |
+ * |   10 | During Write         | During Write Fence   | No                 | Yes                |
+ * |   11 | During Write         | After Write Fence    | Yes                | No                 |
+ * |      |                      |                      |                    |                    |
+ * |   12 | After Write          | After Write          | No                 | No                 |
+ * |   13 | After Write          | Before Write Fence   | No                 | No                 |
+ * |   14 | After Write          | During Write Fence   | No                 | Yes #              |
+ * |   15 | After Write          | After Write Fence    | Yes #              | No                 |
+ * |      |                      |                      |                    |                    |
+ * |   16 | Before Write Fence   | Before Write Fence   | No                 | No                 |
+ * |   17 | Before Write Fence   | During Write Fence   | No                 | Yes #              |
+ * |   18 | Before Write Fence   | After Write Fence    | Yes #              | No                 |
+ * |      |                      |                      |                    |                    |
+ * |   19 | During Write Fence   | During Write Fence   | No                 | No                 |
+ * |   20 | During Write Fence   | After Write Fence    | No                 | No                 |
+ * |      |                      |                      |                    |                    |
+ * |   21 | After Write Fence    | After Write Fence    | No                 | No                 |
+ * +------+----------------------+----------------------+--------------------+--------------------+
+ *
+ * Note: Cases marked with '#' in conflict column should not conflict, however current implementation causes
+ * them to conflict. The remaining conflicts are a result of the fence.
+ *
+ * In the current implementation of VisibilityFence, read txns that start "Before Write", "During Write",
+ * and "After Write" can be represented by read txns that start "Before Write Fence".
+ * Verifying cases 16, 17, 18, 20 and 21 will effectively cover all other cases.
+ */
+public class VisibilityFenceTest {
+  private static Configuration conf = new Configuration();
+
+  private static TransactionManager txManager = null;
+
+  /**
+   * Creates and starts the transaction manager shared by all tests in this class.
+   */
+  @BeforeClass
+  public static void before() {
+    txManager = new TransactionManager(conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector());
+    txManager.startAndWait();
+  }
+
+  /**
+   * Stops the shared transaction manager once all tests have run.
+   */
+  @AfterClass
+  public static void after() {
+    txManager.stopAndWait();
+  }
+
+  /**
+   * Creates a transaction context for the given fence, backed by a new in-memory client of the shared
+   * transaction manager. The context is not started.
+   *
+   * @param fence the read or write fence that takes part in the transaction
+   * @return a new, not yet started transaction context
+   */
+  private static TransactionContext newTxContext(TransactionAware fence) {
+    return new TransactionContext(new InMemoryTxSystemClient(txManager), fence);
+  }
+
+  /**
+   * Asserts that committing the given transaction fails with a {@link TransactionConflictException},
+   * then aborts the transaction.
+   *
+   * @param txContext a started transaction whose commit is expected to conflict
+   * @throws Exception if the commit or the abort fails in any other way
+   */
+  private void assertTxnConflict(TransactionContext txContext) throws Exception {
+    try {
+      txContext.finish();
+      Assert.fail("Expected transaction to fail");
+    } catch (TransactionConflictException e) {
+      // Expected
+      txContext.abort();
+    }
+  }
+
+  /**
+   * Exercises the read transactions labelled as cases 16, 17, 18, 20 and 21 against a single write fence
+   * and checks which commits conflict.
+   */
+  @Test
+  public void testFence1() throws Exception {
+    byte[] fenceId = "test_table".getBytes(Charsets.UTF_8);
+
+    // Writer updates data here in a separate transaction (code not shown)
+    // start tx
+    // update
+    // commit tx
+
+    // Readers use fence to indicate that they are interested in changes to specific data
+    TransactionContext readTxContextCase16 = newTxContext(VisibilityFence.create(fenceId));
+    readTxContextCase16.start();
+    readTxContextCase16.finish();
+
+    TransactionContext readTxContextCase17 = newTxContext(VisibilityFence.create(fenceId));
+    readTxContextCase17.start();
+
+    TransactionContext readTxContextCase18 = newTxContext(VisibilityFence.create(fenceId));
+    readTxContextCase18.start();
+
+    // Now writer needs to wait for in-progress readers to see the change, it uses write fence to do so
+    // Start write fence txn
+    TransactionContext writeTxContext = newTxContext(new WriteFence(fenceId));
+    writeTxContext.start();
+
+    TransactionContext readTxContextCase20 = newTxContext(VisibilityFence.create(fenceId));
+    readTxContextCase20.start();
+
+    readTxContextCase17.finish();
+
+    assertTxnConflict(writeTxContext);
+    writeTxContext.start();
+
+    // Commit write fence txn can commit without conflicts at this point
+    writeTxContext.finish();
+
+    TransactionContext readTxContextCase21 = newTxContext(VisibilityFence.create(fenceId));
+    readTxContextCase21.start();
+
+    assertTxnConflict(readTxContextCase18);
+    readTxContextCase20.finish();
+    readTxContextCase21.finish();
+  }
+
+  /**
+   * Runs three readers against a write fence that has to be restarted twice: the write fence is expected
+   * to conflict each time a reader commits first, and a reader that commits after the write fence is
+   * expected to conflict and to succeed once restarted.
+   */
+  @Test
+  public void testFence2() throws Exception {
+    byte[] fenceId = "test_table".getBytes(Charsets.UTF_8);
+
+    // Readers use fence to indicate that they are interested in changes to specific data
+    // Reader 1
+    TransactionContext readTxContext1 = newTxContext(VisibilityFence.create(fenceId));
+    readTxContext1.start();
+
+    // Reader 2
+    TransactionAware readFence2 = VisibilityFence.create(fenceId);
+    TransactionContext readTxContext2 = newTxContext(readFence2);
+    readTxContext2.start();
+
+    // Reader 3
+    TransactionContext readTxContext3 = newTxContext(VisibilityFence.create(fenceId));
+    readTxContext3.start();
+
+    // Writer updates data here in a separate transaction (code not shown)
+    // start tx
+    // update
+    // commit tx
+
+    // Now writer needs to wait for readers 1, 2, and 3 to see the change, it uses write fence to do so
+    TransactionContext writeTxContext = newTxContext(new WriteFence(fenceId));
+    writeTxContext.start();
+
+    // Reader 1 commits before writeFence is committed
+    readTxContext1.finish();
+
+    // writeFence will throw exception since Reader 1 committed without seeing changes
+    assertTxnConflict(writeTxContext);
+
+    // Start over writeFence again
+    writeTxContext.start();
+
+    // Now, Reader 3 commits before writeFence
+    // Note that Reader 3 does not conflict with Reader 1
+    readTxContext3.finish();
+
+    // writeFence will throw exception again since Reader 3 committed without seeing changes
+    assertTxnConflict(writeTxContext);
+
+    // Start over writeFence again
+    writeTxContext.start();
+    // This time writeFence commits before the other readers
+    writeTxContext.finish();
+
+    // After this point all readers will see the change
+
+    // Reader 2 commits after writeFence, hence this commit will throw exception
+    assertTxnConflict(readTxContext2);
+
+    // Reader 2 has to abort and start over again. It will see the changes now.
+    readTxContext2 = newTxContext(readFence2);
+    readTxContext2.start();
+    readTxContext2.finish();
+  }
+
+  /**
+   * Waits on a fence through a client that commits a competing read fence during each of its first two
+   * {@code startShort()} calls. The test expects the wait to return after exactly three such calls, and a
+   * read fence that was still open during the wait to conflict on commit and to succeed once restarted.
+   */
+  @Test
+  public void testFenceAwait() throws Exception {
+    byte[] fenceId = "test_table".getBytes(Charsets.UTF_8);
+
+    final TransactionContext fence1 = newTxContext(VisibilityFence.create(fenceId));
+    fence1.start();
+    final TransactionContext fence2 = newTxContext(VisibilityFence.create(fenceId));
+    fence2.start();
+    TransactionContext fence3 = newTxContext(VisibilityFence.create(fenceId));
+    fence3.start();
+
+    final AtomicInteger attempts = new AtomicInteger();
+    TransactionSystemClient customTxClient = new InMemoryTxSystemClient(txManager) {
+      @Override
+      public Transaction startShort() {
+        Transaction transaction = super.startShort();
+        try {
+          // commit one competing read fence after each of the first two transaction starts
+          switch (attempts.getAndIncrement()) {
+            case 0:
+              fence1.finish();
+              break;
+            case 1:
+              fence2.finish();
+              break;
+            case 2:
+              break;
+            default:
+              throw new IllegalStateException("Unexpected state");
+          }
+        } catch (TransactionFailureException e) {
+          throw Throwables.propagate(e);
+        }
+        return transaction;
+      }
+    };
+
+    FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient);
+    fenceWait.await(1000, TimeUnit.MILLISECONDS);
+    Assert.assertEquals(3, attempts.get());
+
+    // fence3 was started before the wait completed, so its commit is expected to conflict
+    assertTxnConflict(fence3);
+
+    fence3.start();
+    fence3.finish();
+  }
+
+  /**
+   * Waits on a fence through a client that sleeps longer than the wait timeout in every
+   * {@code startShort()} call and commits a competing read fence during the first call only. The first
+   * wait is expected to fail with a {@link TimeoutException} after a single call; a second wait is
+   * expected to complete after one more call.
+   */
+  @Test
+  public void testFenceTimeout() throws Exception {
+    byte[] fenceId = "test_table".getBytes(Charsets.UTF_8);
+
+    final TransactionContext fence1 = newTxContext(VisibilityFence.create(fenceId));
+    fence1.start();
+
+    final long timeout = 100;
+    final TimeUnit timeUnit = TimeUnit.MILLISECONDS;
+    final AtomicInteger attempts = new AtomicInteger();
+    TransactionSystemClient customTxClient = new InMemoryTxSystemClient(txManager) {
+      @Override
+      public Transaction startShort() {
+        Transaction transaction = super.startShort();
+        try {
+          if (attempts.getAndIncrement() == 0) {
+            // only the first transaction start commits the competing read fence
+            fence1.finish();
+          }
+          timeUnit.sleep(timeout + 1);
+        } catch (InterruptedException e) {
+          // keep the interrupt visible to callers further up the stack
+          Thread.currentThread().interrupt();
+          throw Throwables.propagate(e);
+        } catch (TransactionFailureException e) {
+          throw Throwables.propagate(e);
+        }
+        return transaction;
+      }
+    };
+
+    try {
+      FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient);
+      fenceWait.await(timeout, timeUnit);
+      Assert.fail("Expected await to fail");
+    } catch (TimeoutException e) {
+      // Expected exception
+    }
+    Assert.assertEquals(1, attempts.get());
+
+    FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient);
+    fenceWait.await(timeout, timeUnit);
+    Assert.assertEquals(2, attempts.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-examples/src/main/java/co/cask/tephra/examples/BalanceBooks.java
----------------------------------------------------------------------
diff --git a/tephra-examples/src/main/java/co/cask/tephra/examples/BalanceBooks.java b/tephra-examples/src/main/java/co/cask/tephra/examples/BalanceBooks.java
deleted file mode 100644
index 7f82209..0000000
--- a/tephra-examples/src/main/java/co/cask/tephra/examples/BalanceBooks.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.examples;
-
-import co.cask.tephra.TransactionConflictException;
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.distributed.TransactionServiceClient;
-import co.cask.tephra.hbase10cdh.TransactionAwareHTable;
-import co.cask.tephra.hbase10cdh.coprocessor.TransactionProcessor;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionClientModule;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.runtime.ZKModule;
-import co.cask.tephra.util.ConfigurationFactory;
-import com.google.common.io.Closeables;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Simple example application that launches a number of concurrent clients, one per "account".  Each client attempts to
- * make withdrawals from other clients, and deposit the same amount to its own account in a single transaction.
- * Since this means the client will be updating both its own row and the withdrawee's row, this will naturally lead to
- * transaction conflicts.  All clients will run for a specified number of iterations.  When the processing is complete,
- * the total sum of all rows should be zero, if transactional integrity was maintained.
- *
- * <p>
- *   You can run the BalanceBooks application with the following command:
- *   <pre>
- *     ./bin/tephra run co.cask.tephra.examples.BalanceBooks [num clients] [num iterations]
- *   </pre>
- *   where <code>[num clients]</code> is the number of concurrent client threads to use, and
- *   <code>[num iterations]</code> is the number of "transfer" operations to perform per client thread.
- * </p>
- */
-public class BalanceBooks implements Closeable {
-  private static final Logger LOG = LoggerFactory.getLogger(BalanceBooks.class);
-
-  private static final int MAX_AMOUNT = 100;
-  private static final byte[] TABLE = Bytes.toBytes("testbalances");
-  private static final byte[] FAMILY = Bytes.toBytes("f");
-  private static final byte[] COL = Bytes.toBytes("b");
-
-  private final int totalClients;
-  private final int iterations;
-
-  private Configuration conf;
-  private ZKClientService zkClient;
-  private TransactionServiceClient txClient;
-  private HConnection conn;
-
-  public BalanceBooks(int totalClients, int iterations) {
-    this(totalClients, iterations, new ConfigurationFactory().get());
-  }
-
-  public BalanceBooks(int totalClients, int iterations, Configuration conf) {
-    this.totalClients = totalClients;
-    this.iterations = iterations;
-    this.conf = conf;
-  }
-
-  /**
-   * Sets up common resources required by all clients.
-   */
-  public void init() throws IOException {
-    Injector injector = Guice.createInjector(
-        new ConfigModule(conf),
-        new ZKModule(),
-        new DiscoveryModules().getDistributedModules(),
-        new TransactionModules().getDistributedModules(),
-        new TransactionClientModule()
-    );
-
-    zkClient = injector.getInstance(ZKClientService.class);
-    zkClient.startAndWait();
-    txClient = injector.getInstance(TransactionServiceClient.class);
-
-    createTableIfNotExists(conf, TABLE, new byte[][]{ FAMILY });
-    conn = HConnectionManager.createConnection(conf);
-  }
-
-  /**
-   * Runs all clients and waits for them to complete.
-   */
-  public void run() throws IOException, InterruptedException {
-    List<Client> clients = new ArrayList<>(totalClients);
-    for (int i = 0; i < totalClients; i++) {
-      Client c = new Client(i, totalClients, iterations);
-      c.init(txClient, conn.getTable(TABLE));
-      c.start();
-      clients.add(c);
-    }
-
-    for (Client c : clients) {
-      c.join();
-      Closeables.closeQuietly(c);
-    }
-  }
-
-  /**
-   * Validates the current state of the data stored at the end of the test.  Each update by a client consists of two
-   * parts: a withdrawal of a random amount from a randomly select other account, and a corresponding to deposit to
-   * the client's own account.  So, if all the updates were performed consistently (no partial updates or partial
-   * rollbacks), then the total sum of all balances at the end should be 0.
-   */
-  public boolean verify() {
-    boolean success = false;
-    try {
-      TransactionAwareHTable table = new TransactionAwareHTable(conn.getTable(TABLE));
-      TransactionContext context = new TransactionContext(txClient, table);
-
-      LOG.info("VERIFYING BALANCES");
-      context.start();
-      long totalBalance = 0;
-      ResultScanner scanner = table.getScanner(new Scan());
-      try {
-        for (Result r : scanner) {
-          if (!r.isEmpty()) {
-            int rowId = Bytes.toInt(r.getRow());
-            long balance = Bytes.toLong(r.getValue(FAMILY, COL));
-            totalBalance += balance;
-            LOG.info("Client #{}: balance = ${}", rowId, balance);
-          }
-        }
-      } finally {
-        if (scanner != null) {
-          Closeables.closeQuietly(scanner);
-        }
-      }
-      if (totalBalance == 0) {
-        LOG.info("PASSED!");
-        success = true;
-      } else {
-        LOG.info("FAILED! Total balance should be 0 but was {}", totalBalance);
-      }
-      context.finish();
-    } catch (Exception e) {
-      LOG.error("Failed verification check", e);
-    }
-    return success;
-  }
-
-  /**
-   * Frees up the underlying resources common to all clients.
-   */
-  public void close() {
-    try {
-      if (conn != null) {
-        conn.close();
-      }
-    } catch (IOException ignored) { }
-
-    if (zkClient != null) {
-      zkClient.stopAndWait();
-    }
-  }
-
-  protected void createTableIfNotExists(Configuration conf, byte[] tableName, byte[][] columnFamilies)
-      throws IOException {
-    HBaseAdmin admin = new HBaseAdmin(conf);
-    try {
-      HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
-      for (byte[] family : columnFamilies) {
-        HColumnDescriptor columnDesc = new HColumnDescriptor(family);
-        columnDesc.setMaxVersions(Integer.MAX_VALUE);
-        desc.addFamily(columnDesc);
-      }
-      desc.addCoprocessor(TransactionProcessor.class.getName());
-      admin.createTable(desc);
-    } finally {
-      if (admin != null) {
-        try {
-          admin.close();
-        } catch (IOException ioe) {
-          LOG.warn("Error closing HBaseAdmin", ioe);
-        }
-      }
-    }
-  }
-
-  public static void main(String[] args) {
-    if (args.length != 2) {
-      System.err.println("Usage: java " + BalanceBooks.class.getName() + " <num clients> <iterations>");
-      System.err.println("\twhere <num clients> >= 2");
-      System.exit(1);
-    }
-
-    BalanceBooks bb = new BalanceBooks(Integer.parseInt(args[0]), Integer.parseInt(args[1]));
-    try {
-      bb.init();
-      bb.run();
-      bb.verify();
-    } catch (Exception e) {
-      LOG.error("Failed during BalanceBooks run", e);
-    } finally {
-      bb.close();
-    }
-  }
-
-  /**
-   * Represents a single client actor in the test.  Each client runs as a separate thread.
-   *
-   * For the given number of iterations, the client will:
-   * <ol>
-   *   <li>select a random other client from which to withdraw</li>
-   *   <li>select a random amount from 0 to MAX_AMOUNT</li>
-   *   <li>start a new transaction and: deduct the amount from the other client's acccount, and deposit
-   *       the same amount to its own account.</li>
-   * </ol>
-   *
-   * Since multiple clients operate concurrently and contend over a set of constrained resources
-   * (the client accounts), it is expected that a portion of the attempted transactions will encounter
-   * conflicts, due to a simultaneous deduction from or deposit to one the same accounts which has successfully
-   * committed first.  In this case, the updates from the transaction encountering the conflict should be completely
-   * rolled back, leaving the data in a consistent state.
-   */
-  private static class Client extends Thread implements Closeable {
-    private final int id;
-    private final int totalClients;
-    private final int iterations;
-
-    private final Random random = new Random();
-
-    private TransactionContext txContext;
-    private TransactionAwareHTable txTable;
-
-
-    public Client(int id, int totalClients, int iterations) {
-      this.id = id;
-      this.totalClients = totalClients;
-      this.iterations = iterations;
-    }
-
-    /**
-     * Sets up any resources needed by the individual client.
-     *
-     * @param txClient the transaction client to use in accessing the transaciton service
-     * @param table the HBase table instance to use for accessing storage
-     */
-    public void init(TransactionSystemClient txClient, HTableInterface table) {
-      txTable = new TransactionAwareHTable(table);
-      txContext = new TransactionContext(txClient, txTable);
-    }
-
-    public void run() {
-      try {
-        for (int i = 0; i < iterations; i++) {
-          runOnce();
-        }
-      } catch (TransactionFailureException e) {
-        LOG.error("Client #{}: Failed on exception", id, e);
-      }
-    }
-
-    /**
-     * Runs a single iteration of the client logic.
-     */
-    private void runOnce() throws TransactionFailureException {
-      int withdrawee = getNextWithdrawee();
-      int amount = getAmount();
-
-      try {
-        txContext.start();
-        long withdraweeBalance = getCurrentBalance(withdrawee);
-        long ownBalance = getCurrentBalance(id);
-        long withdraweeNew = withdraweeBalance - amount;
-        long ownNew = ownBalance + amount;
-
-        setBalance(withdrawee, withdraweeNew);
-        setBalance(id, ownNew);
-        LOG.info("Client #{}: Withdrew ${} from #{}; withdrawee old={}, new={}; own old={}, new={}",
-            id, amount, withdrawee, withdraweeBalance, withdraweeNew, ownBalance, ownNew);
-        txContext.finish();
-
-      } catch (IOException ioe) {
-        LOG.error("Client #{}: Unhandled client failure", id, ioe);
-        txContext.abort();
-      } catch (TransactionConflictException tce) {
-        LOG.info("CONFLICT: client #{} attempting to withdraw from #{}", id, withdrawee);
-        txContext.abort(tce);
-      } catch (TransactionFailureException tfe) {
-        LOG.error("Client #{}: Unhandled transaction failure", id, tfe);
-        txContext.abort(tfe);
-      }
-    }
-
-    private long getCurrentBalance(int id) throws IOException {
-      Result r = txTable.get(new Get(Bytes.toBytes(id)));
-      byte[] balanceBytes = r.getValue(FAMILY, COL);
-      if (balanceBytes == null) {
-        return 0;
-      }
-      return Bytes.toLong(balanceBytes);
-    }
-
-    private void setBalance(int id, long balance) throws IOException {
-      txTable.put(new Put(Bytes.toBytes(id)).add(FAMILY, COL, Bytes.toBytes(balance)));
-    }
-
-    private int getNextWithdrawee() {
-      int next;
-      do {
-        next = random.nextInt(totalClients);
-      } while (next == id);
-      return next;
-    }
-
-    private int getAmount() {
-      return random.nextInt(MAX_AMOUNT);
-    }
-
-    public void close() throws IOException {
-      txTable.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-examples/src/main/java/co/cask/tephra/examples/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-examples/src/main/java/co/cask/tephra/examples/package-info.java b/tephra-examples/src/main/java/co/cask/tephra/examples/package-info.java
deleted file mode 100644
index 3848e6a..0000000
--- a/tephra-examples/src/main/java/co/cask/tephra/examples/package-info.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains example applications for Tephra designed to illustrate sample Tephra usage
- * and provide out-of-the-box sample applications which can be run to test cluster functionality.
- *
- * <p>Currently the following applications are provided:
- *
- * <ul>
- *   <li><strong>BalanceBooks</strong> - this application runs a specified number of concurrent clients in separate
- *     threads, which perform transactions to make withdrawals from each other's accounts and deposits to their own
- *     accounts.  At the end of the test, the total value of all account balances is verified to be equal to zero,
- *     which confirms that transactional integrity was not violated.
- *   </li>
- * </ul>
- * </p>
- *
- * <p>
- *   Note that, for simplicity, the examples package is currently hardcoded to compile against a specific HBase
- *   version (currently 1.0-cdh).  In the future, we should provide Maven profiles to allow compiling the examples
- *   against each of the supported HBase versions.
- * </p>
- */
-package co.cask.tephra.examples;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java
----------------------------------------------------------------------
diff --git a/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java b/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java
new file mode 100644
index 0000000..369b11e
--- /dev/null
+++ b/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.examples;
+
+import com.google.common.io.Closeables;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TransactionConflictException;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.distributed.TransactionServiceClient;
+import org.apache.tephra.hbase10cdh.TransactionAwareHTable;
+import org.apache.tephra.hbase10cdh.coprocessor.TransactionProcessor;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.tephra.util.ConfigurationFactory;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Simple example application that launches a number of concurrent clients, one per "account".  Each client attempts to
+ * make withdrawals from other clients, and deposit the same amount to its own account in a single transaction.
+ * Since this means the client will be updating both its own row and the withdrawee's row, this will naturally lead to
+ * transaction conflicts.  All clients will run for a specified number of iterations.  When the processing is complete,
+ * the total sum of all rows should be zero, if transactional integrity was maintained.
+ *
+ * <p>
+ *   You can run the BalanceBooks application with the following command:
+ *   <pre>
+ *     ./bin/tephra run BalanceBooks [num clients] [num iterations]
+ *   </pre>
+ *   where <code>[num clients]</code> is the number of concurrent client threads to use, and
+ *   <code>[num iterations]</code> is the number of "transfer" operations to perform per client thread.
+ * </p>
+ */
+public class BalanceBooks implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(BalanceBooks.class);
+
+  private static final int MAX_AMOUNT = 100;
+  private static final byte[] TABLE = Bytes.toBytes("testbalances");
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+  private static final byte[] COL = Bytes.toBytes("b");
+
+  private final int totalClients;
+  private final int iterations;
+
+  private Configuration conf;
+  private ZKClientService zkClient;
+  private TransactionServiceClient txClient;
+  private HConnection conn;
+
+  public BalanceBooks(int totalClients, int iterations) {
+    this(totalClients, iterations, new ConfigurationFactory().get());
+  }
+
+  public BalanceBooks(int totalClients, int iterations, Configuration conf) {
+    this.totalClients = totalClients;
+    this.iterations = iterations;
+    this.conf = conf;
+  }
+
+  /**
+   * Sets up common resources required by all clients.
+   */
+  public void init() throws IOException {
+    Injector injector = Guice.createInjector(
+        new ConfigModule(conf),
+        new ZKModule(),
+        new DiscoveryModules().getDistributedModules(),
+        new TransactionModules().getDistributedModules(),
+        new TransactionClientModule()
+    );
+
+    zkClient = injector.getInstance(ZKClientService.class);
+    zkClient.startAndWait();
+    txClient = injector.getInstance(TransactionServiceClient.class);
+
+    createTableIfNotExists(conf, TABLE, new byte[][]{ FAMILY });
+    conn = HConnectionManager.createConnection(conf);
+  }
+
+  /**
+   * Runs all clients and waits for them to complete.
+   */
+  public void run() throws IOException, InterruptedException {
+    List<Client> clients = new ArrayList<>(totalClients);
+    for (int i = 0; i < totalClients; i++) {
+      Client c = new Client(i, totalClients, iterations);
+      c.init(txClient, conn.getTable(TABLE));
+      c.start();
+      clients.add(c);
+    }
+
+    for (Client c : clients) {
+      c.join();
+      Closeables.closeQuietly(c);
+    }
+  }
+
+  /**
+   * Validates the current state of the data stored at the end of the test.  Each update by a client consists of two
+   * parts: a withdrawal of a random amount from a randomly select other account, and a corresponding to deposit to
+   * the client's own account.  So, if all the updates were performed consistently (no partial updates or partial
+   * rollbacks), then the total sum of all balances at the end should be 0.
+   */
+  public boolean verify() {
+    boolean success = false;
+    try {
+      TransactionAwareHTable table = new TransactionAwareHTable(conn.getTable(TABLE));
+      TransactionContext context = new TransactionContext(txClient, table);
+
+      LOG.info("VERIFYING BALANCES");
+      context.start();
+      long totalBalance = 0;
+      ResultScanner scanner = table.getScanner(new Scan());
+      try {
+        for (Result r : scanner) {
+          if (!r.isEmpty()) {
+            int rowId = Bytes.toInt(r.getRow());
+            long balance = Bytes.toLong(r.getValue(FAMILY, COL));
+            totalBalance += balance;
+            LOG.info("Client #{}: balance = ${}", rowId, balance);
+          }
+        }
+      } finally {
+        if (scanner != null) {
+          Closeables.closeQuietly(scanner);
+        }
+      }
+      if (totalBalance == 0) {
+        LOG.info("PASSED!");
+        success = true;
+      } else {
+        LOG.info("FAILED! Total balance should be 0 but was {}", totalBalance);
+      }
+      context.finish();
+    } catch (Exception e) {
+      LOG.error("Failed verification check", e);
+    }
+    return success;
+  }
+
+  /**
+   * Frees up the underlying resources common to all clients.
+   */
+  public void close() {
+    try {
+      if (conn != null) {
+        conn.close();
+      }
+    } catch (IOException ignored) { }
+
+    if (zkClient != null) {
+      zkClient.stopAndWait();
+    }
+  }
+
+  protected void createTableIfNotExists(Configuration conf, byte[] tableName, byte[][] columnFamilies)
+      throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    try {
+      HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+      for (byte[] family : columnFamilies) {
+        HColumnDescriptor columnDesc = new HColumnDescriptor(family);
+        columnDesc.setMaxVersions(Integer.MAX_VALUE);
+        desc.addFamily(columnDesc);
+      }
+      desc.addCoprocessor(TransactionProcessor.class.getName());
+      admin.createTable(desc);
+    } finally {
+      if (admin != null) {
+        try {
+          admin.close();
+        } catch (IOException ioe) {
+          LOG.warn("Error closing HBaseAdmin", ioe);
+        }
+      }
+    }
+  }
+
+  public static void main(String[] args) {
+    if (args.length != 2) {
+      System.err.println("Usage: java " + BalanceBooks.class.getName() + " <num clients> <iterations>");
+      System.err.println("\twhere <num clients> >= 2");
+      System.exit(1);
+    }
+
+    BalanceBooks bb = new BalanceBooks(Integer.parseInt(args[0]), Integer.parseInt(args[1]));
+    try {
+      bb.init();
+      bb.run();
+      bb.verify();
+    } catch (Exception e) {
+      LOG.error("Failed during BalanceBooks run", e);
+    } finally {
+      bb.close();
+    }
+  }
+
+  /**
+   * Represents a single client actor in the test.  Each client runs as a separate thread.
+   *
+   * For the given number of iterations, the client will:
+   * <ol>
+   *   <li>select a random other client from which to withdraw</li>
+   *   <li>select a random amount from 0 to MAX_AMOUNT</li>
+   *   <li>start a new transaction and: deduct the amount from the other client's acccount, and deposit
+   *       the same amount to its own account.</li>
+   * </ol>
+   *
+   * Since multiple clients operate concurrently and contend over a set of constrained resources
+   * (the client accounts), it is expected that a portion of the attempted transactions will encounter
+   * conflicts, due to a simultaneous deduction from or deposit to one the same accounts which has successfully
+   * committed first.  In this case, the updates from the transaction encountering the conflict should be completely
+   * rolled back, leaving the data in a consistent state.
+   */
+  private static class Client extends Thread implements Closeable {
+    private final int id;
+    private final int totalClients;
+    private final int iterations;
+
+    private final Random random = new Random();
+
+    private TransactionContext txContext;
+    private TransactionAwareHTable txTable;
+
+
+    public Client(int id, int totalClients, int iterations) {
+      this.id = id;
+      this.totalClients = totalClients;
+      this.iterations = iterations;
+    }
+
+    /**
+     * Sets up any resources needed by the individual client.
+     *
+     * @param txClient the transaction client to use in accessing the transaciton service
+     * @param table the HBase table instance to use for accessing storage
+     */
+    public void init(TransactionSystemClient txClient, HTableInterface table) {
+      txTable = new TransactionAwareHTable(table);
+      txContext = new TransactionContext(txClient, txTable);
+    }
+
+    public void run() {
+      try {
+        for (int i = 0; i < iterations; i++) {
+          runOnce();
+        }
+      } catch (TransactionFailureException e) {
+        LOG.error("Client #{}: Failed on exception", id, e);
+      }
+    }
+
+    /**
+     * Runs a single iteration of the client logic.
+     */
+    private void runOnce() throws TransactionFailureException {
+      int withdrawee = getNextWithdrawee();
+      int amount = getAmount();
+
+      try {
+        txContext.start();
+        long withdraweeBalance = getCurrentBalance(withdrawee);
+        long ownBalance = getCurrentBalance(id);
+        long withdraweeNew = withdraweeBalance - amount;
+        long ownNew = ownBalance + amount;
+
+        setBalance(withdrawee, withdraweeNew);
+        setBalance(id, ownNew);
+        LOG.info("Client #{}: Withdrew ${} from #{}; withdrawee old={}, new={}; own old={}, new={}",
+            id, amount, withdrawee, withdraweeBalance, withdraweeNew, ownBalance, ownNew);
+        txContext.finish();
+
+      } catch (IOException ioe) {
+        LOG.error("Client #{}: Unhandled client failure", id, ioe);
+        txContext.abort();
+      } catch (TransactionConflictException tce) {
+        LOG.info("CONFLICT: client #{} attempting to withdraw from #{}", id, withdrawee);
+        txContext.abort(tce);
+      } catch (TransactionFailureException tfe) {
+        LOG.error("Client #{}: Unhandled transaction failure", id, tfe);
+        txContext.abort(tfe);
+      }
+    }
+
+    private long getCurrentBalance(int id) throws IOException {
+      Result r = txTable.get(new Get(Bytes.toBytes(id)));
+      byte[] balanceBytes = r.getValue(FAMILY, COL);
+      if (balanceBytes == null) {
+        return 0;
+      }
+      return Bytes.toLong(balanceBytes);
+    }
+
+    private void setBalance(int id, long balance) throws IOException {
+      txTable.put(new Put(Bytes.toBytes(id)).add(FAMILY, COL, Bytes.toBytes(balance)));
+    }
+
+    private int getNextWithdrawee() {
+      int next;
+      do {
+        next = random.nextInt(totalClients);
+      } while (next == id);
+      return next;
+    }
+
+    private int getAmount() {
+      return random.nextInt(MAX_AMOUNT);
+    }
+
+    public void close() throws IOException {
+      txTable.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-examples/src/main/java/org/apache/tephra/examples/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-examples/src/main/java/org/apache/tephra/examples/package-info.java b/tephra-examples/src/main/java/org/apache/tephra/examples/package-info.java
new file mode 100644
index 0000000..172ad07
--- /dev/null
+++ b/tephra-examples/src/main/java/org/apache/tephra/examples/package-info.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains example applications for Tephra designed to illustrate sample Tephra usage
+ * and provide out-of-the-box sample applications which can be run to test cluster functionality.
+ *
+ * <p>Currently the following applications are provided:
+ *
+ * <ul>
+ *   <li><strong>BalanceBooks</strong> - this application runs a specified number of concurrent clients in separate
+ *     threads, which perform transactions to make withdrawals from each other's accounts and deposits to their own
+ *     accounts.  At the end of the test, the total value of all account balances is verified to be equal to zero,
+ *     which confirms that transactional integrity was not violated.
+ *   </li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ *   Note that, for simplicity, the examples package is currently hardcoded to compile against a specific HBase
+ *   version (currently 1.0-cdh).  In the future, we should provide Maven profiles to allow compiling the examples
+ *   against each of the supported HBase versions.
+ * </p>
+ */
+package org.apache.tephra.examples;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-examples/src/test/java/co/cask/tephra/examples/BalanceBooksTest.java
----------------------------------------------------------------------
diff --git a/tephra-examples/src/test/java/co/cask/tephra/examples/BalanceBooksTest.java b/tephra-examples/src/test/java/co/cask/tephra/examples/BalanceBooksTest.java
deleted file mode 100644
index a5a9036..0000000
--- a/tephra-examples/src/test/java/co/cask/tephra/examples/BalanceBooksTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.examples;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.distributed.TransactionService;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionClientModule;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.runtime.ZKModule;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Scopes;
-import com.google.inject.util.Modules;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests the {@link BalanceBooks} program.
- */
-public class BalanceBooksTest {
-  private static final Logger LOG = 
LoggerFactory.getLogger(BalanceBooksTest.class);
-  private static HBaseTestingUtility testUtil;
-  private static TransactionService txService;
-  private static ZKClientService zkClientService;
-
-  @BeforeClass
-  public static void setup() throws Exception {
-    testUtil = new HBaseTestingUtility();
-    Configuration conf = testUtil.getConfiguration();
-    conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
-    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, "/tx.snapshot");
-
-    // Tune down the connection thread pool size
-    conf.setInt("hbase.hconnection.threads.core", 5);
-    conf.setInt("hbase.hconnection.threads.max", 10);
-    // Tunn down handler threads in regionserver
-    conf.setInt("hbase.regionserver.handler.count", 10);
-
-    // Set to random port
-    conf.setInt("hbase.master.port", 0);
-    conf.setInt("hbase.master.info.port", 0);
-    conf.setInt("hbase.regionserver.port", 0);
-    conf.setInt("hbase.regionserver.info.port", 0);
-
-    testUtil.startMiniCluster();
-
-    String zkClusterKey = testUtil.getClusterKey(); // 
hostname:clientPort:parentZnode
-    String zkQuorum = zkClusterKey.substring(0, zkClusterKey.lastIndexOf(':'));
-    LOG.info("Zookeeper Quorum is running at {}", zkQuorum);
-    conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkQuorum);
-
-    Injector injector = Guice.createInjector(
-        new ConfigModule(conf),
-        new ZKModule(),
-        new DiscoveryModules().getDistributedModules(),
-        Modules.override(new TransactionModules().getDistributedModules())
-            .with(new AbstractModule() {
-              @Override
-              protected void configure() {
-                
bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
-              }
-            }),
-        new TransactionClientModule()
-    );
-
-    zkClientService = injector.getInstance(ZKClientService.class);
-    zkClientService.startAndWait();
-
-    // start a tx server
-    txService = injector.getInstance(TransactionService.class);
-    try {
-      LOG.info("Starting transaction service");
-      txService.startAndWait();
-    } catch (Exception e) {
-      LOG.error("Failed to start service: ", e);
-    }
-
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    if (txService != null) {
-      txService.stopAndWait();
-    }
-    if (zkClientService != null) {
-      zkClientService.stopAndWait();
-    }
-    testUtil.shutdownMiniCluster();
-  }
-
-  @Test
-  public void testBalanceBooks() throws Exception {
-    BalanceBooks bb = new BalanceBooks(5, 100, testUtil.getConfiguration());
-    try {
-      bb.init();
-      bb.run();
-      assertTrue(bb.verify());
-    } finally {
-      bb.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
----------------------------------------------------------------------
diff --git 
a/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
 
b/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
new file mode 100644
index 0000000..d74a133
--- /dev/null
+++ 
b/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.examples;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.util.Modules;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@link BalanceBooks} program.
+ */
+public class BalanceBooksTest {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BalanceBooksTest.class);
+  private static HBaseTestingUtility testUtil;
+  private static TransactionService txService;
+  private static ZKClientService zkClientService;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    testUtil = new HBaseTestingUtility();
+    Configuration conf = testUtil.getConfiguration();
+    conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, "/tx.snapshot");
+
+    // Tune down the connection thread pool size
+    conf.setInt("hbase.hconnection.threads.core", 5);
+    conf.setInt("hbase.hconnection.threads.max", 10);
+    // Tunn down handler threads in regionserver
+    conf.setInt("hbase.regionserver.handler.count", 10);
+
+    // Set to random port
+    conf.setInt("hbase.master.port", 0);
+    conf.setInt("hbase.master.info.port", 0);
+    conf.setInt("hbase.regionserver.port", 0);
+    conf.setInt("hbase.regionserver.info.port", 0);
+
+    testUtil.startMiniCluster();
+
+    String zkClusterKey = testUtil.getClusterKey(); // 
hostname:clientPort:parentZnode
+    String zkQuorum = zkClusterKey.substring(0, zkClusterKey.lastIndexOf(':'));
+    LOG.info("Zookeeper Quorum is running at {}", zkQuorum);
+    conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkQuorum);
+
+    Injector injector = Guice.createInjector(
+        new ConfigModule(conf),
+        new ZKModule(),
+        new DiscoveryModules().getDistributedModules(),
+        Modules.override(new TransactionModules().getDistributedModules())
+            .with(new AbstractModule() {
+              @Override
+              protected void configure() {
+                
bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
+              }
+            }),
+        new TransactionClientModule()
+    );
+
+    zkClientService = injector.getInstance(ZKClientService.class);
+    zkClientService.startAndWait();
+
+    // start a tx server
+    txService = injector.getInstance(TransactionService.class);
+    try {
+      LOG.info("Starting transaction service");
+      txService.startAndWait();
+    } catch (Exception e) {
+      LOG.error("Failed to start service: ", e);
+    }
+
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (txService != null) {
+      txService.stopAndWait();
+    }
+    if (zkClientService != null) {
+      zkClientService.stopAndWait();
+    }
+    testUtil.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testBalanceBooks() throws Exception {
+    BalanceBooks bb = new BalanceBooks(5, 100, testUtil.getConfiguration());
+    try {
+      bb.init();
+      bb.run();
+      assertTrue(bb.verify());
+    } finally {
+      bb.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/HBase96ConfigurationProvider.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/HBase96ConfigurationProvider.java
 
b/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/HBase96ConfigurationProvider.java
deleted file mode 100644
index 834ed59..0000000
--- 
a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/HBase96ConfigurationProvider.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.hbase96;
-
-import co.cask.tephra.util.ConfigurationProvider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-
-/**
- * HBase 0.96 version of {@link co.cask.tephra.util.ConfigurationProvider}.
- */
-public class HBase96ConfigurationProvider extends ConfigurationProvider {
-  @Override
-  public Configuration get() {
-    return HBaseConfiguration.create();
-  }
-
-  @Override
-  public Configuration get(Configuration baseConf) {
-    return HBaseConfiguration.create(baseConf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/SecondaryIndexTable.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/SecondaryIndexTable.java
 
b/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/SecondaryIndexTable.java
deleted file mode 100644
index eaa5564..0000000
--- 
a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/SecondaryIndexTable.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.hbase96;
-
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.distributed.TransactionServiceClient;
-import com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A Transactional SecondaryIndexTable.
- */
-public class SecondaryIndexTable {
-  private byte[] secondaryIndex;
-  private TransactionAwareHTable transactionAwareHTable;
-  private TransactionAwareHTable secondaryIndexTable;
-  private TransactionContext transactionContext;
-  private final TableName secondaryIndexTableName;
-  private static final byte[] secondaryIndexFamily = 
Bytes.toBytes("secondaryIndexFamily");
-  private static final byte[] secondaryIndexQualifier = Bytes.toBytes('r');
-  private static final byte[] DELIMITER  = new byte[] {0};
-
-  public SecondaryIndexTable(TransactionServiceClient 
transactionServiceClient, HTableInterface hTable,
-                             byte[] secondaryIndex) {
-    secondaryIndexTableName = 
TableName.valueOf(hTable.getName().getNameAsString() + ".idx");
-    HTable secondaryIndexHTable = null;
-    HBaseAdmin hBaseAdmin = null;
-    try {
-      hBaseAdmin = new HBaseAdmin(hTable.getConfiguration());
-      if (!hBaseAdmin.tableExists(secondaryIndexTableName)) {
-        hBaseAdmin.createTable(new HTableDescriptor(secondaryIndexTableName));
-      }
-      secondaryIndexHTable = new HTable(hTable.getConfiguration(), 
secondaryIndexTableName);
-    } catch (Exception e) {
-      Throwables.propagate(e);
-    } finally {
-      try {
-        hBaseAdmin.close();
-      } catch (Exception e) {
-        Throwables.propagate(e);
-      }
-    }
-
-    this.secondaryIndex = secondaryIndex;
-    this.transactionAwareHTable = new TransactionAwareHTable(hTable);
-    this.secondaryIndexTable = new 
TransactionAwareHTable(secondaryIndexHTable);
-    this.transactionContext = new TransactionContext(transactionServiceClient, 
transactionAwareHTable,
-                                                secondaryIndexTable);
-  }
-
-  public Result get(Get get) throws IOException {
-    return get(Collections.singletonList(get))[0];
-  }
-
-  public Result[] get(List<Get> gets) throws IOException {
-    try {
-      transactionContext.start();
-      Result[] result = transactionAwareHTable.get(gets);
-      transactionContext.finish();
-      return result;
-    } catch (Exception e) {
-      try {
-        transactionContext.abort();
-      } catch (TransactionFailureException e1) {
-        throw new IOException("Could not rollback transaction", e1);
-      }
-    }
-    return null;
-  }
-
-  public Result[] getByIndex(byte[] value) throws IOException {
-    try {
-      transactionContext.start();
-      Scan scan = new Scan(value, Bytes.add(value, new byte[0]));
-      scan.addColumn(secondaryIndexFamily, secondaryIndexQualifier);
-      ResultScanner indexScanner = secondaryIndexTable.getScanner(scan);
-
-      ArrayList<Get> gets = new ArrayList<Get>();
-      for (Result result : indexScanner) {
-        for (Cell cell : result.listCells()) {
-          gets.add(new Get(cell.getValue()));
-        }
-      }
-      Result[] results = transactionAwareHTable.get(gets);
-      transactionContext.finish();
-      return results;
-    } catch (Exception e) {
-      try {
-        transactionContext.abort();
-      } catch (TransactionFailureException e1) {
-        throw new IOException("Could not rollback transaction", e1);
-      }
-    }
-    return null;
-  }
-
-  public void put(Put put) throws IOException {
-    put(Collections.singletonList(put));
-  }
-
-
-  public void put(List<Put> puts) throws IOException {
-    try {
-      transactionContext.start();
-      ArrayList<Put> secondaryIndexPuts = new ArrayList<Put>();
-      for (Put put : puts) {
-        List<Put> indexPuts = new ArrayList<Put>();
-        Set<Map.Entry<byte[], List<KeyValue>>> familyMap = 
put.getFamilyMap().entrySet();
-        for (Map.Entry<byte [], List<KeyValue>> family : familyMap) {
-          for (KeyValue value : family.getValue()) {
-            if (Bytes.equals(value.getQualifierArray(), 
value.getQualifierOffset(), value.getQualifierLength(),
-                             secondaryIndex, 0, secondaryIndex.length)) {
-              byte[] secondaryRow = Bytes.add(value.getQualifier(), DELIMITER,
-                                                    
Bytes.add(value.getValue(), DELIMITER,
-                                                              value.getRow()));
-              Put indexPut = new Put(secondaryRow);
-              indexPut.add(secondaryIndexFamily, secondaryIndexQualifier, 
put.getRow());
-              indexPuts.add(indexPut);
-            }
-          }
-        }
-        secondaryIndexPuts.addAll(indexPuts);
-      }
-      transactionAwareHTable.put(puts);
-      secondaryIndexTable.put(secondaryIndexPuts);
-      transactionContext.finish();
-    } catch (Exception e) {
-      try {
-        transactionContext.abort();
-      } catch (TransactionFailureException e1) {
-        throw new IOException("Could not rollback transaction", e1);
-      }
-    }
-  }
-}

Reply via email to