http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/util/HBaseVersionTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/util/HBaseVersionTest.java b/tephra-core/src/test/java/org/apache/tephra/util/HBaseVersionTest.java new file mode 100644 index 0000000..a2344ba --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/util/HBaseVersionTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.util; + +import org.junit.Test; + +import java.text.ParseException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests for HBase version parsing. + */ +public class HBaseVersionTest { + @Test + public void testVersionNumber() throws Exception { + HBaseVersion.VersionNumber ver = HBaseVersion.VersionNumber.create("1"); + assertVersionNumber(ver, 1, null, null, null, false); + + ver = HBaseVersion.VersionNumber.create("1-SNAPSHOT"); + assertVersionNumber(ver, 1, null, null, null, true); + + ver = HBaseVersion.VersionNumber.create("1-foo"); + assertVersionNumber(ver, 1, null, null, "foo", false); + + ver = HBaseVersion.VersionNumber.create("1-foo-SNAPSHOT"); + assertVersionNumber(ver, 1, null, null, "foo", true); + + ver = HBaseVersion.VersionNumber.create("10.0"); + assertVersionNumber(ver, 10, 0, null, null, false); + + ver = HBaseVersion.VersionNumber.create("10.0-bar"); + assertVersionNumber(ver, 10, 0, null, "bar", false); + + ver = HBaseVersion.VersionNumber.create("3.2.1"); + assertVersionNumber(ver, 3, 2, 1, null, false); + + ver = HBaseVersion.VersionNumber.create("3.2.1-SNAPSHOT"); + assertVersionNumber(ver, 3, 2, 1, null, true); + + ver = HBaseVersion.VersionNumber.create("3.2.1-baz"); + assertVersionNumber(ver, 3, 2, 1, "baz", false); + + ver = HBaseVersion.VersionNumber.create("3.2.1-baz1.2.3"); + assertVersionNumber(ver, 3, 2, 1, "baz1.2.3", false); + + ver = HBaseVersion.VersionNumber.create("3.2.1-baz1.2.3-SNAPSHOT"); + assertVersionNumber(ver, 3, 2, 1, "baz1.2.3", true); + + try { + ver = HBaseVersion.VersionNumber.create("abc"); + fail("Invalid verison number 'abc' should have thrown a ParseException"); + } catch (ParseException pe) { + // expected + } + + try { + ver = HBaseVersion.VersionNumber.create("1.a.b"); + fail("Invalid verison number '1.a.b' should have thrown a ParseException"); + } catch (ParseException pe) { + // expected + } + + ver = HBaseVersion.VersionNumber.create("1.2.0-CDH5.7.0"); + assertVersionNumber(ver, 1, 2, 0, "CDH5.7.0", false); + } + + private void assertVersionNumber(HBaseVersion.VersionNumber version, Integer expectedMajor, Integer expectedMinor, + Integer expectedPatch, String expectedClassifier, boolean snapshot) { + if (expectedMajor == null) { + assertNull(version.getMajor()); + } else { + assertEquals(expectedMajor, version.getMajor()); + } + if (expectedMinor == null) { + assertNull(version.getMinor()); + } else { + assertEquals(expectedMinor, version.getMinor()); + } + if (expectedPatch == null) { + assertNull(version.getPatch()); + } else { + assertEquals(expectedPatch, version.getPatch()); + } + if (expectedClassifier == null) { + assertNull(version.getClassifier()); + } else { + assertEquals(expectedClassifier, version.getClassifier()); + } + assertEquals(snapshot, version.isSnapshot()); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java b/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java new file mode 100644 index 0000000..854ccdd --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.util; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.tephra.ChangeId; +import org.apache.tephra.TransactionType; +import org.apache.tephra.persist.TransactionEdit; + +import java.util.List; +import java.util.Random; +import java.util.Set; + +/** + * Util class for {@link TransactionEdit} related tests. + */ +public final class TransactionEditUtil { + private static Random random = new Random(); + + /** + * Generates a number of semi-random {@link TransactionEdit} instances. + * These are just randomly selected from the possible states, so would not necessarily reflect a real-world + * distribution. + * + * @param numEntries how many entries to generate in the returned list. + * @return a list of randomly generated transaction log edits. + */ + public static List<TransactionEdit> createRandomEdits(int numEntries) { + List<TransactionEdit> edits = Lists.newArrayListWithCapacity(numEntries); + for (int i = 0; i < numEntries; i++) { + TransactionEdit.State nextType = TransactionEdit.State.values()[random.nextInt(6)]; + long writePointer = Math.abs(random.nextLong()); + switch (nextType) { + case INPROGRESS: + edits.add( + TransactionEdit.createStarted(writePointer, writePointer - 1, + System.currentTimeMillis() + 300000L, TransactionType.SHORT)); + break; + case COMMITTING: + edits.add(TransactionEdit.createCommitting(writePointer, generateChangeSet(10))); + break; + case COMMITTED: + edits.add(TransactionEdit.createCommitted(writePointer, generateChangeSet(10), writePointer + 1, + random.nextBoolean())); + break; + case INVALID: + edits.add(TransactionEdit.createInvalid(writePointer)); + break; + case ABORTED: + edits.add(TransactionEdit.createAborted(writePointer, TransactionType.SHORT, null)); + break; + case MOVE_WATERMARK: + edits.add(TransactionEdit.createMoveWatermark(writePointer)); + break; + } + } + return edits; + } + + private static Set<ChangeId> generateChangeSet(int numEntries) { + Set<ChangeId> changes = Sets.newHashSet(); + for (int i = 0; i < numEntries; i++) { + byte[] bytes = new byte[8]; + random.nextBytes(bytes); + changes.add(new ChangeId(bytes)); + } + return changes; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java new file mode 100644 index 0000000..7743105 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.util; + +import org.apache.tephra.Transaction; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Test cases for {@link TxUtils} utility methods. + */ +public class TxUtilsTest { + @Test + public void testMaxVisibleTimestamp() { + // make sure we don't overflow with MAX_VALUE write pointer + assertEquals(Long.MAX_VALUE, TxUtils.getMaxVisibleTimestamp(Transaction.ALL_VISIBLE_LATEST)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/visibility/VisibilityFenceTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/visibility/VisibilityFenceTest.java b/tephra-core/src/test/java/org/apache/tephra/visibility/VisibilityFenceTest.java new file mode 100644 index 0000000..1effcfd --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/visibility/VisibilityFenceTest.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.visibility; + +import com.google.common.base.Charsets; +import com.google.common.base.Throwables; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionAware; +import org.apache.tephra.TransactionConflictException; +import org.apache.tephra.TransactionContext; +import org.apache.tephra.TransactionFailureException; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.inmemory.InMemoryTxSystemClient; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.InMemoryTransactionStateStorage; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The following are all the possible cases when using {@link VisibilityFence}. + * + * In the below table, + * "Read Txn" refers to the transaction that contains the read fence + * "Before Write", "During Write" and "After Write" refer to the write transaction time + * "Before Write Fence", "During Write Fence", "After Write Fence" refer to the write fence transaction time + * + * Timeline is: Before Write < During Write < After Write < Before Write Fence < During Write Fence < + * After Write Fence + * + * +------+----------------------+----------------------+--------------------+--------------------+ + * | Case | Read Txn Start | Read Txn Commit | Conflict on Commit | Conflict on Commit | + * | | | | of Read Txn | of Write Fence | + * +------+----------------------+----------------------+--------------------+--------------------+ + * | 1 | Before Write | Before Write | No | No | + * | 2 | Before Write | During Write | No | No | + * | 3 | Before Write | After Write | No | No | + * | 4 | Before Write | Before Write Fence | No | No | + * | 5 | Before Write | During Write Fence | No | Yes | + * | 6 | Before Write | After Write Fence | Yes | No | + * | | | | | | + * | 7 | During Write | During Write | No | No | + * | 8 | During Write | After Write | No | No | + * | 9 | During Write | Before Write Fence | No | No | + * | 10 | During Write | During Write Fence | No | Yes | + * | 11 | During Write | After Write Fence | Yes | No | + * | | | | | | + * | 12 | After Write | After Write | No | No | + * | 13 | After Write | Before Write Fence | No | No | + * | 14 | After Write | During Write Fence | No | Yes # | + * | 15 | After Write | After Write Fence | Yes # | No | + * | | | | | | + * | 16 | Before Write Fence | Before Write Fence | No | No | + * | 17 | Before Write Fence | During Write Fence | No | Yes # | + * | 18 | Before Write Fence | After Write Fence | Yes # | No | + * | | | | | | + * | 19 | During Write Fence | During Write Fence | No | No | + * | 20 | During Write Fence | After Write Fence | No | No | + * | | | | | | + * | 21 | After Write Fence | After Write Fence | No | No | + * +------+----------------------+----------------------+--------------------+--------------------+ + * + * Note: Cases marked with '#' in conflict column should not conflict, however current implementation causes + * them to conflict. The remaining conflicts are a result of the fence. + * + * In the current implementation of VisibilityFence, read txns that start "Before Write", "During Write", + * and "After Write" can be represented by read txns that start "Before Write Fence". + * Verifying cases 16, 17, 18, 20 and 21 will effectively cover all other cases. + */ +public class VisibilityFenceTest { + private static Configuration conf = new Configuration(); + + private static TransactionManager txManager = null; + + @BeforeClass + public static void before() { + txManager = new TransactionManager(conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector()); + txManager.startAndWait(); + } + + @AfterClass + public static void after() { + txManager.stopAndWait(); + } + + @Test + public void testFence1() throws Exception { + byte[] fenceId = "test_table".getBytes(Charsets.UTF_8); + + // Writer updates data here in a separate transaction (code not shown) + // start tx + // update + // commit tx + + // Readers use fence to indicate that they are interested in changes to specific data + TransactionAware readFenceCase16 = VisibilityFence.create(fenceId); + TransactionContext readTxContextCase16 = + new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase16); + readTxContextCase16.start(); + readTxContextCase16.finish(); + + TransactionAware readFenceCase17 = VisibilityFence.create(fenceId); + TransactionContext readTxContextCase17 = + new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase17); + readTxContextCase17.start(); + + TransactionAware readFenceCase18 = VisibilityFence.create(fenceId); + TransactionContext readTxContextCase18 = + new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase18); + readTxContextCase18.start(); + + // Now writer needs to wait for in-progress readers to see the change, it uses write fence to do so + // Start write fence txn + TransactionAware writeFence = new WriteFence(fenceId); + TransactionContext writeTxContext = new TransactionContext(new InMemoryTxSystemClient(txManager), writeFence); + writeTxContext.start(); + + TransactionAware readFenceCase20 = VisibilityFence.create(fenceId); + TransactionContext readTxContextCase20 = + new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase20); + readTxContextCase20.start(); + + readTxContextCase17.finish(); + + assertTxnConflict(writeTxContext); + writeTxContext.start(); + + // Commit write fence txn can commit without conflicts at this point + writeTxContext.finish(); + + TransactionAware readFenceCase21 = VisibilityFence.create(fenceId); + TransactionContext readTxContextCase21 = + new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase21); + readTxContextCase21.start(); + + assertTxnConflict(readTxContextCase18); + readTxContextCase20.finish(); + readTxContextCase21.finish(); + } + + private void assertTxnConflict(TransactionContext txContext) throws Exception { + try { + txContext.finish(); + Assert.fail("Expected transaction to fail"); + } catch (TransactionConflictException e) { + // Expected + txContext.abort(); + } + } + + @Test + public void testFence2() throws Exception { + byte[] fenceId = "test_table".getBytes(Charsets.UTF_8); + + // Readers use fence to indicate that they are interested in changes to specific data + // Reader 1 + TransactionAware readFence1 = VisibilityFence.create(fenceId); + TransactionContext readTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence1); + readTxContext1.start(); + + // Reader 2 + TransactionAware readFence2 = VisibilityFence.create(fenceId); + TransactionContext readTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence2); + readTxContext2.start(); + + // Reader 3 + TransactionAware readFence3 = VisibilityFence.create(fenceId); + TransactionContext readTxContext3 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence3); + readTxContext3.start(); + + // Writer updates data here in a separate transaction (code not shown) + // start tx + // update + // commit tx + + // Now writer needs to wait for readers 1, 2, and 3 to see the change, it uses write fence to do so + TransactionAware writeFence = new WriteFence(fenceId); + TransactionContext writeTxContext = new TransactionContext(new InMemoryTxSystemClient(txManager), writeFence); + writeTxContext.start(); + + // Reader 1 commits before writeFence is committed + readTxContext1.finish(); + + try { + // writeFence will throw exception since Reader 1 committed without seeing changes + writeTxContext.finish(); + Assert.fail("Expected transaction to fail"); + } catch (TransactionConflictException e) { + // Expected + writeTxContext.abort(); + } + + // Start over writeFence again + writeTxContext.start(); + + // Now, Reader 3 commits before writeFence + // Note that Reader 3 does not conflict with Reader 1 + readTxContext3.finish(); + + try { + // writeFence will throw exception again since Reader 3 committed without seeing changes + writeTxContext.finish(); + Assert.fail("Expected transaction to fail"); + } catch (TransactionConflictException e) { + // Expected + writeTxContext.abort(); + } + + // Start over writeFence again + writeTxContext.start(); + // This time writeFence commits before the other readers + writeTxContext.finish(); + + // After this point all readers will see the change + + try { + // Reader 2 commits after writeFence, hence this commit with throw exception + readTxContext2.finish(); + Assert.fail("Expected transaction to fail"); + } catch (TransactionConflictException e) { + // Expected + readTxContext2.abort(); + } + + // Reader 2 has to abort and start over again. It will see the changes now. + readTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence2); + readTxContext2.start(); + readTxContext2.finish(); + } + + @Test + public void testFenceAwait() throws Exception { + byte[] fenceId = "test_table".getBytes(Charsets.UTF_8); + + final TransactionContext fence1 = new TransactionContext(new InMemoryTxSystemClient(txManager), + VisibilityFence.create(fenceId)); + fence1.start(); + final TransactionContext fence2 = new TransactionContext(new InMemoryTxSystemClient(txManager), + VisibilityFence.create(fenceId)); + fence2.start(); + TransactionContext fence3 = new TransactionContext(new InMemoryTxSystemClient(txManager), + VisibilityFence.create(fenceId)); + fence3.start(); + + final AtomicInteger attempts = new AtomicInteger(); + TransactionSystemClient customTxClient = new InMemoryTxSystemClient(txManager) { + @Override + public Transaction startShort() { + Transaction transaction = super.startShort(); + try { + switch (attempts.getAndIncrement()) { + case 0: + fence1.finish(); + break; + case 1: + fence2.finish(); + break; + case 2: + break; + default: + throw new IllegalStateException("Unexpected state"); + } + } catch (TransactionFailureException e) { + Throwables.propagate(e); + } + return transaction; + } + }; + + FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient); + fenceWait.await(1000, TimeUnit.MILLISECONDS); + Assert.assertEquals(3, attempts.get()); + + try { + fence3.finish(); + Assert.fail("Expected transaction to fail"); + } catch (TransactionConflictException e) { + // Expected exception + fence3.abort(); + } + + fence3.start(); + fence3.finish(); + } + + @Test + public void testFenceTimeout() throws Exception { + byte[] fenceId = "test_table".getBytes(Charsets.UTF_8); + + final TransactionContext fence1 = new TransactionContext(new InMemoryTxSystemClient(txManager), + VisibilityFence.create(fenceId)); + fence1.start(); + + final long timeout = 100; + final TimeUnit timeUnit = TimeUnit.MILLISECONDS; + final AtomicInteger attempts = new AtomicInteger(); + TransactionSystemClient customTxClient = new InMemoryTxSystemClient(txManager) { + @Override + public Transaction startShort() { + Transaction transaction = super.startShort(); + try { + switch (attempts.getAndIncrement()) { + case 0: + fence1.finish(); + break; + } + timeUnit.sleep(timeout + 1); + } catch (InterruptedException | TransactionFailureException e) { + Throwables.propagate(e); + } + return transaction; + } + }; + + try { + FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient); + fenceWait.await(timeout, timeUnit); + Assert.fail("Expected await to fail"); + } catch (TimeoutException e) { + // Expected exception + } + Assert.assertEquals(1, attempts.get()); + + FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient); + fenceWait.await(timeout, timeUnit); + Assert.assertEquals(2, attempts.get()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-examples/src/main/java/co/cask/tephra/examples/BalanceBooks.java ---------------------------------------------------------------------- diff --git a/tephra-examples/src/main/java/co/cask/tephra/examples/BalanceBooks.java b/tephra-examples/src/main/java/co/cask/tephra/examples/BalanceBooks.java deleted file mode 100644 index 7f82209..0000000 --- a/tephra-examples/src/main/java/co/cask/tephra/examples/BalanceBooks.java +++ /dev/null @@ -1,357 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.examples; - -import co.cask.tephra.TransactionConflictException; -import co.cask.tephra.TransactionContext; -import co.cask.tephra.TransactionFailureException; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.distributed.TransactionServiceClient; -import co.cask.tephra.hbase10cdh.TransactionAwareHTable; -import co.cask.tephra.hbase10cdh.coprocessor.TransactionProcessor; -import co.cask.tephra.runtime.ConfigModule; -import co.cask.tephra.runtime.DiscoveryModules; -import co.cask.tephra.runtime.TransactionClientModule; -import co.cask.tephra.runtime.TransactionModules; -import co.cask.tephra.runtime.ZKModule; -import co.cask.tephra.util.ConfigurationFactory; -import com.google.common.io.Closeables; -import com.google.inject.Guice; -import com.google.inject.Injector; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.twill.zookeeper.ZKClientService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -/** - * Simple example application that launches a number of concurrent clients, one per "account". Each client attempts to - * make withdrawals from other clients, and deposit the same amount to its own account in a single transaction. - * Since this means the client will be updating both its own row and the withdrawee's row, this will naturally lead to - * transaction conflicts. All clients will run for a specified number of iterations. When the processing is complete, - * the total sum of all rows should be zero, if transactional integrity was maintained. - * - * <p> - * You can run the BalanceBooks application with the following command: - * <pre> - * ./bin/tephra run co.cask.tephra.examples.BalanceBooks [num clients] [num iterations] - * </pre> - * where <code>[num clients]</code> is the number of concurrent client threads to use, and - * <code>[num iterations]</code> is the number of "transfer" operations to perform per client thread. - * </p> - */ -public class BalanceBooks implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger(BalanceBooks.class); - - private static final int MAX_AMOUNT = 100; - private static final byte[] TABLE = Bytes.toBytes("testbalances"); - private static final byte[] FAMILY = Bytes.toBytes("f"); - private static final byte[] COL = Bytes.toBytes("b"); - - private final int totalClients; - private final int iterations; - - private Configuration conf; - private ZKClientService zkClient; - private TransactionServiceClient txClient; - private HConnection conn; - - public BalanceBooks(int totalClients, int iterations) { - this(totalClients, iterations, new ConfigurationFactory().get()); - } - - public BalanceBooks(int totalClients, int iterations, Configuration conf) { - this.totalClients = totalClients; - this.iterations = iterations; - this.conf = conf; - } - - /** - * Sets up common resources required by all clients. - */ - public void init() throws IOException { - Injector injector = Guice.createInjector( - new ConfigModule(conf), - new ZKModule(), - new DiscoveryModules().getDistributedModules(), - new TransactionModules().getDistributedModules(), - new TransactionClientModule() - ); - - zkClient = injector.getInstance(ZKClientService.class); - zkClient.startAndWait(); - txClient = injector.getInstance(TransactionServiceClient.class); - - createTableIfNotExists(conf, TABLE, new byte[][]{ FAMILY }); - conn = HConnectionManager.createConnection(conf); - } - - /** - * Runs all clients and waits for them to complete. - */ - public void run() throws IOException, InterruptedException { - List<Client> clients = new ArrayList<>(totalClients); - for (int i = 0; i < totalClients; i++) { - Client c = new Client(i, totalClients, iterations); - c.init(txClient, conn.getTable(TABLE)); - c.start(); - clients.add(c); - } - - for (Client c : clients) { - c.join(); - Closeables.closeQuietly(c); - } - } - - /** - * Validates the current state of the data stored at the end of the test. Each update by a client consists of two - * parts: a withdrawal of a random amount from a randomly select other account, and a corresponding to deposit to - * the client's own account. So, if all the updates were performed consistently (no partial updates or partial - * rollbacks), then the total sum of all balances at the end should be 0. - */ - public boolean verify() { - boolean success = false; - try { - TransactionAwareHTable table = new TransactionAwareHTable(conn.getTable(TABLE)); - TransactionContext context = new TransactionContext(txClient, table); - - LOG.info("VERIFYING BALANCES"); - context.start(); - long totalBalance = 0; - ResultScanner scanner = table.getScanner(new Scan()); - try { - for (Result r : scanner) { - if (!r.isEmpty()) { - int rowId = Bytes.toInt(r.getRow()); - long balance = Bytes.toLong(r.getValue(FAMILY, COL)); - totalBalance += balance; - LOG.info("Client #{}: balance = ${}", rowId, balance); - } - } - } finally { - if (scanner != null) { - Closeables.closeQuietly(scanner); - } - } - if (totalBalance == 0) { - LOG.info("PASSED!"); - success = true; - } else { - LOG.info("FAILED! Total balance should be 0 but was {}", totalBalance); - } - context.finish(); - } catch (Exception e) { - LOG.error("Failed verification check", e); - } - return success; - } - - /** - * Frees up the underlying resources common to all clients. - */ - public void close() { - try { - if (conn != null) { - conn.close(); - } - } catch (IOException ignored) { } - - if (zkClient != null) { - zkClient.stopAndWait(); - } - } - - protected void createTableIfNotExists(Configuration conf, byte[] tableName, byte[][] columnFamilies) - throws IOException { - HBaseAdmin admin = new HBaseAdmin(conf); - try { - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); - for (byte[] family : columnFamilies) { - HColumnDescriptor columnDesc = new HColumnDescriptor(family); - columnDesc.setMaxVersions(Integer.MAX_VALUE); - desc.addFamily(columnDesc); - } - desc.addCoprocessor(TransactionProcessor.class.getName()); - admin.createTable(desc); - } finally { - if (admin != null) { - try { - admin.close(); - } catch (IOException ioe) { - LOG.warn("Error closing HBaseAdmin", ioe); - } - } - } - } - - public static void main(String[] args) { - if (args.length != 2) { - System.err.println("Usage: java " + BalanceBooks.class.getName() + " <num clients> <iterations>"); - System.err.println("\twhere <num clients> >= 2"); - System.exit(1); - } - - BalanceBooks bb = new BalanceBooks(Integer.parseInt(args[0]), Integer.parseInt(args[1])); - try { - bb.init(); - bb.run(); - bb.verify(); - } catch (Exception e) { - LOG.error("Failed during BalanceBooks run", e); - } finally { - bb.close(); - } - } - - /** - * Represents a single client actor in the test. Each client runs as a separate thread. - * - * For the given number of iterations, the client will: - * <ol> - * <li>select a random other client from which to withdraw</li> - * <li>select a random amount from 0 to MAX_AMOUNT</li> - * <li>start a new transaction and: deduct the amount from the other client's acccount, and deposit - * the same amount to its own account.</li> - * </ol> - * - * Since multiple clients operate concurrently and contend over a set of constrained resources - * (the client accounts), it is expected that a portion of the attempted transactions will encounter - * conflicts, due to a simultaneous deduction from or deposit to one the same accounts which has successfully - * committed first. In this case, the updates from the transaction encountering the conflict should be completely - * rolled back, leaving the data in a consistent state. - */ - private static class Client extends Thread implements Closeable { - private final int id; - private final int totalClients; - private final int iterations; - - private final Random random = new Random(); - - private TransactionContext txContext; - private TransactionAwareHTable txTable; - - - public Client(int id, int totalClients, int iterations) { - this.id = id; - this.totalClients = totalClients; - this.iterations = iterations; - } - - /** - * Sets up any resources needed by the individual client. - * - * @param txClient the transaction client to use in accessing the transaciton service - * @param table the HBase table instance to use for accessing storage - */ - public void init(TransactionSystemClient txClient, HTableInterface table) { - txTable = new TransactionAwareHTable(table); - txContext = new TransactionContext(txClient, txTable); - } - - public void run() { - try { - for (int i = 0; i < iterations; i++) { - runOnce(); - } - } catch (TransactionFailureException e) { - LOG.error("Client #{}: Failed on exception", id, e); - } - } - - /** - * Runs a single iteration of the client logic. - */ - private void runOnce() throws TransactionFailureException { - int withdrawee = getNextWithdrawee(); - int amount = getAmount(); - - try { - txContext.start(); - long withdraweeBalance = getCurrentBalance(withdrawee); - long ownBalance = getCurrentBalance(id); - long withdraweeNew = withdraweeBalance - amount; - long ownNew = ownBalance + amount; - - setBalance(withdrawee, withdraweeNew); - setBalance(id, ownNew); - LOG.info("Client #{}: Withdrew ${} from #{}; withdrawee old={}, new={}; own old={}, new={}", - id, amount, withdrawee, withdraweeBalance, withdraweeNew, ownBalance, ownNew); - txContext.finish(); - - } catch (IOException ioe) { - LOG.error("Client #{}: Unhandled client failure", id, ioe); - txContext.abort(); - } catch (TransactionConflictException tce) { - LOG.info("CONFLICT: client #{} attempting to withdraw from #{}", id, withdrawee); - txContext.abort(tce); - } catch (TransactionFailureException tfe) { - LOG.error("Client #{}: Unhandled transaction failure", id, tfe); - txContext.abort(tfe); - } - } - - private long getCurrentBalance(int id) throws IOException { - Result r = txTable.get(new Get(Bytes.toBytes(id))); - byte[] balanceBytes = r.getValue(FAMILY, COL); - if (balanceBytes == null) { - return 0; - } - return Bytes.toLong(balanceBytes); - } - - private void setBalance(int id, long balance) throws IOException { - txTable.put(new Put(Bytes.toBytes(id)).add(FAMILY, COL, Bytes.toBytes(balance))); - } - - private int getNextWithdrawee() { - int next; - do { - next = random.nextInt(totalClients); - } while (next == id); - return next; - } - - private int getAmount() { - return random.nextInt(MAX_AMOUNT); - } - - public void close() throws IOException { - txTable.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-examples/src/main/java/co/cask/tephra/examples/package-info.java ---------------------------------------------------------------------- diff --git a/tephra-examples/src/main/java/co/cask/tephra/examples/package-info.java b/tephra-examples/src/main/java/co/cask/tephra/examples/package-info.java deleted file mode 100644 index 3848e6a..0000000 --- a/tephra-examples/src/main/java/co/cask/tephra/examples/package-info.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This package contains example applications for Tephra designed to illustrate sample Tephra usage - * and provide out-of-the-box sample applications which can be run to test cluster functionality. - * - * <p>Currently the following applications are provided: - * - * <ul> - * <li><strong>BalanceBooks</strong> - this application runs a specified number of concurrent clients in separate - * threads, which perform transactions to make withdrawals from each other's accounts and deposits to their own - * accounts. At the end of the test, the total value of all account balances is verified to be equal to zero, - * which confirms that transactional integrity was not violated. - * </li> - * </ul> - * </p> - * - * <p> - * Note that, for simplicity, the examples package is currently hardcoded to compile against a specific HBase - * version (currently 1.0-cdh). In the future, we should provide Maven profiles to allow compiling the examples - * against each of the supported HBase versions. - * </p> - */ -package co.cask.tephra.examples; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java ---------------------------------------------------------------------- diff --git a/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java b/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java new file mode 100644 index 0000000..369b11e --- /dev/null +++ b/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.examples; + +import com.google.common.io.Closeables; +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TransactionConflictException; +import org.apache.tephra.TransactionContext; +import org.apache.tephra.TransactionFailureException; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.distributed.TransactionServiceClient; +import org.apache.tephra.hbase10cdh.TransactionAwareHTable; +import org.apache.tephra.hbase10cdh.coprocessor.TransactionProcessor; +import org.apache.tephra.runtime.ConfigModule; +import org.apache.tephra.runtime.DiscoveryModules; +import org.apache.tephra.runtime.TransactionClientModule; +import org.apache.tephra.runtime.TransactionModules; +import org.apache.tephra.runtime.ZKModule; +import org.apache.tephra.util.ConfigurationFactory; +import org.apache.twill.zookeeper.ZKClientService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +/** + * Simple example application that launches a number of concurrent clients, one per "account". Each client attempts to + * make withdrawals from other clients, and deposit the same amount to its own account in a single transaction. + * Since this means the client will be updating both its own row and the withdrawee's row, this will naturally lead to + * transaction conflicts. All clients will run for a specified number of iterations. When the processing is complete, + * the total sum of all rows should be zero, if transactional integrity was maintained. + * + * <p> + * You can run the BalanceBooks application with the following command: + * <pre> + * ./bin/tephra run BalanceBooks [num clients] [num iterations] + * </pre> + * where <code>[num clients]</code> is the number of concurrent client threads to use, and + * <code>[num iterations]</code> is the number of "transfer" operations to perform per client thread. + * </p> + */ +public class BalanceBooks implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(BalanceBooks.class); + + private static final int MAX_AMOUNT = 100; + private static final byte[] TABLE = Bytes.toBytes("testbalances"); + private static final byte[] FAMILY = Bytes.toBytes("f"); + private static final byte[] COL = Bytes.toBytes("b"); + + private final int totalClients; + private final int iterations; + + private Configuration conf; + private ZKClientService zkClient; + private TransactionServiceClient txClient; + private HConnection conn; + + public BalanceBooks(int totalClients, int iterations) { + this(totalClients, iterations, new ConfigurationFactory().get()); + } + + public BalanceBooks(int totalClients, int iterations, Configuration conf) { + this.totalClients = totalClients; + this.iterations = iterations; + this.conf = conf; + } + + /** + * Sets up common resources required by all clients. + */ + public void init() throws IOException { + Injector injector = Guice.createInjector( + new ConfigModule(conf), + new ZKModule(), + new DiscoveryModules().getDistributedModules(), + new TransactionModules().getDistributedModules(), + new TransactionClientModule() + ); + + zkClient = injector.getInstance(ZKClientService.class); + zkClient.startAndWait(); + txClient = injector.getInstance(TransactionServiceClient.class); + + createTableIfNotExists(conf, TABLE, new byte[][]{ FAMILY }); + conn = HConnectionManager.createConnection(conf); + } + + /** + * Runs all clients and waits for them to complete. + */ + public void run() throws IOException, InterruptedException { + List<Client> clients = new ArrayList<>(totalClients); + for (int i = 0; i < totalClients; i++) { + Client c = new Client(i, totalClients, iterations); + c.init(txClient, conn.getTable(TABLE)); + c.start(); + clients.add(c); + } + + for (Client c : clients) { + c.join(); + Closeables.closeQuietly(c); + } + } + + /** + * Validates the current state of the data stored at the end of the test. Each update by a client consists of two + * parts: a withdrawal of a random amount from a randomly select other account, and a corresponding to deposit to + * the client's own account. So, if all the updates were performed consistently (no partial updates or partial + * rollbacks), then the total sum of all balances at the end should be 0. + */ + public boolean verify() { + boolean success = false; + try { + TransactionAwareHTable table = new TransactionAwareHTable(conn.getTable(TABLE)); + TransactionContext context = new TransactionContext(txClient, table); + + LOG.info("VERIFYING BALANCES"); + context.start(); + long totalBalance = 0; + ResultScanner scanner = table.getScanner(new Scan()); + try { + for (Result r : scanner) { + if (!r.isEmpty()) { + int rowId = Bytes.toInt(r.getRow()); + long balance = Bytes.toLong(r.getValue(FAMILY, COL)); + totalBalance += balance; + LOG.info("Client #{}: balance = ${}", rowId, balance); + } + } + } finally { + if (scanner != null) { + Closeables.closeQuietly(scanner); + } + } + if (totalBalance == 0) { + LOG.info("PASSED!"); + success = true; + } else { + LOG.info("FAILED! Total balance should be 0 but was {}", totalBalance); + } + context.finish(); + } catch (Exception e) { + LOG.error("Failed verification check", e); + } + return success; + } + + /** + * Frees up the underlying resources common to all clients. + */ + public void close() { + try { + if (conn != null) { + conn.close(); + } + } catch (IOException ignored) { } + + if (zkClient != null) { + zkClient.stopAndWait(); + } + } + + protected void createTableIfNotExists(Configuration conf, byte[] tableName, byte[][] columnFamilies) + throws IOException { + HBaseAdmin admin = new HBaseAdmin(conf); + try { + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); + for (byte[] family : columnFamilies) { + HColumnDescriptor columnDesc = new HColumnDescriptor(family); + columnDesc.setMaxVersions(Integer.MAX_VALUE); + desc.addFamily(columnDesc); + } + desc.addCoprocessor(TransactionProcessor.class.getName()); + admin.createTable(desc); + } finally { + if (admin != null) { + try { + admin.close(); + } catch (IOException ioe) { + LOG.warn("Error closing HBaseAdmin", ioe); + } + } + } + } + + public static void main(String[] args) { + if (args.length != 2) { + System.err.println("Usage: java " + BalanceBooks.class.getName() + " <num clients> <iterations>"); + System.err.println("\twhere <num clients> >= 2"); + System.exit(1); + } + + BalanceBooks bb = new BalanceBooks(Integer.parseInt(args[0]), Integer.parseInt(args[1])); + try { + bb.init(); + bb.run(); + bb.verify(); + } catch (Exception e) { + LOG.error("Failed during BalanceBooks run", e); + } finally { + bb.close(); + } + } + + /** + * Represents a single client actor in the test. Each client runs as a separate thread. + * + * For the given number of iterations, the client will: + * <ol> + * <li>select a random other client from which to withdraw</li> + * <li>select a random amount from 0 to MAX_AMOUNT</li> + * <li>start a new transaction and: deduct the amount from the other client's acccount, and deposit + * the same amount to its own account.</li> + * </ol> + * + * Since multiple clients operate concurrently and contend over a set of constrained resources + * (the client accounts), it is expected that a portion of the attempted transactions will encounter + * conflicts, due to a simultaneous deduction from or deposit to one the same accounts which has successfully + * committed first. In this case, the updates from the transaction encountering the conflict should be completely + * rolled back, leaving the data in a consistent state. + */ + private static class Client extends Thread implements Closeable { + private final int id; + private final int totalClients; + private final int iterations; + + private final Random random = new Random(); + + private TransactionContext txContext; + private TransactionAwareHTable txTable; + + + public Client(int id, int totalClients, int iterations) { + this.id = id; + this.totalClients = totalClients; + this.iterations = iterations; + } + + /** + * Sets up any resources needed by the individual client. + * + * @param txClient the transaction client to use in accessing the transaciton service + * @param table the HBase table instance to use for accessing storage + */ + public void init(TransactionSystemClient txClient, HTableInterface table) { + txTable = new TransactionAwareHTable(table); + txContext = new TransactionContext(txClient, txTable); + } + + public void run() { + try { + for (int i = 0; i < iterations; i++) { + runOnce(); + } + } catch (TransactionFailureException e) { + LOG.error("Client #{}: Failed on exception", id, e); + } + } + + /** + * Runs a single iteration of the client logic. + */ + private void runOnce() throws TransactionFailureException { + int withdrawee = getNextWithdrawee(); + int amount = getAmount(); + + try { + txContext.start(); + long withdraweeBalance = getCurrentBalance(withdrawee); + long ownBalance = getCurrentBalance(id); + long withdraweeNew = withdraweeBalance - amount; + long ownNew = ownBalance + amount; + + setBalance(withdrawee, withdraweeNew); + setBalance(id, ownNew); + LOG.info("Client #{}: Withdrew ${} from #{}; withdrawee old={}, new={}; own old={}, new={}", + id, amount, withdrawee, withdraweeBalance, withdraweeNew, ownBalance, ownNew); + txContext.finish(); + + } catch (IOException ioe) { + LOG.error("Client #{}: Unhandled client failure", id, ioe); + txContext.abort(); + } catch (TransactionConflictException tce) { + LOG.info("CONFLICT: client #{} attempting to withdraw from #{}", id, withdrawee); + txContext.abort(tce); + } catch (TransactionFailureException tfe) { + LOG.error("Client #{}: Unhandled transaction failure", id, tfe); + txContext.abort(tfe); + } + } + + private long getCurrentBalance(int id) throws IOException { + Result r = txTable.get(new Get(Bytes.toBytes(id))); + byte[] balanceBytes = r.getValue(FAMILY, COL); + if (balanceBytes == null) { + return 0; + } + return Bytes.toLong(balanceBytes); + } + + private void setBalance(int id, long balance) throws IOException { + txTable.put(new Put(Bytes.toBytes(id)).add(FAMILY, COL, Bytes.toBytes(balance))); + } + + private int getNextWithdrawee() { + int next; + do { + next = random.nextInt(totalClients); + } while (next == id); + return next; + } + + private int getAmount() { + return random.nextInt(MAX_AMOUNT); + } + + public void close() throws IOException { + txTable.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-examples/src/main/java/org/apache/tephra/examples/package-info.java ---------------------------------------------------------------------- diff --git a/tephra-examples/src/main/java/org/apache/tephra/examples/package-info.java b/tephra-examples/src/main/java/org/apache/tephra/examples/package-info.java new file mode 100644 index 0000000..172ad07 --- /dev/null +++ b/tephra-examples/src/main/java/org/apache/tephra/examples/package-info.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains example applications for Tephra designed to illustrate sample Tephra usage + * and provide out-of-the-box sample applications which can be run to test cluster functionality. + * + * <p>Currently the following applications are provided: + * + * <ul> + * <li><strong>BalanceBooks</strong> - this application runs a specified number of concurrent clients in separate + * threads, which perform transactions to make withdrawals from each other's accounts and deposits to their own + * accounts. At the end of the test, the total value of all account balances is verified to be equal to zero, + * which confirms that transactional integrity was not violated. + * </li> + * </ul> + * </p> + * + * <p> + * Note that, for simplicity, the examples package is currently hardcoded to compile against a specific HBase + * version (currently 1.0-cdh). In the future, we should provide Maven profiles to allow compiling the examples + * against each of the supported HBase versions. + * </p> + */ +package org.apache.tephra.examples; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-examples/src/test/java/co/cask/tephra/examples/BalanceBooksTest.java ---------------------------------------------------------------------- diff --git a/tephra-examples/src/test/java/co/cask/tephra/examples/BalanceBooksTest.java b/tephra-examples/src/test/java/co/cask/tephra/examples/BalanceBooksTest.java deleted file mode 100644 index a5a9036..0000000 --- a/tephra-examples/src/test/java/co/cask/tephra/examples/BalanceBooksTest.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.examples; - -import co.cask.tephra.TxConstants; -import co.cask.tephra.distributed.TransactionService; -import co.cask.tephra.persist.InMemoryTransactionStateStorage; -import co.cask.tephra.persist.TransactionStateStorage; -import co.cask.tephra.runtime.ConfigModule; -import co.cask.tephra.runtime.DiscoveryModules; -import co.cask.tephra.runtime.TransactionClientModule; -import co.cask.tephra.runtime.TransactionModules; -import co.cask.tephra.runtime.ZKModule; -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Scopes; -import com.google.inject.util.Modules; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.twill.zookeeper.ZKClientService; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertTrue; - -/** - * Tests the {@link BalanceBooks} program. - */ -public class BalanceBooksTest { - private static final Logger LOG = LoggerFactory.getLogger(BalanceBooksTest.class); - private static HBaseTestingUtility testUtil; - private static TransactionService txService; - private static ZKClientService zkClientService; - - @BeforeClass - public static void setup() throws Exception { - testUtil = new HBaseTestingUtility(); - Configuration conf = testUtil.getConfiguration(); - conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); - conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, "/tx.snapshot"); - - // Tune down the connection thread pool size - conf.setInt("hbase.hconnection.threads.core", 5); - conf.setInt("hbase.hconnection.threads.max", 10); - // Tunn down handler threads in regionserver - conf.setInt("hbase.regionserver.handler.count", 10); - - // Set to random port - conf.setInt("hbase.master.port", 0); - conf.setInt("hbase.master.info.port", 0); - conf.setInt("hbase.regionserver.port", 0); - conf.setInt("hbase.regionserver.info.port", 0); - - testUtil.startMiniCluster(); - - String zkClusterKey = testUtil.getClusterKey(); // hostname:clientPort:parentZnode - String zkQuorum = zkClusterKey.substring(0, zkClusterKey.lastIndexOf(':')); - LOG.info("Zookeeper Quorum is running at {}", zkQuorum); - conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkQuorum); - - Injector injector = Guice.createInjector( - new ConfigModule(conf), - new ZKModule(), - new DiscoveryModules().getDistributedModules(), - Modules.override(new TransactionModules().getDistributedModules()) - .with(new AbstractModule() { - @Override - protected void configure() { - bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON); - } - }), - new TransactionClientModule() - ); - - zkClientService = injector.getInstance(ZKClientService.class); - zkClientService.startAndWait(); - - // start a tx server - txService = injector.getInstance(TransactionService.class); - try { - LOG.info("Starting transaction service"); - txService.startAndWait(); - } catch (Exception e) { - LOG.error("Failed to start service: ", e); - } - - } - - @AfterClass - public static void tearDown() throws Exception { - if (txService != null) { - txService.stopAndWait(); - } - if (zkClientService != null) { - zkClientService.stopAndWait(); - } - testUtil.shutdownMiniCluster(); - } - - @Test - public void testBalanceBooks() throws Exception { - BalanceBooks bb = new BalanceBooks(5, 100, testUtil.getConfiguration()); - try { - bb.init(); - bb.run(); - assertTrue(bb.verify()); - } finally { - bb.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java ---------------------------------------------------------------------- diff --git a/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java b/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java new file mode 100644 index 0000000..d74a133 --- /dev/null +++ b/tephra-examples/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.examples; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Scopes; +import com.google.inject.util.Modules; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.tephra.TxConstants; +import org.apache.tephra.distributed.TransactionService; +import org.apache.tephra.persist.InMemoryTransactionStateStorage; +import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.runtime.ConfigModule; +import org.apache.tephra.runtime.DiscoveryModules; +import org.apache.tephra.runtime.TransactionClientModule; +import org.apache.tephra.runtime.TransactionModules; +import org.apache.tephra.runtime.ZKModule; +import org.apache.twill.zookeeper.ZKClientService; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertTrue; + +/** + * Tests the {@link BalanceBooks} program. + */ +public class BalanceBooksTest { + private static final Logger LOG = LoggerFactory.getLogger(BalanceBooksTest.class); + private static HBaseTestingUtility testUtil; + private static TransactionService txService; + private static ZKClientService zkClientService; + + @BeforeClass + public static void setup() throws Exception { + testUtil = new HBaseTestingUtility(); + Configuration conf = testUtil.getConfiguration(); + conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, "/tx.snapshot"); + + // Tune down the connection thread pool size + conf.setInt("hbase.hconnection.threads.core", 5); + conf.setInt("hbase.hconnection.threads.max", 10); + // Tunn down handler threads in regionserver + conf.setInt("hbase.regionserver.handler.count", 10); + + // Set to random port + conf.setInt("hbase.master.port", 0); + conf.setInt("hbase.master.info.port", 0); + conf.setInt("hbase.regionserver.port", 0); + conf.setInt("hbase.regionserver.info.port", 0); + + testUtil.startMiniCluster(); + + String zkClusterKey = testUtil.getClusterKey(); // hostname:clientPort:parentZnode + String zkQuorum = zkClusterKey.substring(0, zkClusterKey.lastIndexOf(':')); + LOG.info("Zookeeper Quorum is running at {}", zkQuorum); + conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkQuorum); + + Injector injector = Guice.createInjector( + new ConfigModule(conf), + new ZKModule(), + new DiscoveryModules().getDistributedModules(), + Modules.override(new TransactionModules().getDistributedModules()) + .with(new AbstractModule() { + @Override + protected void configure() { + bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON); + } + }), + new TransactionClientModule() + ); + + zkClientService = injector.getInstance(ZKClientService.class); + zkClientService.startAndWait(); + + // start a tx server + txService = injector.getInstance(TransactionService.class); + try { + LOG.info("Starting transaction service"); + txService.startAndWait(); + } catch (Exception e) { + LOG.error("Failed to start service: ", e); + } + + } + + @AfterClass + public static void tearDown() throws Exception { + if (txService != null) { + txService.stopAndWait(); + } + if (zkClientService != null) { + zkClientService.stopAndWait(); + } + testUtil.shutdownMiniCluster(); + } + + @Test + public void testBalanceBooks() throws Exception { + BalanceBooks bb = new BalanceBooks(5, 100, testUtil.getConfiguration()); + try { + bb.init(); + bb.run(); + assertTrue(bb.verify()); + } finally { + bb.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/HBase96ConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/HBase96ConfigurationProvider.java b/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/HBase96ConfigurationProvider.java deleted file mode 100644 index 834ed59..0000000 --- a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/HBase96ConfigurationProvider.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.hbase96; - -import co.cask.tephra.util.ConfigurationProvider; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; - -/** - * HBase 0.96 version of {@link co.cask.tephra.util.ConfigurationProvider}. - */ -public class HBase96ConfigurationProvider extends ConfigurationProvider { - @Override - public Configuration get() { - return HBaseConfiguration.create(); - } - - @Override - public Configuration get(Configuration baseConf) { - return HBaseConfiguration.create(baseConf); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/SecondaryIndexTable.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/SecondaryIndexTable.java b/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/SecondaryIndexTable.java deleted file mode 100644 index eaa5564..0000000 --- a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/SecondaryIndexTable.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package co.cask.tephra.hbase96; - -import co.cask.tephra.TransactionContext; -import co.cask.tephra.TransactionFailureException; -import co.cask.tephra.distributed.TransactionServiceClient; -import com.google.common.base.Throwables; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * A Transactional SecondaryIndexTable. - */ -public class SecondaryIndexTable { - private byte[] secondaryIndex; - private TransactionAwareHTable transactionAwareHTable; - private TransactionAwareHTable secondaryIndexTable; - private TransactionContext transactionContext; - private final TableName secondaryIndexTableName; - private static final byte[] secondaryIndexFamily = Bytes.toBytes("secondaryIndexFamily"); - private static final byte[] secondaryIndexQualifier = Bytes.toBytes('r'); - private static final byte[] DELIMITER = new byte[] {0}; - - public SecondaryIndexTable(TransactionServiceClient transactionServiceClient, HTableInterface hTable, - byte[] secondaryIndex) { - secondaryIndexTableName = TableName.valueOf(hTable.getName().getNameAsString() + ".idx"); - HTable secondaryIndexHTable = null; - HBaseAdmin hBaseAdmin = null; - try { - hBaseAdmin = new HBaseAdmin(hTable.getConfiguration()); - if (!hBaseAdmin.tableExists(secondaryIndexTableName)) { - hBaseAdmin.createTable(new HTableDescriptor(secondaryIndexTableName)); - } - secondaryIndexHTable = new HTable(hTable.getConfiguration(), secondaryIndexTableName); - } catch (Exception e) { - Throwables.propagate(e); - } finally { - try { - hBaseAdmin.close(); - } catch (Exception e) { - Throwables.propagate(e); - } - } - - this.secondaryIndex = secondaryIndex; - this.transactionAwareHTable = new TransactionAwareHTable(hTable); - this.secondaryIndexTable = new TransactionAwareHTable(secondaryIndexHTable); - this.transactionContext = new TransactionContext(transactionServiceClient, transactionAwareHTable, - secondaryIndexTable); - } - - public Result get(Get get) throws IOException { - return get(Collections.singletonList(get))[0]; - } - - public Result[] get(List<Get> gets) throws IOException { - try { - transactionContext.start(); - Result[] result = transactionAwareHTable.get(gets); - transactionContext.finish(); - return result; - } catch (Exception e) { - try { - transactionContext.abort(); - } catch (TransactionFailureException e1) { - throw new IOException("Could not rollback transaction", e1); - } - } - return null; - } - - public Result[] getByIndex(byte[] value) throws IOException { - try { - transactionContext.start(); - Scan scan = new Scan(value, Bytes.add(value, new byte[0])); - scan.addColumn(secondaryIndexFamily, secondaryIndexQualifier); - ResultScanner indexScanner = secondaryIndexTable.getScanner(scan); - - ArrayList<Get> gets = new ArrayList<Get>(); - for (Result result : indexScanner) { - for (Cell cell : result.listCells()) { - gets.add(new Get(cell.getValue())); - } - } - Result[] results = transactionAwareHTable.get(gets); - transactionContext.finish(); - return results; - } catch (Exception e) { - try { - transactionContext.abort(); - } catch (TransactionFailureException e1) { - throw new IOException("Could not rollback transaction", e1); - } - } - return null; - } - - public void put(Put put) throws IOException { - put(Collections.singletonList(put)); - } - - - public void put(List<Put> puts) throws IOException { - try { - transactionContext.start(); - ArrayList<Put> secondaryIndexPuts = new ArrayList<Put>(); - for (Put put : puts) { - List<Put> indexPuts = new ArrayList<Put>(); - Set<Map.Entry<byte[], List<KeyValue>>> familyMap = put.getFamilyMap().entrySet(); - for (Map.Entry<byte [], List<KeyValue>> family : familyMap) { - for (KeyValue value : family.getValue()) { - if (Bytes.equals(value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(), - secondaryIndex, 0, secondaryIndex.length)) { - byte[] secondaryRow = Bytes.add(value.getQualifier(), DELIMITER, - Bytes.add(value.getValue(), DELIMITER, - value.getRow())); - Put indexPut = new Put(secondaryRow); - indexPut.add(secondaryIndexFamily, secondaryIndexQualifier, put.getRow()); - indexPuts.add(indexPut); - } - } - } - secondaryIndexPuts.addAll(indexPuts); - } - transactionAwareHTable.put(puts); - secondaryIndexTable.put(secondaryIndexPuts); - transactionContext.finish(); - } catch (Exception e) { - try { - transactionContext.abort(); - } catch (TransactionFailureException e1) { - throw new IOException("Could not rollback transaction", e1); - } - } - } -}
