http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/7a9d7d6f/hbase-client/src/test/java/org/apache/omid/transaction/TestCheckpoint.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestCheckpoint.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestCheckpoint.java deleted file mode 100644 index 078ea5f..0000000 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestCheckpoint.java +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.omid.transaction; - -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel; -import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.ITestContext; -import org.testng.annotations.Test; - -import static org.testng.Assert.assertTrue; - -@Test(groups = "sharedHBase") -public class TestCheckpoint extends OmidTestBase { - - private static final Logger LOG = LoggerFactory.getLogger(TestCheckpoint.class); - - private HBaseTransaction enforceHBaseTransactionAsParam(Transaction tx) { - if (tx instanceof HBaseTransaction) { - return (HBaseTransaction) tx; - } else { - throw new IllegalArgumentException( - String.format("The transaction object passed %s is not an instance of HBaseTransaction", - tx.getClass().getName())); - } - } - - @Test(timeOut = 30_000) - public void testFewCheckPoints(ITestContext context) throws Exception { - - TransactionManager tm = newTransactionManager(context); - TTable tt = new TTable(hbaseConf, TEST_TABLE); - - byte[] rowName1 = Bytes.toBytes("row1"); - byte[] famName1 = Bytes.toBytes(TEST_FAMILY); - byte[] colName1 = Bytes.toBytes("col1"); - byte[] dataValue1 = Bytes.toBytes("testWrite-1"); - byte[] dataValue2 = Bytes.toBytes("testWrite-2"); - byte[] dataValue3 = Bytes.toBytes("testWrite-3"); - - Transaction tx1 = tm.begin(); - - HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1); - - Put row1 = new Put(rowName1); - row1.add(famName1, colName1, dataValue1); - tt.put(tx1, row1); - - Get g = new Get(rowName1).setMaxVersions(1); - - Result r = tt.get(tx1, g); - assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - hbaseTx1.checkpoint(); - - row1 = new Put(rowName1); - row1.add(famName1, colName1, dataValue2); - tt.put(tx1, row1); - - r = tt.get(tx1, g); - assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT); - - r = tt.get(tx1, g); - assertTrue(Bytes.equals(dataValue2, r.getValue(famName1, colName1)), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - hbaseTx1.checkpoint(); - - row1 = new Put(rowName1); - row1.add(famName1, colName1, dataValue3); - tt.put(tx1, row1); - - r = tt.get(tx1, g); - assertTrue(Bytes.equals(dataValue2, r.getValue(famName1, colName1)), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - hbaseTx1.checkpoint(); - - r = tt.get(tx1, g); - assertTrue(Bytes.equals(dataValue3, r.getValue(famName1, colName1)), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT_ALL); - - r = tt.get(tx1, g); - - assertTrue(r.size() == 3, "Expected 3 results and found " + r.size()); - - List<Cell> cells = r.getColumnCells(famName1, colName1); - assertTrue(Bytes.equals(dataValue3, cells.get(0).getValue()), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - assertTrue(Bytes.equals(dataValue2, cells.get(1).getValue()), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - assertTrue(Bytes.equals(dataValue1, cells.get(2).getValue()), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - tt.close(); - } - - @Test(timeOut = 30_000) - public void testSNAPSHOT(ITestContext context) throws Exception { - TransactionManager tm = newTransactionManager(context); - TTable tt = new TTable(hbaseConf, TEST_TABLE); - - byte[] rowName1 = Bytes.toBytes("row1"); - byte[] famName1 = Bytes.toBytes(TEST_FAMILY); - byte[] colName1 = Bytes.toBytes("col1"); - byte[] dataValue0 = Bytes.toBytes("testWrite-0"); - byte[] dataValue1 = Bytes.toBytes("testWrite-1"); - byte[] dataValue2 = Bytes.toBytes("testWrite-2"); - - Transaction tx1 = tm.begin(); - - Put row1 = new Put(rowName1); - row1.add(famName1, colName1, dataValue0); - tt.put(tx1, row1); - - tm.commit(tx1); - - tx1 = tm.begin(); - - HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1); - - Get g = new Get(rowName1).setMaxVersions(1); - - Result r = tt.get(tx1, g); - assertTrue(Bytes.equals(dataValue0, r.getValue(famName1, colName1)), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - row1 = new Put(rowName1); - row1.add(famName1, colName1, dataValue1); - tt.put(tx1, row1); - - - r = tt.get(tx1, g); - assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - hbaseTx1.checkpoint(); - - row1 = new Put(rowName1); - row1.add(famName1, colName1, dataValue2); - tt.put(tx1, row1); - - r = tt.get(tx1, g); - assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT); - - r = tt.get(tx1, g); - assertTrue(Bytes.equals(dataValue2, r.getValue(famName1, colName1)), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - tt.close(); - } - - @Test(timeOut = 30_000) - public void testSNAPSHOT_ALL(ITestContext context) throws Exception { - TransactionManager tm = newTransactionManager(context); - TTable tt = new TTable(hbaseConf, TEST_TABLE); - - byte[] rowName1 = Bytes.toBytes("row1"); - byte[] famName1 = Bytes.toBytes(TEST_FAMILY); - byte[] colName1 = Bytes.toBytes("col1"); - byte[] dataValue0 = Bytes.toBytes("testWrite-0"); - byte[] dataValue1 = Bytes.toBytes("testWrite-1"); - byte[] dataValue2 = Bytes.toBytes("testWrite-2"); - - Transaction tx1 = tm.begin(); - - Put row1 = new Put(rowName1); - row1.add(famName1, colName1, dataValue0); - tt.put(tx1, row1); - - tm.commit(tx1); - - tx1 = tm.begin(); - - HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1); - - Get g = new Get(rowName1).setMaxVersions(100); - - Result r = tt.get(tx1, g); - assertTrue(Bytes.equals(dataValue0, r.getValue(famName1, colName1)), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - row1 = new Put(rowName1); - row1.add(famName1, colName1, dataValue1); - tt.put(tx1, row1); - - g = new Get(rowName1).setMaxVersions(100); - - r = tt.get(tx1, g); - assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - hbaseTx1.checkpoint(); - - row1 = new Put(rowName1); - row1.add(famName1, colName1, dataValue2); - tt.put(tx1, row1); - - r = tt.get(tx1, g); - assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT_ALL); - - r = tt.get(tx1, g); - - assertTrue(r.size() == 3, "Expected 3 results and found " + r.size()); - - List<Cell> cells = r.getColumnCells(famName1, colName1); - assertTrue(Bytes.equals(dataValue2, cells.get(0).getValue()), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - assertTrue(Bytes.equals(dataValue1, cells.get(1).getValue()), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - assertTrue(Bytes.equals(dataValue0, cells.get(2).getValue()), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - tt.close(); - } - - @Test(timeOut = 30_000) - public void testSNAPSHOT_EXCLUDE_CURRENT(ITestContext context) throws Exception { - TransactionManager tm = newTransactionManager(context); - TTable tt = new TTable(hbaseConf, TEST_TABLE); - - byte[] rowName1 = Bytes.toBytes("row1"); - byte[] famName1 = Bytes.toBytes(TEST_FAMILY); - byte[] colName1 = Bytes.toBytes("col1"); - byte[] dataValue1 = Bytes.toBytes("testWrite-1"); - byte[] dataValue2 = Bytes.toBytes("testWrite-2"); - - Transaction tx1 = tm.begin(); - - HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1); - - Put row1 = new Put(rowName1); - row1.add(famName1, colName1, dataValue1); - tt.put(tx1, row1); - - Get g = new Get(rowName1).setMaxVersions(1); - - Result r = tt.get(tx1, g); - assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - hbaseTx1.checkpoint(); - - row1 = new Put(rowName1); - row1.add(famName1, colName1, dataValue2); - tt.put(tx1, row1); - - r = tt.get(tx1, g); - assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - hbaseTx1.setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); - - r = tt.get(tx1, g); - assertTrue(Bytes.equals(dataValue1, r.getValue(famName1, colName1)), - "Unexpected value for SI read " + tx1 + ": " + Bytes.toString(r.getValue(famName1, colName1))); - - tt.close(); - } - - @Test(timeOut = 30_000) - public void testOutOfCheckpoints(ITestContext context) throws Exception { - TransactionManager tm = newTransactionManager(context); - - Transaction tx1 = tm.begin(); - - HBaseTransaction hbaseTx1 = enforceHBaseTransactionAsParam(tx1); - - for (int i=0; i < AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN - 1; ++i) { - hbaseTx1.checkpoint(); - } - - try { - hbaseTx1.checkpoint(); - Assert.fail(); - } catch (TransactionException e) { - // expected - } - - } -}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/7a9d7d6f/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java index 5b72856..2eacd22 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestColumnIterator.java @@ -61,7 +61,7 @@ public class TestColumnIterator { public void testGroupingCellsByColumnFilteringShadowCells() { ImmutableList<Collection<Cell>> groupedColumnsWithoutShadowCells = - SnapshotFilterImpl.groupCellsByColumnFilteringShadowCellsAndFamilyDeletion(cells); + TTable.groupCellsByColumnFilteringShadowCells(cells); Log.info("Column Groups " + groupedColumnsWithoutShadowCells); assertEquals(groupedColumnsWithoutShadowCells.size(), 3, "Should be 3 column groups"); int group1Counter = 0; http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/7a9d7d6f/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java index 735af04..c349657 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionClient.java @@ -56,7 +56,7 @@ public class TestHBaseTransactionClient extends OmidTestBase { @Test(timeOut = 30_000) public void testIsCommitted(ITestContext context) throws Exception { TransactionManager tm = newTransactionManager(context); - TTable table = spy(new TTable(hbaseConf, TEST_TABLE, ((AbstractTransactionManager)tm).getCommitTableClient())); + TTable table = new TTable(hbaseConf, TEST_TABLE); HBaseTransaction t1 = (HBaseTransaction) tm.begin(); @@ -83,9 +83,9 @@ public class TestHBaseTransactionClient extends OmidTestBase { HBaseCellId hBaseCellId3 = new HBaseCellId(htable, row2, family, qualifier, t3.getStartTimestamp()); HBaseTransactionClient hbaseTm = (HBaseTransactionClient) newTransactionManager(context); - assertTrue(table.isCommitted(hBaseCellId1, 0), "row1 should be committed"); - assertFalse(table.isCommitted(hBaseCellId2, 0), "row2 should not be committed for kv2"); - assertTrue(table.isCommitted(hBaseCellId3, 0), "row2 should be committed for kv3"); + assertTrue(hbaseTm.isCommitted(hBaseCellId1), "row1 should be committed"); + assertFalse(hbaseTm.isCommitted(hBaseCellId2), "row2 should not be committed for kv2"); + assertTrue(hbaseTm.isCommitted(hBaseCellId3), "row2 should be committed for kv3"); } @Test(timeOut = 30_000) @@ -96,7 +96,7 @@ public class TestHBaseTransactionClient extends OmidTestBase { // The following line emulates a crash after commit that is observed in (*) below doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class)); - TTable table = spy(new TTable(hbaseConf, TEST_TABLE, tm.getCommitTableClient())); + TTable table = new TTable(hbaseConf, TEST_TABLE); HBaseTransaction t1 = (HBaseTransaction) tm.begin(); @@ -119,7 +119,7 @@ public class TestHBaseTransactionClient extends OmidTestBase { HBaseCellId hBaseCellId = new HBaseCellId(htable, row1, family, qualifier, t1.getStartTimestamp()); HBaseTransactionClient hbaseTm = (HBaseTransactionClient) newTransactionManager(context); - assertTrue(table.isCommitted(hBaseCellId, 0), "row1 should be committed"); + assertTrue(hbaseTm.isCommitted(hBaseCellId), "row1 should be committed"); } @Test(timeOut = 30_000) @@ -137,7 +137,7 @@ public class TestHBaseTransactionClient extends OmidTestBase { Optional<CommitTimestamp> optionalCT = tm.commitTableClient.getCommitTimestamp(NON_EXISTING_CELL_TS).get(); assertFalse(optionalCT.isPresent()); - try (TTable table = spy(new TTable(hbaseConf, TEST_TABLE, tm.getCommitTableClient()))) { + try (TTable table = new TTable(hbaseConf, TEST_TABLE)) { // Test that we get an invalidation mark for an invalidated transaction // Start a transaction and invalidate it before commiting it @@ -182,14 +182,14 @@ public class TestHBaseTransactionClient extends OmidTestBase { HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context); - try (TTable table = spy(new TTable(hbaseConf, TEST_TABLE, tm.getCommitTableClient()))) { + try (TTable table = new TTable(hbaseConf, TEST_TABLE)) { // Test first we can not found a non-existent cell ts HBaseCellId hBaseCellId = new HBaseCellId(table.getHTable(), row1, family, qualifier, NON_EXISTING_CELL_TS); // Set an empty cache to allow to bypass the checking CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId, Maps.<Long, Long>newHashMap()); - Optional<CommitTimestamp> optionalCT = table + Optional<CommitTimestamp> optionalCT = tm .readCommitTimestampFromShadowCell(NON_EXISTING_CELL_TS, ctLocator); assertFalse(optionalCT.isPresent()); @@ -200,7 +200,7 @@ public class TestHBaseTransactionClient extends OmidTestBase { table.put(tx1, put); tm.commit(tx1); // Upon commit, the commit data should be in the shadow cells, so test it - optionalCT = table.readCommitTimestampFromShadowCell(tx1.getStartTimestamp(), ctLocator); + optionalCT = tm.readCommitTimestampFromShadowCell(tx1.getStartTimestamp(), ctLocator); assertTrue(optionalCT.isPresent()); CommitTimestamp ct = optionalCT.get(); assertTrue(ct.isValid()); @@ -228,7 +228,7 @@ public class TestHBaseTransactionClient extends OmidTestBase { // Then test that locator finds it in the cache CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId, fakeCache); - CommitTimestamp ct = (new TTable(table)).locateCellCommitTimestamp(CELL_ST, tm.tsoClient.getEpoch(), ctLocator); + CommitTimestamp ct = tm.locateCellCommitTimestamp(CELL_ST, tm.tsoClient.getEpoch(), ctLocator); assertTrue(ct.isValid()); assertEquals(ct.getValue(), CELL_CT); assertTrue(ct.getLocation().compareTo(CACHE) == 0); @@ -247,7 +247,7 @@ public class TestHBaseTransactionClient extends OmidTestBase { // The following line emulates a crash after commit that is observed in (*) below doThrow(new RuntimeException()).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class)); - try (TTable table = spy(new TTable(hbaseConf, TEST_TABLE, tm.getCommitTableClient()))) { + try (TTable table = new TTable(hbaseConf, TEST_TABLE)) { // Commit a transaction that is broken on commit to avoid // write to the shadow cells and avoid cleaning the commit table HBaseTransaction tx1 = (HBaseTransaction) tm.begin(); @@ -265,10 +265,10 @@ public class TestHBaseTransactionClient extends OmidTestBase { tx1.getStartTimestamp()); CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId, Maps.<Long, Long>newHashMap()); - CommitTimestamp ct = table.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(), + CommitTimestamp ct = tm.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(), ctLocator); assertTrue(ct.isValid()); - long expectedCommitTS = tx1.getStartTimestamp() + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN; + long expectedCommitTS = tx1.getStartTimestamp() + 1; assertEquals(ct.getValue(), expectedCommitTS); assertTrue(ct.getLocation().compareTo(COMMIT_TABLE) == 0); } @@ -281,7 +281,7 @@ public class TestHBaseTransactionClient extends OmidTestBase { HBaseTransactionManager tm = (HBaseTransactionManager) newTransactionManager(context); - try (TTable table = spy(new TTable(hbaseConf, TEST_TABLE, tm.getCommitTableClient()))) { + try (TTable table = new TTable(hbaseConf, TEST_TABLE)) { // Commit a transaction to add ST/CT in commit table HBaseTransaction tx1 = (HBaseTransaction) tm.begin(); Put put = new Put(row1); @@ -295,7 +295,7 @@ public class TestHBaseTransactionClient extends OmidTestBase { tx1.getStartTimestamp()); CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId, Maps.<Long, Long>newHashMap()); - CommitTimestamp ct = table.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(), + CommitTimestamp ct = tm.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(), ctLocator); assertTrue(ct.isValid()); assertEquals(ct.getValue(), tx1.getCommitTimestamp()); @@ -308,7 +308,7 @@ public class TestHBaseTransactionClient extends OmidTestBase { @Test(timeOut = 30_000) public void testCellFromTransactionInPreviousEpochGetsInvalidComitTimestamp(ITestContext context) throws Exception { - final long CURRENT_EPOCH_FAKE = 1000L * AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN; + final long CURRENT_EPOCH_FAKE = 1000L; CommitTable.Client commitTableClient = spy(getCommitTable(context).getClient()); AbstractTransactionManager tm = spy((AbstractTransactionManager) newTransactionManager(context, commitTableClient)); @@ -317,15 +317,17 @@ public class TestHBaseTransactionClient extends OmidTestBase { SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create(); f.set(Optional.<CommitTimestamp>absent()); doReturn(f).when(commitTableClient).getCommitTimestamp(any(Long.class)); + doReturn(Optional.<CommitTimestamp>absent()).when(tm).readCommitTimestampFromShadowCell(any(Long.class), + any(CommitTimestampLocator.class)); - - try (TTable table = spy(new TTable(hbaseConf, TEST_TABLE, tm.getCommitTableClient()))) { + try (TTable table = new TTable(hbaseConf, TEST_TABLE)) { // Commit a transaction to add ST/CT in commit table HBaseTransaction tx1 = (HBaseTransaction) tm.begin(); Put put = new Put(row1); put.add(family, qualifier, data1); table.put(tx1, put); + tm.commit(tx1); // Upon commit, the commit data should be in the shadow cells // Test a transaction in the previous epoch gets an InvalidCommitTimestamp class @@ -334,7 +336,7 @@ public class TestHBaseTransactionClient extends OmidTestBase { CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId, Maps.<Long, Long>newHashMap()); // Fake the current epoch to simulate a newer TSO - CommitTimestamp ct = table.locateCellCommitTimestamp(tx1.getStartTimestamp(), CURRENT_EPOCH_FAKE, ctLocator); + CommitTimestamp ct = tm.locateCellCommitTimestamp(tx1.getStartTimestamp(), CURRENT_EPOCH_FAKE, ctLocator); assertFalse(ct.isValid()); assertEquals(ct.getValue(), CommitTable.INVALID_TRANSACTION_MARKER); assertTrue(ct.getLocation().compareTo(COMMIT_TABLE) == 0); @@ -357,8 +359,10 @@ public class TestHBaseTransactionClient extends OmidTestBase { SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create(); f.set(Optional.<CommitTimestamp>absent()); doReturn(f).doCallRealMethod().when(commitTableClient).getCommitTimestamp(any(Long.class)); + doReturn(Optional.<CommitTimestamp>absent()).when(tm).readCommitTimestampFromShadowCell(any(Long.class), + any(CommitTimestampLocator.class)); - try (TTable table = spy(new TTable(hbaseConf, TEST_TABLE, tm.getCommitTableClient()))) { + try (TTable table = new TTable(hbaseConf, TEST_TABLE)) { // Commit a transaction that is broken on commit to avoid // write to the shadow cells and avoid cleaning the commit table @@ -377,7 +381,7 @@ public class TestHBaseTransactionClient extends OmidTestBase { tx1.getStartTimestamp()); CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId, Maps.<Long, Long>newHashMap()); - CommitTimestamp ct = table.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(), + CommitTimestamp ct = tm.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(), ctLocator); assertTrue(ct.isValid()); assertEquals(ct.getValue(), tx1.getCommitTimestamp()); @@ -397,8 +401,10 @@ public class TestHBaseTransactionClient extends OmidTestBase { SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create(); f.set(Optional.<CommitTimestamp>absent()); doReturn(f).when(commitTableClient).getCommitTimestamp(any(Long.class)); + doReturn(Optional.<CommitTimestamp>absent()).doCallRealMethod() + .when(tm).readCommitTimestampFromShadowCell(any(Long.class), any(CommitTimestampLocator.class)); - try (TTable table = spy(new TTable(hbaseConf, TEST_TABLE, tm.getCommitTableClient()))) { + try (TTable table = new TTable(hbaseConf, TEST_TABLE)) { // Commit a transaction to add ST/CT in commit table HBaseTransaction tx1 = (HBaseTransaction) tm.begin(); @@ -413,7 +419,7 @@ public class TestHBaseTransactionClient extends OmidTestBase { tx1.getStartTimestamp()); CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId, Maps.<Long, Long>newHashMap()); - CommitTimestamp ct = table.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(), + CommitTimestamp ct = tm.locateCellCommitTimestamp(tx1.getStartTimestamp(), tm.tsoClient.getEpoch(), ctLocator); assertTrue(ct.isValid()); assertEquals(ct.getValue(), tx1.getCommitTimestamp()); @@ -434,12 +440,14 @@ public class TestHBaseTransactionClient extends OmidTestBase { SettableFuture<Optional<CommitTimestamp>> f = SettableFuture.create(); f.set(Optional.<CommitTimestamp>absent()); doReturn(f).when(commitTableClient).getCommitTimestamp(any(Long.class)); + doReturn(Optional.<CommitTimestamp>absent()).when(tm).readCommitTimestampFromShadowCell(any(Long.class), + any(CommitTimestampLocator.class)); - try (TTable table = spy(new TTable(hbaseConf, TEST_TABLE, tm.getCommitTableClient()))) { + try (TTable table = new TTable(hbaseConf, TEST_TABLE)) { HBaseCellId hBaseCellId = new HBaseCellId(table.getHTable(), row1, family, qualifier, CELL_TS); CommitTimestampLocator ctLocator = new CommitTimestampLocatorImpl(hBaseCellId, Maps.<Long, Long>newHashMap()); - CommitTimestamp ct = table.locateCellCommitTimestamp(CELL_TS, tm.tsoClient.getEpoch(), ctLocator); + CommitTimestamp ct = tm.locateCellCommitTimestamp(CELL_TS, tm.tsoClient.getEpoch(), ctLocator); assertTrue(ct.isValid()); assertEquals(ct.getValue(), -1L); assertTrue(ct.getLocation().compareTo(NOT_PRESENT) == 0); http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/7a9d7d6f/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java index 347c4ce..03187ea 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestHBaseTransactionManager.java @@ -52,7 +52,7 @@ public class TestHBaseTransactionManager extends OmidTestBase { TSOClient tsoClient = spy(getClient(context)); - long fakeEpoch = tsoClient.getNewStartTimestamp().get() + (FAKE_EPOCH_INCREMENT * AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN); + long fakeEpoch = tsoClient.getNewStartTimestamp().get() + FAKE_EPOCH_INCREMENT; // Modify the epoch before testing the begin method doReturn(fakeEpoch).when(tsoClient).getEpoch(); http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/7a9d7d6f/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java index 8a35f9a..75e64fd 100644 --- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java +++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java @@ -20,9 +20,10 @@ package org.apache.omid.transaction; import com.google.common.base.Charsets; import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; - import org.apache.omid.committable.CommitTable; + import org.apache.omid.metrics.NullMetricsProvider; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; @@ -44,9 +45,7 @@ import org.testng.ITestContext; import org.testng.annotations.Test; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -346,7 +345,7 @@ public class TestShadowCells extends OmidTestBase { return (List<KeyValue>) invocation.callRealMethod(); } }).when(table).filterCellsForSnapshot(Matchers.<List<Cell>>any(), - any(HBaseTransaction.class), anyInt(), Matchers.<Map<String, List<Cell>>>any()); + any(HBaseTransaction.class), anyInt()); TransactionManager tm = newTransactionManager(context); if (hasShadowCell(row, http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/7a9d7d6f/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java index 14b9d72..3f2425b 100644 --- a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java +++ b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java @@ -49,9 +49,6 @@ public final class CellUtils { private static final Logger LOG = LoggerFactory.getLogger(CellUtils.class); static final byte[] SHADOW_CELL_SUFFIX = "\u0080".getBytes(Charsets.UTF_8); // Non printable char (128 ASCII) static byte[] DELETE_TOMBSTONE = Bytes.toBytes("__OMID_TOMBSTONE__"); - public static final byte[] FAMILY_DELETE_QUALIFIER = new byte[0]; - public static final String TRANSACTION_ATTRIBUTE = "__OMID_TRANSACTION__"; - public static final String CLIENT_GET_ATTRIBUTE = "__OMID_CLIENT_GET__"; /** * Utility interface to get rid of the dependency on HBase server package http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/7a9d7d6f/hbase-coprocessor/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-coprocessor/pom.xml b/hbase-coprocessor/pom.xml index e9eab4f..8af093c 100644 --- a/hbase-coprocessor/pom.xml +++ b/hbase-coprocessor/pom.xml @@ -31,6 +31,7 @@ <groupId>org.apache.omid</groupId> <artifactId>omid-hbase-client</artifactId> <version>${project.version}</version> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.omid</groupId> http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/7a9d7d6f/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java ---------------------------------------------------------------------- diff --git a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java deleted file mode 100644 index 752e9a7..0000000 --- a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/OmidRegionScanner.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.regionserver.ScannerContext; -import org.apache.omid.transaction.HBaseTransaction; -import org.apache.omid.transaction.SnapshotFilterImpl; - -public class OmidRegionScanner implements RegionScanner { - - private RegionScanner scanner; - private SnapshotFilterImpl snapshotFilter; - private HBaseTransaction transaction; - private int maxVersions; - private Map<String, List<Cell>> familyDeletionCache; - - public OmidRegionScanner(SnapshotFilterImpl snapshotFilter, - RegionScanner s, - HBaseTransaction transaction, - int maxVersions) { - this.snapshotFilter = snapshotFilter; - this.scanner = s; - this.transaction = transaction; - this.maxVersions = maxVersions; - this.familyDeletionCache = new HashMap<String, List<Cell>>(); - } - - @Override - public boolean next(List<Cell> results) throws IOException { - return next(results, Integer.MAX_VALUE); - } - - public boolean next(List<Cell> result, int limit) throws IOException { - return nextRaw(result, limit); - } - - @Override - public void close() throws IOException { - scanner.close(); - } - - @Override - public HRegionInfo getRegionInfo() { - return scanner.getRegionInfo(); - } - - @Override - public boolean isFilterDone() throws IOException { - return scanner.isFilterDone(); - } - - @Override - public boolean reseek(byte[] row) throws IOException { - throw new RuntimeException("Not implemented"); - } - - @Override - public long getMaxResultSize() { - return scanner.getMaxResultSize(); - } - - @Override - public long getMvccReadPoint() { - return scanner.getMvccReadPoint(); - } - - @Override - public boolean nextRaw(List<Cell> result) throws IOException { - return nextRaw(result,Integer.MAX_VALUE); - } - - public boolean next(List<Cell> result, - ScannerContext scannerContext) throws IOException { - return next(result, scannerContext.getBatchLimit()); - } - - public boolean nextRaw(List<Cell> result, - ScannerContext scannerContext) throws IOException { - return nextRaw(result, scannerContext.getBatchLimit()); - } - - public int getBatch() { - return Integer.MAX_VALUE; - } - - public boolean nextRaw(List<Cell> result, int limit) throws IOException { - List<Cell> filteredResult = new ArrayList<Cell>(); - while (filteredResult.isEmpty()) { - scanner.nextRaw(filteredResult); - if (filteredResult.isEmpty()) { - return false; - } - - filteredResult = snapshotFilter.filterCellsForSnapshot(filteredResult, transaction, maxVersions, familyDeletionCache); - } - - for (Cell cell : filteredResult) { - result.add(cell); - } - - return true; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/7a9d7d6f/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java ---------------------------------------------------------------------- diff --git a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java b/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java deleted file mode 100644 index 28a4be3..0000000 --- a/hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/RegionAccessWrapper.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.omid.transaction.TableAccessWrapper; - -// This class wraps the Region object when doing server side filtering. -public class RegionAccessWrapper implements TableAccessWrapper { - - private final Region region; - - public RegionAccessWrapper(Region region) { - this.region = region; - } - - @Override - public Result[] get(List<Get> get) throws IOException { - Result[] results = new Result[get.size()]; - - int i = 0; - for (Get g : get) { - results[i++] = region.get(g); - } - return results; - } - - @Override - public Result get(Get get) throws IOException { - return region.get(get); - } - - @Override - public void put(Put put) throws IOException { - region.put(put); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/7a9d7d6f/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java ---------------------------------------------------------------------- diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java deleted file mode 100644 index daf319c..0000000 --- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.omid.transaction; - -import com.google.common.annotations.VisibleForTesting; - -import org.apache.omid.committable.CommitTable; -import org.apache.omid.committable.hbase.HBaseCommitTable; -import org.apache.omid.committable.hbase.HBaseCommitTableConfig; -import org.apache.omid.proto.TSOProto; -import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel; -import org.apache.omid.HBaseShims; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.OmidRegionScanner; -import org.apache.hadoop.hbase.regionserver.RegionAccessWrapper; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; - -import static org.apache.omid.committable.hbase.HBaseCommitTableConfig.COMMIT_TABLE_NAME_KEY; - -/** - * Server side filtering to identify the transaction snapshot. - */ -public class OmidSnapshotFilter extends BaseRegionObserver { - - private static final Logger LOG = LoggerFactory.getLogger(OmidSnapshotFilter.class); - - private HBaseCommitTableConfig commitTableConf = null; - private Configuration conf = null; - @VisibleForTesting - private CommitTable.Client commitTableClient; - - private SnapshotFilterImpl snapshotFilter; - - final static String OMID_SNAPSHOT_FILTER_CF_FLAG = "OMID_SNAPSHOT_FILTER_ENABLED"; - - public OmidSnapshotFilter() { - LOG.info("Compactor coprocessor initialized via empty constructor"); - } - - @Override - public void start(CoprocessorEnvironment env) throws IOException { - LOG.info("Starting snapshot filter coprocessor"); - conf = env.getConfiguration(); - commitTableConf = new HBaseCommitTableConfig(); - String commitTableName = conf.get(COMMIT_TABLE_NAME_KEY); - if (commitTableName != null) { - commitTableConf.setTableName(commitTableName); - } - commitTableClient = initAndGetCommitTableClient(); - - snapshotFilter = new SnapshotFilterImpl(commitTableClient); - - LOG.info("Snapshot filter started"); - } - - @Override - public void stop(CoprocessorEnvironment e) throws IOException { - LOG.info("Stopping snapshot filter coprocessor"); - commitTableClient.close(); - LOG.info("Snapshot filter stopped"); - } - - @Override - public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result) throws IOException { - - if (get.getAttribute(CellUtils.CLIENT_GET_ATTRIBUTE) == null) return; - - get.setAttribute(CellUtils.CLIENT_GET_ATTRIBUTE, null); - RegionAccessWrapper regionAccessWrapper = new RegionAccessWrapper(HBaseShims.getRegionCoprocessorRegion(c.getEnvironment())); - Result res = regionAccessWrapper.get(get); // get parameters were set at the client side - - snapshotFilter.setTableAccessWrapper(regionAccessWrapper); - - List<Cell> filteredKeyValues = Collections.emptyList(); - if (!res.isEmpty()) { - TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(get.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE)); - - long id = transaction.getTimestamp(); - long readTs = transaction.getReadTimestamp(); - long epoch = transaction.getEpoch(); - VisibilityLevel visibilityLevel = VisibilityLevel.fromInteger(transaction.getVisibilityLevel()); - - HBaseTransaction hbaseTransaction = new HBaseTransaction(id, readTs, visibilityLevel, epoch, new HashSet<HBaseCellId>(), null); - filteredKeyValues = snapshotFilter.filterCellsForSnapshot(res.listCells(), hbaseTransaction, get.getMaxVersions(), new HashMap<String, List<Cell>>()); - } - - for (Cell cell : filteredKeyValues) { - result.add(cell); - } - - c.bypass(); - - } - - @Override - public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, - Scan scan, - RegionScanner s) throws IOException { - byte[] byteTransaction = scan.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE); - - if (byteTransaction == null) { - return s; - } - - TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(byteTransaction); - - long id = transaction.getTimestamp(); - long readTs = transaction.getReadTimestamp(); - long epoch = transaction.getEpoch(); - VisibilityLevel visibilityLevel = VisibilityLevel.fromInteger(transaction.getVisibilityLevel()); - - HBaseTransaction hbaseTransaction = new HBaseTransaction(id, readTs, visibilityLevel, epoch, new HashSet<HBaseCellId>(), null); - - RegionAccessWrapper regionAccessWrapper = new RegionAccessWrapper(HBaseShims.getRegionCoprocessorRegion(e.getEnvironment())); - - snapshotFilter.setTableAccessWrapper(regionAccessWrapper); - - return new OmidRegionScanner(snapshotFilter, s, hbaseTransaction, 1); - } - - private CommitTable.Client initAndGetCommitTableClient() throws IOException { - LOG.info("Trying to get the commit table client"); - CommitTable commitTable = new HBaseCommitTable(conf, commitTableConf); - CommitTable.Client commitTableClient = commitTable.getClient(); - LOG.info("Commit table client obtained {}", commitTableClient.getClass().getCanonicalName()); - return commitTableClient; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/7a9d7d6f/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForSnapshotFilterTestModule.java ---------------------------------------------------------------------- diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForSnapshotFilterTestModule.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForSnapshotFilterTestModule.java deleted file mode 100644 index 130a061..0000000 --- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForSnapshotFilterTestModule.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.omid.transaction; - -import com.google.inject.AbstractModule; -import com.google.inject.Provider; -import com.google.inject.Provides; -import org.apache.omid.committable.CommitTable; -import org.apache.omid.committable.hbase.HBaseCommitTable; -import org.apache.omid.metrics.MetricsRegistry; -import org.apache.omid.metrics.NullMetricsProvider; -import org.apache.omid.timestamp.storage.HBaseTimestampStorage; -import org.apache.omid.timestamp.storage.TimestampStorage; -import org.apache.omid.tso.BatchPoolModule; -import org.apache.omid.tso.DisruptorModule; -import org.apache.omid.tso.LeaseManagement; -import org.apache.omid.tso.MockPanicker; -import org.apache.omid.tso.NetworkInterfaceUtils; -import org.apache.omid.tso.Panicker; -import org.apache.omid.tso.PersistenceProcessorHandler; -import org.apache.omid.tso.TSOChannelHandler; -import org.apache.omid.tso.TSOServerConfig; -import org.apache.omid.tso.TSOStateManager; -import org.apache.omid.tso.TSOStateManagerImpl; -import org.apache.omid.tso.TimestampOracle; -import org.apache.omid.tso.TimestampOracleImpl; -import org.apache.omid.tso.VoidLeaseManager; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; - -import javax.inject.Named; -import javax.inject.Singleton; - -import java.io.File; -import java.io.IOException; -import java.net.SocketException; -import java.net.UnknownHostException; - -import static org.apache.omid.tso.TSOServer.TSO_HOST_AND_PORT_KEY; - -class TSOForSnapshotFilterTestModule extends AbstractModule { - - private final TSOServerConfig config; - - TSOForSnapshotFilterTestModule(TSOServerConfig config) { - this.config = config; - } - - @Override - protected void configure() { - - bind(TSOChannelHandler.class).in(Singleton.class); - - bind(TSOStateManager.class).to(TSOStateManagerImpl.class).in(Singleton.class); - - bind(Panicker.class).to(MockPanicker.class); - // HBase commit table creation - bind(CommitTable.class).to(HBaseCommitTable.class).in(Singleton.class); - // Timestamp storage creation - bind(TimestampStorage.class).to(HBaseTimestampStorage.class).in(Singleton.class); - bind(TimestampOracle.class).to(TimestampOracleImpl.class).in(Singleton.class); - - install(new BatchPoolModule(config)); - // DisruptorConfig - install(new DisruptorModule(config)); - - } - - @Provides - @Singleton - Configuration provideHBaseConfig() throws IOException { - Configuration hbaseConf = HBaseConfiguration.create(); - hbaseConf.setInt("hbase.hregion.memstore.flush.size", 10_000 * 1024); - hbaseConf.setInt("hbase.regionserver.nbreservationblocks", 1); - hbaseConf.set("tso.host", "localhost"); - hbaseConf.setInt("tso.port", 1234); - hbaseConf.set("hbase.coprocessor.region.classes", "org.apache.omid.transaction.OmidSnapshotFilter"); - final String rootdir = "/tmp/hbase.test.dir/"; - File rootdirFile = new File(rootdir); - FileUtils.deleteDirectory(rootdirFile); - hbaseConf.set("hbase.rootdir", rootdir); - return hbaseConf; - } - - @Provides - TSOServerConfig provideTSOServerConfig() { - return config; - } - - @Provides - @Singleton - MetricsRegistry provideMetricsRegistry() { - return new NullMetricsProvider(); - } - - @Provides - @Singleton - LeaseManagement provideLeaseManager(TSOChannelHandler tsoChannelHandler, - TSOStateManager stateManager) throws IOException { - return new VoidLeaseManager(tsoChannelHandler, stateManager); - } - - @Provides - @Named(TSO_HOST_AND_PORT_KEY) - String provideTSOHostAndPort() throws SocketException, UnknownHostException { - return NetworkInterfaceUtils.getTSOHostAndPort(config); - } - - @Provides - PersistenceProcessorHandler[] getPersistenceProcessorHandler(Provider<PersistenceProcessorHandler> provider) { - PersistenceProcessorHandler[] persistenceProcessorHandlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()]; - for (int i = 0; i < persistenceProcessorHandlers.length; i++) { - persistenceProcessorHandlers[i] = provider.get(); - } - return persistenceProcessorHandlers; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/7a9d7d6f/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java ---------------------------------------------------------------------- diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java deleted file mode 100644 index 29f0a4b..0000000 --- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java +++ /dev/null @@ -1,514 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.omid.transaction; - -import com.google.inject.Guice; -import com.google.inject.Injector; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.coprocessor.AggregationClient; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.omid.TestUtils; -import org.apache.omid.committable.CommitTable; -import org.apache.omid.committable.hbase.HBaseCommitTableConfig; -import org.apache.omid.metrics.NullMetricsProvider; -import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig; -import org.apache.omid.tso.TSOServer; -import org.apache.omid.tso.TSOServerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.io.IOException; - -import static org.mockito.Mockito.spy; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; - -public class TestSnapshotFilter { - - private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotFilter.class); - - private static final String TEST_FAMILY = "test-fam"; - - private static final int MAX_VERSIONS = 3; - - private AbstractTransactionManager tm; - - private Injector injector; - - private HBaseAdmin admin; - private Configuration hbaseConf; - private HBaseTestingUtility hbaseTestUtil; - private MiniHBaseCluster hbaseCluster; - - private TSOServer tso; - - private AggregationClient aggregationClient; - private CommitTable commitTable; - private PostCommitActions syncPostCommitter; - - @BeforeClass - public void setupTestSnapshotFilter() throws Exception { - TSOServerConfig tsoConfig = new TSOServerConfig(); - tsoConfig.setPort(5678); - tsoConfig.setConflictMapSize(1); - injector = Guice.createInjector(new TSOForSnapshotFilterTestModule(tsoConfig)); - hbaseConf = injector.getInstance(Configuration.class); - hbaseConf.setBoolean("omid.server.side.filter", true); - hbaseConf.setInt("hbase.master.info.port", 16011); - HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class); - HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class); - - setupHBase(); - aggregationClient = new AggregationClient(hbaseConf); - admin = new HBaseAdmin(hbaseConf); - createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig); - setupTSO(); - - commitTable = injector.getInstance(CommitTable.class); - } - - private void setupHBase() throws Exception { - LOG.info("--------------------------------------------------------------------------------------------------"); - LOG.info("Setting up HBase"); - LOG.info("--------------------------------------------------------------------------------------------------"); - hbaseTestUtil = new HBaseTestingUtility(hbaseConf); - LOG.info("--------------------------------------------------------------------------------------------------"); - LOG.info("Creating HBase MiniCluster"); - LOG.info("--------------------------------------------------------------------------------------------------"); - hbaseCluster = hbaseTestUtil.startMiniCluster(1); - } - - private void createRequiredHBaseTables(HBaseTimestampStorageConfig timestampStorageConfig, - HBaseCommitTableConfig hBaseCommitTableConfig) throws IOException { - createTableIfNotExists(timestampStorageConfig.getTableName(), timestampStorageConfig.getFamilyName().getBytes()); - - createTableIfNotExists(hBaseCommitTableConfig.getTableName(), hBaseCommitTableConfig.getCommitTableFamily(), hBaseCommitTableConfig.getLowWatermarkFamily()); - } - - private void createTableIfNotExists(String tableName, byte[]... families) throws IOException { - if (!admin.tableExists(tableName)) { - LOG.info("Creating {} table...", tableName); - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); - - for (byte[] family : families) { - HColumnDescriptor datafam = new HColumnDescriptor(family); - datafam.setMaxVersions(MAX_VERSIONS); - desc.addFamily(datafam); - } - - desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation"); - admin.createTable(desc); - } - - } - - private void setupTSO() throws IOException, InterruptedException { - tso = injector.getInstance(TSOServer.class); - tso.startAndWait(); - TestUtils.waitForSocketListening("localhost", 5678, 100); - Thread.currentThread().setName("UnitTest(s) thread"); - } - - @AfterClass - public void cleanupTestSnapshotFilter() throws Exception { - teardownTSO(); - hbaseCluster.shutdown(); - } - - private void teardownTSO() throws IOException, InterruptedException { - tso.stopAndWait(); - TestUtils.waitForSocketNotListening("localhost", 5678, 1000); - } - - @BeforeMethod - public void setupTestSnapshotFilterIndividualTest() throws Exception { - tm = spy((AbstractTransactionManager) newTransactionManager()); - } - - private TransactionManager newTransactionManager() throws Exception { - HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration(); - hbaseOmidClientConf.setConnectionString("localhost:5678"); - hbaseOmidClientConf.setHBaseConfiguration(hbaseConf); - CommitTable.Client commitTableClient = commitTable.getClient(); - syncPostCommitter = - spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient)); - return HBaseTransactionManager.builder(hbaseOmidClientConf) - .postCommitter(syncPostCommitter) - .commitTableClient(commitTableClient) - .build(); - } - - @Test(timeOut = 60_000) - public void testGetFirstResult() throws Throwable { - - byte[] rowName1 = Bytes.toBytes("row1"); - byte[] famName1 = Bytes.toBytes(TEST_FAMILY); - byte[] colName1 = Bytes.toBytes("col1"); - byte[] dataValue1 = Bytes.toBytes("testWrite-1"); - - String TEST_TABLE = "testGetFirstResult"; - createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY)); - TTable tt = new TTable(hbaseConf, TEST_TABLE); - - Transaction tx1 = tm.begin(); - - Put row1 = new Put(rowName1); - row1.add(famName1, colName1, dataValue1); - tt.put(tx1, row1); - - tm.commit(tx1); - - Transaction tx2 = tm.begin(); - - Get get = new Get(rowName1); - Result result = tt.get(tx2, get); - - assertTrue(!result.isEmpty(), "Result should not be empty!"); - - long tsRow = result.rawCells()[0].getTimestamp(); - assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version"); - - tm.commit(tx2); - - Transaction tx3 = tm.begin(); - - Put put3 = new Put(rowName1); - put3.add(famName1, colName1, dataValue1); - tt.put(tx3, put3); - - tm.commit(tx3); - - Transaction tx4 = tm.begin(); - - Get get2 = new Get(rowName1); - Result result2 = tt.get(tx4, get2); - - assertTrue(!result2.isEmpty(), "Result should not be empty!"); - - long tsRow2 = result2.rawCells()[0].getTimestamp(); - assertEquals(tsRow2, tx3.getTransactionId(), "Reading differnt version"); - - tm.commit(tx4); - - tt.close(); - } - - @Test(timeOut = 60_000) - public void testGetSecondResult() throws Throwable { - - byte[] rowName1 = Bytes.toBytes("row1"); - byte[] famName1 = Bytes.toBytes(TEST_FAMILY); - byte[] colName1 = Bytes.toBytes("col1"); - byte[] dataValue1 = Bytes.toBytes("testWrite-1"); - - String TEST_TABLE = "testGetFirstResult"; - createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY)); - TTable tt = new TTable(hbaseConf, TEST_TABLE); - - Transaction tx1 = tm.begin(); - - Put put1 = new Put(rowName1); - put1.add(famName1, colName1, dataValue1); - tt.put(tx1, put1); - - tm.commit(tx1); - - Transaction tx2 = tm.begin(); - Put put2 = new Put(rowName1); - put2.add(famName1, colName1, dataValue1); - tt.put(tx2, put2); - - Transaction tx3 = tm.begin(); - - Get get = new Get(rowName1); - Result result = tt.get(tx3, get); - - assertTrue(!result.isEmpty(), "Result should not be empty!"); - - long tsRow = result.rawCells()[0].getTimestamp(); - assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version"); - - tm.commit(tx3); - - tt.close(); - } - - @Test(timeOut = 60_000) - public void testScanFirstResult() throws Throwable { - - byte[] rowName1 = Bytes.toBytes("row1"); - byte[] famName1 = Bytes.toBytes(TEST_FAMILY); - byte[] colName1 = Bytes.toBytes("col1"); - byte[] dataValue1 = Bytes.toBytes("testWrite-1"); - - String TEST_TABLE = "testGetFirstResult"; - createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY)); - TTable tt = new TTable(hbaseConf, TEST_TABLE); - - Transaction tx1 = tm.begin(); - - Put row1 = new Put(rowName1); - row1.add(famName1, colName1, dataValue1); - tt.put(tx1, row1); - - tm.commit(tx1); - - Transaction tx2 = tm.begin(); - - ResultScanner iterableRS = tt.getScanner(tx2, new Scan().setStartRow(rowName1).setStopRow(rowName1)); - Result result = iterableRS.next(); - long tsRow = result.rawCells()[0].getTimestamp(); - assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version"); - - assertFalse(iterableRS.next() != null); - - tm.commit(tx2); - - Transaction tx3 = tm.begin(); - - Put put3 = new Put(rowName1); - put3.add(famName1, colName1, dataValue1); - tt.put(tx3, put3); - - tm.commit(tx3); - - Transaction tx4 = tm.begin(); - - ResultScanner iterableRS2 = tt.getScanner(tx4, new Scan().setStartRow(rowName1).setStopRow(rowName1)); - Result result2 = iterableRS2.next(); - long tsRow2 = result2.rawCells()[0].getTimestamp(); - assertEquals(tsRow2, tx3.getTransactionId(), "Reading differnt version"); - - assertFalse(iterableRS2.next() != null); - - tm.commit(tx4); - - tt.close(); - } - - @Test(timeOut = 60_000) - public void testScanSecondResult() throws Throwable { - - byte[] rowName1 = Bytes.toBytes("row1"); - byte[] famName1 = Bytes.toBytes(TEST_FAMILY); - byte[] colName1 = Bytes.toBytes("col1"); - byte[] dataValue1 = Bytes.toBytes("testWrite-1"); - - String TEST_TABLE = "testGetFirstResult"; - createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY)); - TTable tt = new TTable(hbaseConf, TEST_TABLE); - - Transaction tx1 = tm.begin(); - - Put put1 = new Put(rowName1); - put1.add(famName1, colName1, dataValue1); - tt.put(tx1, put1); - - tm.commit(tx1); - - Transaction tx2 = tm.begin(); - - Put put2 = new Put(rowName1); - put2.add(famName1, colName1, dataValue1); - tt.put(tx2, put2); - - Transaction tx3 = tm.begin(); - - ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName1)); - Result result = iterableRS.next(); - long tsRow = result.rawCells()[0].getTimestamp(); - assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version"); - - assertFalse(iterableRS.next() != null); - - tm.commit(tx3); - - tt.close(); - } - - @Test (timeOut = 60_000) - public void testScanFewResults() throws Throwable { - - byte[] rowName1 = Bytes.toBytes("row1"); - byte[] rowName2 = Bytes.toBytes("row2"); - byte[] rowName3 = Bytes.toBytes("row3"); - byte[] famName = Bytes.toBytes(TEST_FAMILY); - byte[] colName1 = Bytes.toBytes("col1"); - byte[] colName2 = Bytes.toBytes("col2"); - byte[] dataValue1 = Bytes.toBytes("testWrite-1"); - byte[] dataValue2 = Bytes.toBytes("testWrite-2"); - - String TEST_TABLE = "testGetFirstResult"; - createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY)); - TTable tt = new TTable(hbaseConf, TEST_TABLE); - - Transaction tx1 = tm.begin(); - - Put put1 = new Put(rowName1); - put1.add(famName, colName1, dataValue1); - tt.put(tx1, put1); - - tm.commit(tx1); - - Transaction tx2 = tm.begin(); - - Put put2 = new Put(rowName2); - put2.add(famName, colName2, dataValue2); - tt.put(tx2, put2); - - tm.commit(tx2); - - Transaction tx3 = tm.begin(); - - ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName3)); - Result result = iterableRS.next(); - long tsRow = result.rawCells()[0].getTimestamp(); - assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version"); - - result = iterableRS.next(); - tsRow = result.rawCells()[0].getTimestamp(); - assertEquals(tsRow, tx2.getTransactionId(), "Reading differnt version"); - - assertFalse(iterableRS.next() != null); - - tm.commit(tx3); - - tt.close(); - } - - @Test (timeOut = 60_000) - public void testScanFewResultsDifferentTransaction() throws Throwable { - - byte[] rowName1 = Bytes.toBytes("row1"); - byte[] rowName2 = Bytes.toBytes("row2"); - byte[] rowName3 = Bytes.toBytes("row3"); - byte[] famName = Bytes.toBytes(TEST_FAMILY); - byte[] colName1 = Bytes.toBytes("col1"); - byte[] colName2 = Bytes.toBytes("col2"); - byte[] dataValue1 = Bytes.toBytes("testWrite-1"); - byte[] dataValue2 = Bytes.toBytes("testWrite-2"); - - String TEST_TABLE = "testGetFirstResult"; - createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY)); - TTable tt = new TTable(hbaseConf, TEST_TABLE); - - Transaction tx1 = tm.begin(); - - Put put1 = new Put(rowName1); - put1.add(famName, colName1, dataValue1); - tt.put(tx1, put1); - Put put2 = new Put(rowName2); - put2.add(famName, colName2, dataValue2); - tt.put(tx1, put2); - - tm.commit(tx1); - - Transaction tx2 = tm.begin(); - - put2 = new Put(rowName2); - put2.add(famName, colName2, dataValue2); - tt.put(tx2, put2); - - tm.commit(tx2); - - Transaction tx3 = tm.begin(); - - ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName3)); - Result result = iterableRS.next(); - long tsRow = result.rawCells()[0].getTimestamp(); - assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version"); - - result = iterableRS.next(); - tsRow = result.rawCells()[0].getTimestamp(); - assertEquals(tsRow, tx2.getTransactionId(), "Reading differnt version"); - - assertFalse(iterableRS.next() != null); - - tm.commit(tx3); - - tt.close(); - } - - @Test (timeOut = 60_000) - public void testScanFewResultsSameTransaction() throws Throwable { - - byte[] rowName1 = Bytes.toBytes("row1"); - byte[] rowName2 = Bytes.toBytes("row2"); - byte[] rowName3 = Bytes.toBytes("row3"); - byte[] famName = Bytes.toBytes(TEST_FAMILY); - byte[] colName1 = Bytes.toBytes("col1"); - byte[] colName2 = Bytes.toBytes("col2"); - byte[] dataValue1 = Bytes.toBytes("testWrite-1"); - byte[] dataValue2 = Bytes.toBytes("testWrite-2"); - - String TEST_TABLE = "testGetFirstResult"; - createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY)); - TTable tt = new TTable(hbaseConf, TEST_TABLE); - - Transaction tx1 = tm.begin(); - - Put put1 = new Put(rowName1); - put1.add(famName, colName1, dataValue1); - tt.put(tx1, put1); - Put put2 = new Put(rowName2); - put2.add(famName, colName2, dataValue2); - tt.put(tx1, put2); - - tm.commit(tx1); - - Transaction tx2 = tm.begin(); - - put2 = new Put(rowName2); - put2.add(famName, colName2, dataValue2); - tt.put(tx2, put2); - - Transaction tx3 = tm.begin(); - - ResultScanner iterableRS = tt.getScanner(tx3, new Scan().setStartRow(rowName1).setStopRow(rowName3)); - Result result = iterableRS.next(); - long tsRow = result.rawCells()[0].getTimestamp(); - assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version"); - - result = iterableRS.next(); - tsRow = result.rawCells()[0].getTimestamp(); - assertEquals(tsRow, tx1.getTransactionId(), "Reading differnt version"); - - assertFalse(iterableRS.next() != null); - - tm.commit(tx3); - - tt.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/7a9d7d6f/hbase-shims/hbase-0/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java ---------------------------------------------------------------------- diff --git a/hbase-shims/hbase-0/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-shims/hbase-0/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index a419a1d..23742b6 100644 --- a/hbase-shims/hbase-0/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-shims/hbase-0/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import java.io.IOException; @@ -40,12 +39,6 @@ public class Region { } - void put(Put putOperation) throws IOException { - - hRegion.put(putOperation); - - } - HRegionInfo getRegionInfo() { return hRegion.getRegionInfo(); http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/7a9d7d6f/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java ---------------------------------------------------------------------- diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java index b22f024..500c1e2 100644 --- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java +++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransaction.java @@ -18,7 +18,6 @@ package org.apache.omid.transaction; import com.google.common.base.Optional; - import org.apache.omid.tso.client.CellId; import java.util.ArrayList; @@ -38,48 +37,14 @@ import java.util.Set; */ public abstract class AbstractTransaction<T extends CellId> implements Transaction { - enum VisibilityLevel { - // Regular snapshot isolation. Returns the last key, either from the snapshot or from the current transaction - // Sets the readTimestamp to be the writeTimestamp - SNAPSHOT, - // Returns all the written version of a key X that written by the transaction and the key X from the provided snapshot. - SNAPSHOT_ALL, - // Returns the last key, either from the snapshot or from the current transaction that was written before the last checkpoint. - // Sets the readTimestamp to be the writeTimestamp - 1 - SNAPSHOT_EXCLUDE_CURRENT; - - public static VisibilityLevel fromInteger(int number) { - VisibilityLevel visibilityLevel = SNAPSHOT; - - switch (number) { - case 0: - visibilityLevel = VisibilityLevel.SNAPSHOT; - break; - case 1: - visibilityLevel = VisibilityLevel.SNAPSHOT_ALL; - break; - case 2: - visibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT; - break; - default: - assert(false); - } - - return visibilityLevel; - } - } - private transient Map<String, Object> metadata = new HashMap<>(); private final AbstractTransactionManager transactionManager; private final long startTimestamp; - protected long readTimestamp; - protected long writeTimestamp; private final long epoch; private long commitTimestamp; private boolean isRollbackOnly; private final Set<T> writeSet; private Status status = Status.RUNNING; - private VisibilityLevel visibilityLevel; /** * Base constructor @@ -101,37 +66,10 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti long epoch, Set<T> writeSet, AbstractTransactionManager transactionManager) { - this(transactionId, transactionId, VisibilityLevel.SNAPSHOT, epoch, writeSet, transactionManager); - } - - public AbstractTransaction(long transactionId, - long readTimestamp, - VisibilityLevel visibilityLevel, - long epoch, - Set<T> writeSet, - AbstractTransactionManager transactionManager) { - this.startTimestamp = this.writeTimestamp = transactionId; - this.readTimestamp = readTimestamp; + this.startTimestamp = transactionId; this.epoch = epoch; this.writeSet = writeSet; this.transactionManager = transactionManager; - this.visibilityLevel = visibilityLevel; - } - - /** - * Creates a checkpoint and sets the visibility level to SNAPSHOT_EXCLUDE_CURRENT - * The number of checkpoints is bounded to NUM_CHECKPOINTS in order to make checkpoint a client side operation - * @return true if a checkpoint was created and false otherwise - * @throws TransactionException - */ - void checkpoint() throws TransactionException { - - setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); - this.readTimestamp = this.writeTimestamp++; - - if (this.writeTimestamp % AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN == 0) { - throw new TransactionException("Error: number of checkpoing cannot exceed " + (AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN - 1)); - } } /** @@ -196,22 +134,6 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti } /** - * Returns the read timestamp for this transaction. - * @return read timestamp - */ - public long getReadTimestamp() { - return readTimestamp; - } - - /** - * Returns the write timestamp for this transaction. - * @return write timestamp - */ - public long getWriteTimestamp() { - return writeTimestamp; - } - - /** * Returns the commit timestamp for this transaction. * @return commit timestamp */ @@ -220,14 +142,6 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti } /** - * Returns the visibility level for this transaction. - * @return visibility level - */ - public VisibilityLevel getVisibilityLevel() { - return visibilityLevel; - } - - /** * Sets the commit timestamp for this transaction. * @param commitTimestamp * the commit timestamp to set @@ -237,22 +151,6 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti } /** - * Sets the visibility level for this transaction. - * @param visibilityLevel - * the {@link VisibilityLevel} to set - */ - public void setVisibilityLevel(VisibilityLevel visibilityLevel) { - this.visibilityLevel = visibilityLevel; - - // If we are setting visibility level to either SNAPSHOT or SNAPSHOT_ALL - // then we should let readTimestamp equals to writeTimestamp - if (this.visibilityLevel == VisibilityLevel.SNAPSHOT || - this.visibilityLevel == VisibilityLevel.SNAPSHOT_ALL) { - this.readTimestamp = this.writeTimestamp; - } - } - - /** * Sets the status for this transaction. * @param status * the {@link Status} to set @@ -280,12 +178,10 @@ public abstract class AbstractTransaction<T extends CellId> implements Transacti @Override public String toString() { - return String.format("Tx-%s [%s] (ST=%d, RT=%d, WT=%d, CT=%d, Epoch=%d) WriteSet %s", + return String.format("Tx-%s [%s] (ST=%d, CT=%d, Epoch=%d) WriteSet %s", Long.toHexString(getTransactionId()), status, startTimestamp, - readTimestamp, - writeTimestamp, commitTimestamp, epoch, writeSet);